a5d2865eb2
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
360 lines
11 KiB
Python
360 lines
11 KiB
Python
"""Denon AVR-S540BT Bluetooth RFCOMM protocol client."""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import socket
|
|
import threading
|
|
import time
|
|
from collections.abc import Callable
|
|
from dataclasses import dataclass, field
|
|
from typing import TYPE_CHECKING
|
|
|
|
from .const import (
|
|
CMD_GET_SOURCES,
|
|
CMD_GET_STATUS,
|
|
CONNECT_TIMEOUT,
|
|
DEFAULT_SOURCES,
|
|
RFCOMM_CHANNEL,
|
|
)
|
|
|
|
if TYPE_CHECKING:
|
|
from homeassistant.core import HomeAssistant
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
def _default_sources() -> dict[int, str]:
    """Return a fresh, independently mutable copy of the default source map."""
    return dict(DEFAULT_SOURCES)


@dataclass
class DenonState:
    """Snapshot of everything the receiver has reported so far.

    Every field starts at a neutral default and is overwritten by the
    client's packet handler as reports arrive from the AVR.
    """

    # Whether the receiver is believed to be powered on.
    power: bool = False
    # Raw volume byte taken from the volume report; None until one is seen.
    volume_raw: int | None = None
    # Whether the receiver reported itself as muted.
    muted: bool = False
    # Display name of the active input; None until an input report is seen.
    source: str | None = None
    # Numeric code of the active input; None until an input report is seen.
    source_code: int | None = None
    # Printable sound-mode text; None until a sound-mode report is seen.
    sound_mode: str | None = None
    # Source code -> display name. Each instance gets its own copy of the
    # defaults so mutating one state never affects another.
    sources: dict[int, str] = field(default_factory=_default_sources)
|
|
|
|
|
|
def cmd_select_input(source_byte: int) -> bytes:
    """Return the 7-byte command that switches the AVR to *source_byte*.

    The frame is the ASCII header ``AT`` followed by ``00 07 01``, the
    source code, and a trailing zero byte. ``bytes`` raises ``ValueError``
    when *source_byte* is outside ``0..255``.
    """
    header = b"AT\x00\x07\x01"
    tail = bytes((source_byte, 0x00))
    return header + tail
|
|
|
|
|
|
class DenonBTClient:
    """Async-compatible Denon Bluetooth RFCOMM client for Home Assistant.

    Blocking socket work is handed to ``hass.async_add_executor_job``.
    Incoming bytes are read by a daemon thread that splits the stream into
    packets, updates the shared :class:`DenonState` under a lock, and
    schedules ``state_callback`` on the Home Assistant event loop.
    """

    def __init__(
        self,
        hass: HomeAssistant,
        mac: str,
        state_callback: Callable[[DenonState], None],
    ) -> None:
        """Store collaborators and initialise a disconnected client.

        Args:
            hass: Home Assistant instance; supplies the executor and loop.
            mac: Bluetooth address of the receiver.
            state_callback: Called on the event loop with the current state
                whenever a packet changes it or the receive loop ends.
        """
        self._hass = hass
        self._mac = mac
        self._state_callback = state_callback

        self._sock: socket.socket | None = None
        self._connected = False
        self._stop_event = threading.Event()
        self._recv_thread: threading.Thread | None = None
        self._state = DenonState()
        self._lock = threading.Lock()

    @property
    def connected(self) -> bool:
        """Whether the client currently believes the RFCOMM link is up."""
        return self._connected

    @property
    def state(self) -> DenonState:
        """The live state object that the receive thread mutates."""
        return self._state

    # ── Connection ───────────────────────────────────────────────────────────

    async def async_connect(self) -> bool:
        """Connect to the AVR (blocking work runs in executor).

        Returns:
            True on success; False if connecting or the initial state
            request raised (the error is logged, not propagated).
        """
        try:
            await self._hass.async_add_executor_job(self._blocking_connect)
        except Exception as err:  # boundary: report failure as a bool
            _LOGGER.error("Connection to %s failed: %s", self._mac, err)
            return False
        return True

    def _blocking_connect(self) -> None:
        """Open the RFCOMM socket, start the receiver and request state.

        Raises:
            Exception: whatever the socket layer raised. The socket is
                closed and the client left disconnected before re-raising.
        """
        self._stop_event.clear()
        sock = socket.socket(
            socket.AF_BLUETOOTH,
            socket.SOCK_STREAM,
            socket.BTPROTO_RFCOMM,
        )
        try:
            sock.settimeout(CONNECT_TIMEOUT)
            sock.connect((self._mac, RFCOMM_CHANNEL))
            sock.settimeout(None)
        except Exception:
            # Do not leak the descriptor when the AVR is unreachable.
            self._close_quietly(sock)
            raise

        self._sock = sock
        self._connected = True

        self._recv_thread = threading.Thread(
            target=self._recv_loop,
            daemon=True,
            name=f"denon_bt_recv_{self._mac}",
        )
        self._recv_thread.start()

        try:
            # Give the AVR a moment, then request initial state.
            time.sleep(0.3)
            self._blocking_send(CMD_GET_SOURCES)
            self._blocking_send(CMD_GET_STATUS)
        except Exception:
            # The caller reports this as a failed connect, so do not stay
            # half-connected with a live receive thread.
            self._blocking_disconnect()
            raise

    async def async_disconnect(self) -> None:
        """Tear the connection down in the executor."""
        await self._hass.async_add_executor_job(self._blocking_disconnect)

    def _blocking_disconnect(self) -> None:
        """Signal the receive loop to stop and release the socket."""
        self._stop_event.set()
        self._connected = False
        sock, self._sock = self._sock, None
        if sock is not None:
            self._close_quietly(sock)

    @staticmethod
    def _close_quietly(sock: socket.socket) -> None:
        """Shut down and close *sock*, tolerating an already-dead link."""
        try:
            # shutdown() is what wakes a thread blocked in recv(); close()
            # alone does not necessarily do so.
            sock.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass  # never connected, or the peer is already gone
        try:
            sock.close()
        except OSError as err:
            _LOGGER.debug("Closing socket failed: %s", err)

    # ── Send ─────────────────────────────────────────────────────────────────

    async def async_send(self, data: bytes) -> bool:
        """Send *data* to the AVR.

        Returns:
            True if the send job completed; False when not connected or
            when sending raised (in which case the client disconnects).
        """
        if not self._connected:
            return False
        try:
            await self._hass.async_add_executor_job(self._blocking_send, data)
        except Exception as err:  # boundary: report failure as a bool
            _LOGGER.error("Send to %s failed: %s", self._mac, err)
            await self.async_disconnect()
            return False
        return True

    def _blocking_send(self, data: bytes) -> None:
        """Write *data* to the socket; a no-op when no socket is open."""
        # Read the attribute once so a concurrent disconnect cannot turn it
        # into None between the check and the call.
        sock = self._sock
        if sock is not None:
            sock.sendall(data)

    # ── Receive loop ─────────────────────────────────────────────────────────

    def _recv_loop(self) -> None:
        """Read from the socket until stopped, closed or failed.

        Runs in the daemon thread started by ``_blocking_connect``. On exit
        the socket is released and a state notification is scheduled.
        """
        # Bind the socket once: this loop owns exactly this connection even
        # if the client is disconnected or reconnected meanwhile.
        sock = self._sock
        buf = b""
        while sock is not None and not self._stop_event.is_set():
            try:
                chunk = sock.recv(256)
                if not chunk:
                    break  # peer closed the link
                buf = self._drain_packets(buf + chunk)
            except Exception as err:
                _LOGGER.debug("Recv loop ended: %s", err)
                break

        if sock is not None:
            if self._sock is sock:
                self._sock = None
            self._close_quietly(sock)
        # Leave the flag alone if a newer connection has replaced ours.
        if self._sock is None:
            self._connected = False
        self._notify_state_change()

    def _drain_packets(self, buf: bytes) -> bytes:
        """Dispatch every complete packet in *buf*; return the remainder."""
        while len(buf) >= 4:
            start = buf.find(b"AT")
            if start == -1:
                # No header here. Keep a trailing "A": its "T" may simply
                # not have arrived yet.
                return buf[-1:] if buf.endswith(b"A") else b""
            if start > 0:
                buf = buf[start:]
                if len(buf) < 4:
                    break

            pkt_len = _packet_length(buf)
            if pkt_len is None or len(buf) < pkt_len:
                break

            pkt = buf[:pkt_len]
            buf = buf[pkt_len:]
            self._handle_packet(pkt)
        return buf

    # ── Packet handling ──────────────────────────────────────────────────────

    def _handle_packet(self, data: bytes) -> None:
        """Apply one packet to the state and notify if anything changed.

        Packets shorter than four bytes, or of an unrecognised
        category/type, are ignored.
        """
        if len(data) < 4:
            return

        cat, typ = data[2], data[3]
        changed = False

        # Volume report: AT 07 02 03 C5 [vol] 00 [chk]
        if cat == 0x07 and typ == 0x02 and len(data) >= 7:
            if data[4] == 0x03 and data[5] == 0xC5:
                with self._lock:
                    self._state.volume_raw = data[6]
                    self._state.power = True
                changed = True

        # Mute state: AT 07 1D 01 [state] [chk]
        elif cat == 0x07 and typ == 0x1D and len(data) >= 6:
            with self._lock:
                self._state.muted = data[4] == 0x01 and data[5] == 0x01
            changed = True

        # Sound mode: AT 07 1E 15 00 [20 bytes text] [chk]
        elif cat == 0x07 and typ == 0x1E and len(data) >= 6:
            # Drop the trailing checksum byte: when it happens to be
            # printable it would otherwise be appended to the mode name.
            payload = data[5:-1]
            text = bytes(b for b in payload if 32 <= b < 127).decode(
                "ascii", errors="replace"
            ).strip()
            if text:
                with self._lock:
                    self._state.sound_mode = text
                changed = True

        # Input sources list: AT 00 04 0A 07 ...
        elif cat == 0x00 and typ == 0x04 and len(data) >= 8:
            # NOTE(review): this scans every byte after the type, including
            # any length or checksum bytes; confirm they cannot collide
            # with real source codes.
            discovered: dict[int, str] = {}
            for b in data[4:]:
                if b in DEFAULT_SOURCES:
                    discovered[b] = DEFAULT_SOURCES[b]
                elif 0x40 <= b <= 0x7F:
                    discovered[b] = f"Input {b:02X}"
            if discovered:
                with self._lock:
                    self._state.sources = discovered
                changed = True

        # Current input: AT 00 07 02 [src] 00 [chk]
        elif cat == 0x00 and typ == 0x07 and len(data) >= 6 and data[4] == 0x02:
            code = data[5]
            with self._lock:
                self._state.source_code = code
                self._state.source = self._state.sources.get(
                    code, f"Input {code:02X}"
                )
            changed = True

        # Power echo: AT 00 0A 01 [state] [chk]
        elif cat == 0x00 and typ == 0x0A and len(data) >= 6:
            with self._lock:
                self._state.power = data[4] == 0x01 and data[5] == 0x01
            changed = True

        # Power ack (off): AT 07 0B 01 00 FF
        elif cat == 0x07 and typ == 0x0B and len(data) >= 6:
            if data[4] == 0x01 and data[5] == 0x00:
                with self._lock:
                    self._state.power = False
                changed = True

        if changed:
            self._notify_state_change()

    def _notify_state_change(self) -> None:
        """Schedule ``state_callback(state)`` on the event loop, if set."""
        if self._hass and self._state_callback:
            try:
                self._hass.loop.call_soon_threadsafe(
                    self._state_callback, self._state
                )
            except RuntimeError as err:
                # The loop is already closed (shutdown); nothing to notify.
                _LOGGER.debug("State notification dropped: %s", err)
|
|
|
|
|
|
# ── Packet length table (from denon_remote.py) ──────────────────────────────
|
|
|
|
|
|
# Packets whose total size is fixed by (category, type) alone.
_FIXED_PACKET_LENGTHS: dict[tuple[int, int], int] = {
    (0x07, 0x00): 6,
    (0x07, 0x01): 6,
    (0x07, 0x02): 9,
    (0x07, 0x03): 6,
    (0x07, 0x0B): 7,
    (0x07, 0x1D): 7,
    (0x07, 0x1E): 27,
    (0x07, 0x25): 7,
    (0x07, 0x31): 14,
    (0x00, 0x09): 7,
    (0x00, 0x0A): 7,
    (0x00, 0x0B): 7,
    (0x00, 0x0D): 21,
    (0x00, 0x0E): 16,
    (0x00, 0x13): 7,
    (0x0D, 0x08): 13,
    (0x0D, 0x0D): 6,
    (0x81, 0x04): 7,
    (0x81, 0x32): 119,
    (0x81, 0x40): 7,
    (0x82, 0x31): 14,
}

# Packets whose size depends on the fifth byte:
# (category, type) -> (marker, size when buf[4] == marker, size otherwise).
_VARIANT_PACKET_LENGTHS: dict[tuple[int, int], tuple[int, int, int]] = {
    (0x00, 0x04): (0x00, 6, 16),
    (0x00, 0x06): (0x00, 6, 75),
    (0x00, 0x07): (0x01, 7, 8),
}


def _packet_length(buf: bytes) -> int | None:
    """Return the expected size of the packet at the start of *buf*.

    *buf* is expected to begin with the two-byte ``AT`` header; bytes 2 and
    3 are the category and type used for the lookup.

    Returns:
        None when fewer than four bytes are available. Otherwise the table
        size for a known (category, type). For unknown packets, the offset
        of the next ``AT`` header if one is present, else the buffered
        length capped at 32 bytes.
    """
    if len(buf) < 4:
        return None

    key = (buf[2], buf[3])

    fixed = _FIXED_PACKET_LENGTHS.get(key)
    if fixed is not None:
        return fixed

    variant = _VARIANT_PACKET_LENGTHS.get(key)
    if variant is not None:
        marker, matched_len, other_len = variant
        # With only the 4-byte prefix buffered the marker cannot be read
        # yet, so this reports the non-matching size.
        if len(buf) > 4 and buf[4] == marker:
            return matched_len
        return other_len

    # Fallback: next AT header or 32 bytes max
    following_header = buf.find(b"AT", 2)
    if following_header > 0:
        return following_header
    return min(32, len(buf))
|
|
|
|
|
|
# ── D-Bus device discovery ───────────────────────────────────────────────────
|
|
|
|
|
|
def find_denon_devices() -> list[dict[str, str | bool]]:
    """List BlueZ devices whose name contains "DENON" (case-insensitive).

    The BlueZ object manager is queried over the system D-Bus. Each match is
    returned as a dict with the keys ``"mac"``, ``"name"`` and ``"paired"``.

    Returns:
        The matching devices, or an empty list when nothing matches or when
        D-Bus is unavailable or fails (the error is logged at debug level).
    """
    device_iface = "org.bluez.Device1"
    try:
        import dbus  # noqa: PLC0415

        bluez_root = dbus.SystemBus().get_object("org.bluez", "/")
        object_manager = dbus.Interface(
            bluez_root,
            "org.freedesktop.DBus.ObjectManager",
        )

        found: list[dict[str, str | bool]] = []
        for interfaces in object_manager.GetManagedObjects().values():
            if device_iface not in interfaces:
                continue
            props = interfaces[device_iface]
            name = str(props.get("Name", ""))
            if "DENON" not in name.upper():
                continue
            found.append(
                {
                    "mac": str(props.get("Address")),
                    "name": name,
                    "paired": bool(props.get("Paired", False)),
                }
            )
        return found
    except Exception as err:
        _LOGGER.debug("D-Bus discovery failed: %s", err)
        return []
|