From a5d2865eb2489cd92fd3dfb2313a26ee3f0f455b Mon Sep 17 00:00:00 2001 From: ckoch Date: Sat, 6 Jun 2026 10:59:47 -0400 Subject: [PATCH] Initial commit Co-Authored-By: Claude Opus 4.8 (1M context) --- .gitignore | 15 + PROJECT_CONTEXT.md | 142 +++ custom_components/denon_bt/__init__.py | 25 + custom_components/denon_bt/config_flow.py | 106 +++ custom_components/denon_bt/const.py | 37 + custom_components/denon_bt/manifest.json | 10 + custom_components/denon_bt/media_player.py | 221 +++++ custom_components/denon_bt/protocol.py | 359 ++++++++ .../denon_bt/translations/en.json | 27 + denon_remote.py | 858 ++++++++++++++++++ 10 files changed, 1800 insertions(+) create mode 100644 .gitignore create mode 100644 PROJECT_CONTEXT.md create mode 100644 custom_components/denon_bt/__init__.py create mode 100644 custom_components/denon_bt/config_flow.py create mode 100644 custom_components/denon_bt/const.py create mode 100644 custom_components/denon_bt/manifest.json create mode 100644 custom_components/denon_bt/media_player.py create mode 100644 custom_components/denon_bt/protocol.py create mode 100644 custom_components/denon_bt/translations/en.json create mode 100644 denon_remote.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..023776f --- /dev/null +++ b/.gitignore @@ -0,0 +1,15 @@ +# Python +__pycache__/ +*.py[cod] +*.egg-info/ +.venv/ +venv/ +env/ +.env + +# OS / editor +.DS_Store +*.swp +*~ +.idea/ +.vscode/ diff --git a/PROJECT_CONTEXT.md b/PROJECT_CONTEXT.md new file mode 100644 index 0000000..791b09e --- /dev/null +++ b/PROJECT_CONTEXT.md @@ -0,0 +1,142 @@ +# Denon AVR-S540BT Bluetooth Remote — Project Context + +## What We're Building +A custom Python application to control the Denon AVR-S540BT AV receiver over +Bluetooth, replicating the functionality of the official "Denon 500 Series Remote" +Android app without relying on it. + +## How We Got Here + +### Discovery +The AVR-S540BT has no network (no Ethernet, no WiFi). The only remote control +interface is Bluetooth, used by Denon's own "Denon 500 Series Remote" app. +Denon does not publish the Bluetooth control protocol. + +### Reverse Engineering Method +1. Enabled Bluetooth HCI snoop log on a Pixel 10 Pro XL (Android Developer + Options → Enable Bluetooth HCI snoop log) +2. Used the official Denon app to perform actions: connect, volume up/down, + input selection, etc. +3. Extracted the log via `adb bugreport` (direct `adb pull` is blocked on + Android 15) and unzipped the btsnoop_hci.log from inside the zip +4. Parsed the binary BTSnoop v1 file in Python (no external dependencies) + and identified ACL data packets containing the string "AT" (0x41 0x54) +5. Decoded the packet structure + +### Protocol Details +- **Transport**: Bluetooth Classic, RFCOMM (Serial Port Profile) +- **Channel**: 2 (service name "BT SPP 2", confirmed via SDP in the log) +- **Framing**: Each packet starts with bytes `41 54` ("AT"), followed by + binary command bytes. The trailing byte in raw HCI is an RFCOMM FCS + (Frame Check Sequence) added by the kernel — NOT part of the application + payload. When using `socket.AF_BLUETOOTH / BTPROTO_RFCOMM`, the kernel + handles framing automatically; you just read/write raw application data. + +### Decoded Commands + +#### Phone → AVR (commands) +| Action | Bytes (hex) | Notes | +|------------------|------------------------------|-----------------------------| +| Volume Up | `41 54 07 00 00 00` | | +| Volume Down | `41 54 07 01 00 00` | | +| Mute On | `41 54 07 1D 01 01 FE` | Discrete on | +| Mute Off | `41 54 07 1D 01 00 FF` | Discrete off | +| Power Off | `41 54 00 0A 01 00 FF` | | +| Power On | `41 54 00 0A 01 01 FE` | | +| Get Input Sources| `41 54 00 04 00 00` | | +| Get Status | `41 54 00 06 00 00` | | +| Select Input | `41 54 00 07 01 [src] [chk]` | chk = 0xFF XOR src | +| Mute Toggle* | `41 54 07 25 01 20 DF` | Used in handshake, not user | + +#### AVR → Phone (responses) +| Response | Bytes (hex) | Notes | +|-------------------|------------------------------------------------------|------------------------------------------| +| Volume Report | `41 54 07 02 03 C5 [vol] 00 [chk]` | vol/2 = display value, chk = 0xFF-vol+1 | +| Mute State | `41 54 07 1D 01 [state] [chk]` | state: 01=muted, 00=unmuted | +| Sound Mode | `41 54 07 1E 15 00 [20 bytes text] [chk]` | ASCII, space-padded, 27 bytes total | +| Power Ack | `41 54 07 0B 01 00 FF` | Sent by AVR on power off | +| Input Sources | `41 54 00 04 0A 07 40 81 55 82 52 53 54 56 57 B1` | 0x81/0x82 are category separators | +| Current Input | `41 54 00 07 02 [src] 00 [chk]` | | +| Power Echo | `41 54 00 0A 01 [state] [chk]` | AVR echoes power state back | +| Status Keepalive | `41 54 00 0B 01 07 F8` | Sent by AVR after power on | +| Device Name | `41 54 00 0E 0A 00 [name bytes] [chk]` | Returned " DENON AV", 16 bytes total | +| Status Response | `41 54 00 06 45 ... [75 bytes]` | Full status dump | + +#### Known Input Source Byte Codes +These were observed in the log. Exact label↔code mapping is approximate and +should be confirmed by testing: +| Byte | Label | +|------|-----------| +| 0x40 | Bluetooth | +| 0x55 | Media Player | ('U' — label approximate, needs confirmation) +| 0x52 | CBL/SAT | +| 0x53 | DVD | +| 0x54 | Blu-ray | +| 0x56 | Game | +| 0x57 | AUX | + +### Second Capture (Session 2) +Actions performed: volume 20.5→21, mute on, mute off, volume down, mute on, +mute off, power off, power on, power off, power on. + +**New findings:** +- **Mute** is discrete on/off (07 1D), not a toggle. The 07 25 command seen + in the first capture is part of the connection handshake, not user mute. +- **Power** uses 00 0A with 01 00 = off, 01 01 = on. +- After power on, the AVR sends a status keepalive (00 0B 01 07) and the + Denon app replays the mute-toggle handshake (07 25) before resuming. +- **Volume display mapping**: raw byte / 2 = display value (e.g., 0x29=41 + → display "20.5", 0x2a=42 → display "21"). +- Source byte **0x55** ("Media Player"?) discovered in sources list. +- **0x81 / 0x82** in the sources response are category separators, not sources. + +### Commands Not Yet Captured +The following still need to be reverse engineered: +- Individual sound mode selection (only the status report was seen) +- Tuner controls (band, tune up/down, preset) +- Quick Select buttons + +## Current Codebase + +### denon_remote.py +A Python 3 tkinter application (stdlib + optional python3-dbus for device +auto-detection). + +**Key design decisions:** +- Uses `socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM` + (Linux built-in, no PyBluez needed) +- Receive loop runs in a background daemon thread +- All UI updates routed through a `queue.Queue` polled with `after(50,...)` + to keep tkinter thread-safe +- Packet length detection is best-effort heuristic based on known packet types; + unknown packets fall back to reading until the next `AT` header or 32 bytes +- Input source buttons are rendered dynamically from the AVR's own source list + response rather than being hard-coded +- Auto-detects paired Denon devices via BlueZ D-Bus (falls back gracefully + if python3-dbus is unavailable) +- Power on/off and discrete mute on/off commands implemented +- Mute state tracked and reflected in UI (button changes to red "MUTED") +- Volume displayed as the AVR's display value (raw_byte / 2) + +**Known limitations / things to improve:** +- Packet length detection covers all observed packet types but unknown types + may still be mis-framed +- Input source label mapping is approximate (especially 0x55 "Media Player") +- No reconnect logic if the connection drops +- Sound mode selection not implemented (only displays current mode) +- The UI aesthetic uses Courier as a monospace font — works everywhere but + could use a proper monospace like JetBrains Mono or similar if available + +## Environment +- **Dev machine**: Linux (Ubuntu/Debian), user ckoch on host Shinobu +- **Receiver**: Denon AVR-S540BT +- **Test phone**: Pixel 10 Pro XL (Android 15) +- **Tools used**: adb (34.0.4), Wireshark 4.2.2, Python 3.12 + +## Next Steps +1. Test power on/off and mute on/off against the real receiver +2. Confirm the 0x55 source label (likely "Media Player" or "USB") +3. Do a third btsnoop capture targeting: sound mode switching, tuner, quick + select +4. Add reconnect logic for dropped connections +5. Build out a more complete and polished application diff --git a/custom_components/denon_bt/__init__.py b/custom_components/denon_bt/__init__.py new file mode 100644 index 0000000..b4eb73d --- /dev/null +++ b/custom_components/denon_bt/__init__.py @@ -0,0 +1,25 @@ +"""Denon AVR Bluetooth integration for Home Assistant.""" +from __future__ import annotations + +import logging + +from homeassistant.config_entries import ConfigEntry +from homeassistant.const import Platform +from homeassistant.core import HomeAssistant + +from .const import DOMAIN # noqa: F401 + +_LOGGER = logging.getLogger(__name__) + +PLATFORMS = [Platform.MEDIA_PLAYER] + + +async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: + """Set up Denon BT from a config entry.""" + await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) + return True + + +async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: + """Unload a config entry.""" + return await hass.config_entries.async_unload_platforms(entry, PLATFORMS) diff --git a/custom_components/denon_bt/config_flow.py b/custom_components/denon_bt/config_flow.py new file mode 100644 index 0000000..ac6efe6 --- /dev/null +++ b/custom_components/denon_bt/config_flow.py @@ -0,0 +1,106 @@ +"""Config flow for Denon AVR Bluetooth integration.""" +from __future__ import annotations + +import logging +import re +from typing import Any + +import voluptuous as vol + +from homeassistant import config_entries +from homeassistant.data_entry_flow import FlowResult + +from .const import DOMAIN +from .protocol import find_denon_devices + +_LOGGER = logging.getLogger(__name__) + +_MAC_RE = re.compile(r"^([0-9A-F]{2}:){5}[0-9A-F]{2}$", re.IGNORECASE) + +MANUAL_SCHEMA = vol.Schema( + { + vol.Required("mac"): str, + vol.Optional("name", default="Denon AVR-S540BT"): str, + } +) + + +class DenonBTConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): + """Handle a config flow for Denon AVR Bluetooth.""" + + VERSION = 1 + + def __init__(self) -> None: + self._discovered: list[dict[str, Any]] = [] + + async def async_step_user( + self, user_input: dict[str, Any] | None = None + ) -> FlowResult: + """Entry point — try D-Bus discovery, fall back to manual.""" + self._discovered = await self.hass.async_add_executor_job( + find_denon_devices + ) + if self._discovered: + return await self.async_step_select() + return await self.async_step_manual() + + # ── Device selection ───────────────────────────────────────────────────── + + async def async_step_select( + self, user_input: dict[str, Any] | None = None + ) -> FlowResult: + """Let the user pick a discovered device or go manual.""" + if user_input is not None: + chosen = user_input["device"] + if chosen == "_manual": + return await self.async_step_manual() + + for dev in self._discovered: + if dev["mac"] == chosen: + await self.async_set_unique_id(dev["mac"].upper()) + self._abort_if_unique_id_configured() + return self.async_create_entry( + title=dev["name"], + data={"mac": dev["mac"], "name": dev["name"]}, + ) + + options = { + dev["mac"]: f"{dev['name']} ({dev['mac']})" + for dev in self._discovered + } + options["_manual"] = "Enter MAC address manually\u2026" + + return self.async_show_form( + step_id="select", + data_schema=vol.Schema( + {vol.Required("device"): vol.In(options)} + ), + ) + + # ── Manual MAC entry ───────────────────────────────────────────────────── + + async def async_step_manual( + self, user_input: dict[str, Any] | None = None + ) -> FlowResult: + """Handle manual MAC address entry.""" + errors: dict[str, str] = {} + + if user_input is not None: + mac = user_input["mac"].strip().upper() + name = user_input.get("name", "Denon AVR") + + if not _MAC_RE.match(mac): + errors["base"] = "invalid_mac" + else: + await self.async_set_unique_id(mac) + self._abort_if_unique_id_configured() + return self.async_create_entry( + title=name, + data={"mac": mac, "name": name}, + ) + + return self.async_show_form( + step_id="manual", + data_schema=MANUAL_SCHEMA, + errors=errors, + ) diff --git a/custom_components/denon_bt/const.py b/custom_components/denon_bt/const.py new file mode 100644 index 0000000..96e8877 --- /dev/null +++ b/custom_components/denon_bt/const.py @@ -0,0 +1,37 @@ +"""Constants for the Denon AVR Bluetooth integration.""" +from __future__ import annotations + +from typing import Final + +DOMAIN: Final = "denon_bt" + +# Bluetooth RFCOMM +RFCOMM_CHANNEL: Final = 2 +CONNECT_TIMEOUT: Final = 10 +RECONNECT_INTERVAL: Final = 30 # max seconds between reconnect attempts + +# Volume: raw byte / 2 = display value. 98 raw = display 49 (comfortable max). +VOLUME_PRACTICAL_MAX: Final = 98 + +# ── Protocol commands (Phone → AVR) ───────────────────────────────────────── + +CMD_VOLUME_UP: Final = bytes([0x41, 0x54, 0x07, 0x00, 0x00, 0x00]) +CMD_VOLUME_DOWN: Final = bytes([0x41, 0x54, 0x07, 0x01, 0x00, 0x00]) +CMD_GET_SOURCES: Final = bytes([0x41, 0x54, 0x00, 0x04, 0x00, 0x00]) +CMD_GET_STATUS: Final = bytes([0x41, 0x54, 0x00, 0x06, 0x00, 0x00]) +CMD_MUTE_ON: Final = bytes([0x41, 0x54, 0x07, 0x1D, 0x01, 0x01, 0xFE]) +CMD_MUTE_OFF: Final = bytes([0x41, 0x54, 0x07, 0x1D, 0x01, 0x00, 0xFF]) +CMD_POWER_OFF: Final = bytes([0x41, 0x54, 0x00, 0x0A, 0x01, 0x00, 0xFF]) +CMD_POWER_ON: Final = bytes([0x41, 0x54, 0x00, 0x0A, 0x01, 0x01, 0xFE]) + +# ── Known input source byte codes ─────────────────────────────────────────── + +DEFAULT_SOURCES: Final = { + 0x40: "Bluetooth", + 0x55: "Media Player", + 0x52: "CBL/SAT", + 0x53: "DVD", + 0x54: "Blu-ray", + 0x56: "Game", + 0x57: "AUX", +} diff --git a/custom_components/denon_bt/manifest.json b/custom_components/denon_bt/manifest.json new file mode 100644 index 0000000..a17a4d7 --- /dev/null +++ b/custom_components/denon_bt/manifest.json @@ -0,0 +1,10 @@ +{ + "domain": "denon_bt", + "name": "Denon AVR Bluetooth", + "codeowners": [], + "config_flow": true, + "documentation": "https://github.com/ckoch/denon-bluetooth", + "iot_class": "local_push", + "requirements": [], + "version": "1.0.0" +} diff --git a/custom_components/denon_bt/media_player.py b/custom_components/denon_bt/media_player.py new file mode 100644 index 0000000..7a48490 --- /dev/null +++ b/custom_components/denon_bt/media_player.py @@ -0,0 +1,221 @@ +"""Media player entity for Denon AVR Bluetooth.""" +from __future__ import annotations + +import logging +from typing import Any + +from homeassistant.components.media_player import ( + MediaPlayerDeviceClass, + MediaPlayerEntity, + MediaPlayerEntityFeature, + MediaPlayerState, +) +from homeassistant.config_entries import ConfigEntry +from homeassistant.core import HomeAssistant, callback +from homeassistant.helpers.entity_platform import AddEntitiesCallback +from homeassistant.helpers.event import async_call_later + +from .const import ( + CMD_MUTE_OFF, + CMD_MUTE_ON, + CMD_POWER_OFF, + CMD_POWER_ON, + CMD_VOLUME_DOWN, + CMD_VOLUME_UP, + DOMAIN, + RECONNECT_INTERVAL, + VOLUME_PRACTICAL_MAX, +) +from .protocol import DenonBTClient, DenonState, cmd_select_input + +_LOGGER = logging.getLogger(__name__) + + +async def async_setup_entry( + hass: HomeAssistant, + entry: ConfigEntry, + async_add_entities: AddEntitiesCallback, +) -> None: + """Set up the Denon BT media player from a config entry.""" + mac: str = entry.data["mac"] + name: str = entry.data.get("name", "Denon AVR") + async_add_entities([DenonBTMediaPlayer(hass, entry, mac, name)]) + + +class DenonBTMediaPlayer(MediaPlayerEntity): + """Representation of a Denon AVR-S540BT over Bluetooth RFCOMM.""" + + _attr_device_class = MediaPlayerDeviceClass.RECEIVER + _attr_has_entity_name = True + _attr_name = None + _attr_supported_features = ( + MediaPlayerEntityFeature.VOLUME_STEP + | MediaPlayerEntityFeature.VOLUME_MUTE + | MediaPlayerEntityFeature.TURN_ON + | MediaPlayerEntityFeature.TURN_OFF + | MediaPlayerEntityFeature.SELECT_SOURCE + ) + + def __init__( + self, + hass: HomeAssistant, + entry: ConfigEntry, + mac: str, + name: str, + ) -> None: + self._hass = hass + self._mac = mac + + self._attr_unique_id = mac.replace(":", "").lower() + self._attr_device_info = { + "identifiers": {(DOMAIN, mac)}, + "name": name, + "manufacturer": "Denon", + "model": "AVR-S540BT", + "connections": {("bluetooth", mac)}, + } + + self._client: DenonBTClient | None = None + self._state = DenonState() + self._reconnect_cancel: callback | None = None + self._reconnect_attempts = 0 + + # ── Lifecycle ──────────────────────────────────────────────────────────── + + async def async_added_to_hass(self) -> None: + self._client = DenonBTClient( + self._hass, self._mac, self._handle_state_update + ) + await self._async_try_connect() + + async def async_will_remove_from_hass(self) -> None: + self._cancel_reconnect() + if self._client: + await self._client.async_disconnect() + + # ── State callback from protocol client ────────────────────────────────── + + @callback + def _handle_state_update(self, state: DenonState) -> None: + """Called on the event loop when the protocol client reports a change.""" + self._state = state + self._attr_available = self._client.connected if self._client else False + + if not self._attr_available: + self._schedule_reconnect() + else: + self._reconnect_attempts = 0 + self._cancel_reconnect() + + self.async_write_ha_state() + + # ── Connection / reconnect ─────────────────────────────────────────────── + + async def _async_try_connect(self) -> None: + if self._client and await self._client.async_connect(): + self._attr_available = True + self._reconnect_attempts = 0 + _LOGGER.info("Connected to Denon AVR at %s", self._mac) + else: + self._attr_available = False + self._schedule_reconnect() + _LOGGER.warning( + "Could not connect to Denon AVR at %s", self._mac + ) + + def _schedule_reconnect(self) -> None: + if self._reconnect_cancel is not None: + return + delay = min(RECONNECT_INTERVAL, 5 * (self._reconnect_attempts + 1)) + self._reconnect_cancel = async_call_later( + self._hass, delay, self._async_reconnect + ) + self._reconnect_attempts += 1 + _LOGGER.debug( + "Reconnect to %s in %ds (attempt %d)", + self._mac, + delay, + self._reconnect_attempts, + ) + + def _cancel_reconnect(self) -> None: + if self._reconnect_cancel: + self._reconnect_cancel() + self._reconnect_cancel = None + + async def _async_reconnect(self, _now: Any = None) -> None: + self._reconnect_cancel = None + if self._client: + await self._client.async_disconnect() + await self._async_try_connect() + + # ── State properties ───────────────────────────────────────────────────── + + @property + def state(self) -> MediaPlayerState: + if not self._attr_available: + return MediaPlayerState.OFF + if self._state.power: + return MediaPlayerState.ON + return MediaPlayerState.OFF + + @property + def volume_level(self) -> float | None: + if self._state.volume_raw is None: + return None + return min(1.0, self._state.volume_raw / VOLUME_PRACTICAL_MAX) + + @property + def is_volume_muted(self) -> bool | None: + return self._state.muted + + @property + def source(self) -> str | None: + return self._state.source + + @property + def source_list(self) -> list[str]: + return list(self._state.sources.values()) + + @property + def extra_state_attributes(self) -> dict[str, Any]: + attrs: dict[str, Any] = {} + if self._state.sound_mode: + attrs["sound_mode"] = self._state.sound_mode + if self._state.volume_raw is not None: + attrs["volume_raw"] = self._state.volume_raw + attrs["volume_display"] = self._state.volume_raw / 2.0 + return attrs + + # ── Commands ───────────────────────────────────────────────────────────── + + async def async_turn_on(self) -> None: + if self._client: + if not self._client.connected: + await self._async_try_connect() + await self._client.async_send(CMD_POWER_ON) + + async def async_turn_off(self) -> None: + if self._client: + await self._client.async_send(CMD_POWER_OFF) + + async def async_volume_up(self) -> None: + if self._client: + await self._client.async_send(CMD_VOLUME_UP) + + async def async_volume_down(self) -> None: + if self._client: + await self._client.async_send(CMD_VOLUME_DOWN) + + async def async_mute_volume(self, mute: bool) -> None: + if self._client: + await self._client.async_send(CMD_MUTE_ON if mute else CMD_MUTE_OFF) + + async def async_select_source(self, source: str) -> None: + if not self._client: + return + for code, name in self._state.sources.items(): + if name == source: + await self._client.async_send(cmd_select_input(code)) + return + _LOGGER.warning("Unknown source: %s", source) diff --git a/custom_components/denon_bt/protocol.py b/custom_components/denon_bt/protocol.py new file mode 100644 index 0000000..59c319e --- /dev/null +++ b/custom_components/denon_bt/protocol.py @@ -0,0 +1,359 @@ +"""Denon AVR-S540BT Bluetooth RFCOMM protocol client.""" +from __future__ import annotations + +import logging +import socket +import threading +import time +from collections.abc import Callable +from dataclasses import dataclass, field +from typing import TYPE_CHECKING + +from .const import ( + CMD_GET_SOURCES, + CMD_GET_STATUS, + CONNECT_TIMEOUT, + DEFAULT_SOURCES, + RFCOMM_CHANNEL, +) + +if TYPE_CHECKING: + from homeassistant.core import HomeAssistant + +_LOGGER = logging.getLogger(__name__) + + +@dataclass +class DenonState: + """Current state of the Denon AVR.""" + + power: bool = False + volume_raw: int | None = None + muted: bool = False + source: str | None = None + source_code: int | None = None + sound_mode: str | None = None + sources: dict[int, str] = field(default_factory=lambda: dict(DEFAULT_SOURCES)) + + +def cmd_select_input(source_byte: int) -> bytes: + """Build input selection command.""" + return bytes([0x41, 0x54, 0x00, 0x07, 0x01, source_byte, 0x00]) + + +class DenonBTClient: + """Async-compatible Denon Bluetooth RFCOMM client for Home Assistant.""" + + def __init__( + self, + hass: HomeAssistant, + mac: str, + state_callback: Callable[[DenonState], None], + ) -> None: + self._hass = hass + self._mac = mac + self._state_callback = state_callback + + self._sock: socket.socket | None = None + self._connected = False + self._stop_event = threading.Event() + self._recv_thread: threading.Thread | None = None + self._state = DenonState() + self._lock = threading.Lock() + + @property + def connected(self) -> bool: + return self._connected + + @property + def state(self) -> DenonState: + return self._state + + # ── Connection ─────────────────────────────────────────────────────────── + + async def async_connect(self) -> bool: + """Connect to the AVR (blocking work runs in executor).""" + try: + await self._hass.async_add_executor_job(self._blocking_connect) + return True + except Exception as err: + _LOGGER.error("Connection to %s failed: %s", self._mac, err) + return False + + def _blocking_connect(self) -> None: + self._stop_event.clear() + sock = socket.socket( + socket.AF_BLUETOOTH, + socket.SOCK_STREAM, + socket.BTPROTO_RFCOMM, + ) + sock.settimeout(CONNECT_TIMEOUT) + sock.connect((self._mac, RFCOMM_CHANNEL)) + sock.settimeout(None) + + self._sock = sock + self._connected = True + + self._recv_thread = threading.Thread( + target=self._recv_loop, + daemon=True, + name=f"denon_bt_recv_{self._mac}", + ) + self._recv_thread.start() + + # Give the AVR a moment, then request initial state. + time.sleep(0.3) + self._blocking_send(CMD_GET_SOURCES) + self._blocking_send(CMD_GET_STATUS) + + async def async_disconnect(self) -> None: + await self._hass.async_add_executor_job(self._blocking_disconnect) + + def _blocking_disconnect(self) -> None: + self._stop_event.set() + self._connected = False + if self._sock: + try: + self._sock.close() + except Exception: + pass + self._sock = None + + # ── Send ───────────────────────────────────────────────────────────────── + + async def async_send(self, data: bytes) -> bool: + if not self._connected: + return False + try: + await self._hass.async_add_executor_job(self._blocking_send, data) + return True + except Exception as err: + _LOGGER.error("Send to %s failed: %s", self._mac, err) + await self.async_disconnect() + return False + + def _blocking_send(self, data: bytes) -> None: + if self._sock: + self._sock.sendall(data) + + # ── Receive loop ───────────────────────────────────────────────────────── + + def _recv_loop(self) -> None: + buf = b"" + while not self._stop_event.is_set() and self._sock: + try: + chunk = self._sock.recv(256) + if not chunk: + break + buf += chunk + + while len(buf) >= 4: + idx = buf.find(b"AT") + if idx == -1: + buf = b"" + break + if idx > 0: + buf = buf[idx:] + if len(buf) < 4: + break + + pkt_len = _packet_length(buf) + if pkt_len is None or len(buf) < pkt_len: + break + + pkt = buf[:pkt_len] + buf = buf[pkt_len:] + self._handle_packet(pkt) + + except Exception as err: + _LOGGER.debug("Recv loop ended: %s", err) + break + + self._connected = False + self._notify_state_change() + + # ── Packet handling ────────────────────────────────────────────────────── + + def _handle_packet(self, data: bytes) -> None: + if len(data) < 4: + return + + cat, typ = data[2], data[3] + changed = False + + # Volume report: AT 07 02 03 C5 [vol] 00 [chk] + if cat == 0x07 and typ == 0x02 and len(data) >= 7: + if data[4] == 0x03 and data[5] == 0xC5: + with self._lock: + self._state.volume_raw = data[6] + self._state.power = True + changed = True + + # Mute state: AT 07 1D 01 [state] [chk] + elif cat == 0x07 and typ == 0x1D and len(data) >= 6: + with self._lock: + self._state.muted = data[4] == 0x01 and data[5] == 0x01 + changed = True + + # Sound mode: AT 07 1E 15 00 [20 bytes text] [chk] + elif cat == 0x07 and typ == 0x1E and len(data) >= 6: + text = bytes(b for b in data[5:] if 32 <= b < 127).decode( + "ascii", errors="replace" + ).strip() + if text: + with self._lock: + self._state.sound_mode = text + changed = True + + # Input sources list: AT 00 04 0A 07 ... + elif cat == 0x00 and typ == 0x04 and len(data) >= 8: + discovered: dict[int, str] = {} + for b in data[4:]: + if b in DEFAULT_SOURCES: + discovered[b] = DEFAULT_SOURCES[b] + elif 0x40 <= b <= 0x7F: + discovered[b] = f"Input {b:02X}" + if discovered: + with self._lock: + self._state.sources = discovered + changed = True + + # Current input: AT 00 07 02 [src] 00 [chk] + elif cat == 0x00 and typ == 0x07 and len(data) >= 6 and data[4] == 0x02: + code = data[5] + with self._lock: + self._state.source_code = code + self._state.source = self._state.sources.get( + code, f"Input {code:02X}" + ) + changed = True + + # Power echo: AT 00 0A 01 [state] [chk] + elif cat == 0x00 and typ == 0x0A and len(data) >= 6: + with self._lock: + self._state.power = data[4] == 0x01 and data[5] == 0x01 + changed = True + + # Power ack (off): AT 07 0B 01 00 FF + elif cat == 0x07 and typ == 0x0B and len(data) >= 6: + if data[4] == 0x01 and data[5] == 0x00: + with self._lock: + self._state.power = False + changed = True + + if changed: + self._notify_state_change() + + def _notify_state_change(self) -> None: + if self._hass and self._state_callback: + self._hass.loop.call_soon_threadsafe( + self._state_callback, self._state + ) + + +# ── Packet length table (from denon_remote.py) ────────────────────────────── + + +def _packet_length(buf: bytes) -> int | None: + """Determine expected packet length from the first bytes.""" + if len(buf) < 4: + return None + cat = buf[2] + typ = buf[3] + + if cat == 0x07: + if typ in (0x00, 0x01): + return 6 + if typ == 0x02: + return 9 + if typ == 0x03: + return 6 + if typ == 0x1E: + return 27 + if typ in (0x0B, 0x1D, 0x25): + return 7 + if typ == 0x31: + return 14 + + if cat == 0x00: + if typ == 0x04: + if len(buf) > 4 and buf[4] == 0x00: + return 6 + return 16 + if typ == 0x06: + if len(buf) > 4 and buf[4] == 0x00: + return 6 + return 75 + if typ == 0x07: + if len(buf) > 4 and buf[4] == 0x01: + return 7 + return 8 + if typ == 0x09: + return 7 + if typ == 0x0A: + return 7 + if typ == 0x0B: + return 7 + if typ == 0x0D: + return 21 + if typ == 0x0E: + return 16 + if typ == 0x13: + return 7 + + if cat == 0x0D: + if typ == 0x08: + return 13 + if typ == 0x0D: + return 6 + + if cat == 0x81: + if typ == 0x04: + return 7 + if typ == 0x32: + return 119 + if typ == 0x40: + return 7 + + if cat == 0x82: + if typ == 0x31: + return 14 + + # Fallback: next AT header or 32 bytes max + next_at = buf.find(b"AT", 2) + if next_at > 0: + return next_at + return min(32, len(buf)) + + +# ── D-Bus device discovery ─────────────────────────────────────────────────── + + +def find_denon_devices() -> list[dict[str, str | bool]]: + """Query BlueZ D-Bus for paired devices with 'DENON' in the name.""" + try: + import dbus # noqa: PLC0415 + + bus = dbus.SystemBus() + manager = dbus.Interface( + bus.get_object("org.bluez", "/"), + "org.freedesktop.DBus.ObjectManager", + ) + objects = manager.GetManagedObjects() + results: list[dict[str, str | bool]] = [] + for _path, interfaces in objects.items(): + if "org.bluez.Device1" in interfaces: + dev = interfaces["org.bluez.Device1"] + name = str(dev.get("Name", "")) + if "DENON" in name.upper(): + results.append( + { + "mac": str(dev.get("Address")), + "name": name, + "paired": bool(dev.get("Paired", False)), + } + ) + return results + except Exception as err: + _LOGGER.debug("D-Bus discovery failed: %s", err) + return [] diff --git a/custom_components/denon_bt/translations/en.json b/custom_components/denon_bt/translations/en.json new file mode 100644 index 0000000..c1a1061 --- /dev/null +++ b/custom_components/denon_bt/translations/en.json @@ -0,0 +1,27 @@ +{ + "config": { + "step": { + "select": { + "title": "Select Denon AVR", + "description": "Select a discovered Denon device, or enter a MAC address manually.", + "data": { + "device": "Device" + } + }, + "manual": { + "title": "Manual Configuration", + "description": "Enter the Bluetooth MAC address of your Denon AVR receiver.", + "data": { + "mac": "MAC Address", + "name": "Device Name" + } + } + }, + "error": { + "invalid_mac": "Invalid MAC address. Use the format XX:XX:XX:XX:XX:XX." + }, + "abort": { + "already_configured": "This device is already configured." + } + } +} diff --git a/denon_remote.py b/denon_remote.py new file mode 100644 index 0000000..2fd7e5c --- /dev/null +++ b/denon_remote.py @@ -0,0 +1,858 @@ +#!/usr/bin/env python3 +""" +Denon AVR-S540BT Bluetooth Remote — Prototype +Uses RFCOMM channel 2 (BT SPP 2) as identified from btsnoop capture. +Run on Linux with: python3 denon_remote.py +""" + +import socket +import threading +import tkinter as tk +from tkinter import font as tkfont +import queue +import struct +import time + +try: + import dbus + HAS_DBUS = True +except ImportError: + HAS_DBUS = False + +# ─── Bluetooth device discovery ────────────────────────────────────────────── + +def find_denon_devices(): + """Query BlueZ D-Bus for paired devices with 'DENON' in the name.""" + if not HAS_DBUS: + return [] + try: + bus = dbus.SystemBus() + manager = dbus.Interface( + bus.get_object("org.bluez", "/"), + "org.freedesktop.DBus.ObjectManager" + ) + objects = manager.GetManagedObjects() + results = [] + for path, interfaces in objects.items(): + if "org.bluez.Device1" in interfaces: + dev = interfaces["org.bluez.Device1"] + name = str(dev.get("Name", "")) + if "DENON" in name.upper(): + results.append({ + "mac": str(dev.get("Address")), + "name": name, + "paired": bool(dev.get("Paired", False)), + }) + return results + except Exception: + return [] + +def find_bt_adapter(): + """Return the D-Bus object path of the first BlueZ adapter, or None.""" + if not HAS_DBUS: + return None + try: + bus = dbus.SystemBus() + manager = dbus.Interface( + bus.get_object("org.bluez", "/"), + "org.freedesktop.DBus.ObjectManager" + ) + objects = manager.GetManagedObjects() + for path, interfaces in objects.items(): + if "org.bluez.Adapter1" in interfaces: + return str(path) + except Exception: + pass + return None + +def discover_denon_devices(duration=8): + """Run BT discovery for `duration` seconds; return unpaired Denon devices. + + Returns list of dicts: {mac, name, path} where path is the D-Bus object path. + Must be called from a worker thread (blocks for `duration` seconds). + """ + if not HAS_DBUS: + return [] + adapter_path = find_bt_adapter() + if adapter_path is None: + return [] + try: + bus = dbus.SystemBus() + adapter = dbus.Interface( + bus.get_object("org.bluez", adapter_path), + "org.bluez.Adapter1" + ) + try: + adapter.StartDiscovery() + except dbus.exceptions.DBusException as e: + if "InProgress" not in str(e): + return [] + + time.sleep(duration) + + try: + adapter.StopDiscovery() + except Exception: + pass + + manager = dbus.Interface( + bus.get_object("org.bluez", "/"), + "org.freedesktop.DBus.ObjectManager" + ) + objects = manager.GetManagedObjects() + results = [] + for path, interfaces in objects.items(): + if "org.bluez.Device1" in interfaces: + dev = interfaces["org.bluez.Device1"] + name = str(dev.get("Name", "")) + paired = bool(dev.get("Paired", False)) + if "DENON" in name.upper() and not paired: + results.append({ + "mac": str(dev.get("Address")), + "name": name, + "path": str(path), + }) + return results + except Exception: + return [] + +def pair_and_trust_device(device_path): + """Pair and trust a BlueZ device. Blocks until pairing completes. + + Returns (True, "") on success, (False, error_message) on failure. + """ + if not HAS_DBUS: + return (False, "D-Bus not available") + try: + bus = dbus.SystemBus() + device = dbus.Interface( + bus.get_object("org.bluez", device_path), + "org.bluez.Device1" + ) + device.Pair() + + props = dbus.Interface( + bus.get_object("org.bluez", device_path), + "org.freedesktop.DBus.Properties" + ) + props.Set("org.bluez.Device1", "Trusted", dbus.Boolean(True)) + + return (True, "") + except dbus.exceptions.DBusException as e: + if "AlreadyExists" in str(e): + return (True, "") + return (False, str(e)) + except Exception as e: + return (False, str(e)) + +# ─── Protocol constants ─────────────────────────────────────────────────────── + +RFCOMM_CHANNEL = 2 + +# Commands (PHONE -> AVR) +CMD_VOLUME_UP = bytes([0x41, 0x54, 0x07, 0x00, 0x00, 0x00]) +CMD_VOLUME_DOWN = bytes([0x41, 0x54, 0x07, 0x01, 0x00, 0x00]) +CMD_GET_SOURCES = bytes([0x41, 0x54, 0x00, 0x04, 0x00, 0x00]) +CMD_GET_STATUS = bytes([0x41, 0x54, 0x00, 0x06, 0x00, 0x00]) +CMD_MUTE_ON = bytes([0x41, 0x54, 0x07, 0x1D, 0x01, 0x01, 0xFE]) +CMD_MUTE_OFF = bytes([0x41, 0x54, 0x07, 0x1D, 0x01, 0x00, 0xFF]) +CMD_POWER_OFF = bytes([0x41, 0x54, 0x00, 0x0A, 0x01, 0x00, 0xFF]) +CMD_POWER_ON = bytes([0x41, 0x54, 0x00, 0x0A, 0x01, 0x01, 0xFE]) + +def cmd_select_input(source_byte): + # AT 00 07 01 [source] [checksum_placeholder] + return bytes([0x41, 0x54, 0x00, 0x07, 0x01, source_byte, 0x00]) + +# Known input source codes observed in log (R,S,T,V,W bytes) +# Exact label mapping is approximate — adjust after testing +INPUT_SOURCES = { + 0x40: "Bluetooth", + 0x55: "Media Player", # 'U' — observed in source list, label approximate + 0x52: "CBL/SAT", # 'R' + 0x53: "DVD", # 'S' + 0x54: "Blu-ray", # 'T' + 0x56: "Game", # 'V' + 0x57: "AUX", # 'W' +} + +# ─── Bluetooth connection ───────────────────────────────────────────────────── + +class DenonBT: + def __init__(self, on_message, on_status): + self.sock = None + self.connected = False + self.on_message = on_message # callback(bytes) + self.on_status = on_status # callback(str) + self._recv_thread = None + self._stop = threading.Event() + + def connect(self, mac): + self._stop.clear() + try: + self.on_status("Connecting…") + s = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) + s.settimeout(10) + s.connect((mac, RFCOMM_CHANNEL)) + s.settimeout(None) + self.sock = s + self.connected = True + self.on_status(f"Connected to {mac}") + self._recv_thread = threading.Thread(target=self._recv_loop, daemon=True) + self._recv_thread.start() + # Request initial state + time.sleep(0.3) + self.send(CMD_GET_SOURCES) + self.send(CMD_GET_STATUS) + except Exception as e: + self.connected = False + self.on_status(f"Connection failed: {e}") + + def disconnect(self): + self._stop.set() + self.connected = False + if self.sock: + try: + self.sock.close() + except Exception: + pass + self.sock = None + self.on_status("Disconnected") + + def send(self, data: bytes): + if not self.connected or not self.sock: + self.on_status("Not connected") + return + try: + self.sock.sendall(data) + self.on_message(data, direction="SENT") + except Exception as e: + self.on_status(f"Send error: {e}") + self.disconnect() + + def _recv_loop(self): + buf = b"" + while not self._stop.is_set() and self.sock: + try: + chunk = self.sock.recv(256) + if not chunk: + break + buf += chunk + # Process all complete AT packets in buffer + while len(buf) >= 4: + idx = buf.find(b'AT') + if idx == -1: + buf = b"" + break + if idx > 0: + buf = buf[idx:] + # Minimum packet is 4 bytes: AT + 2 command bytes + # Use length byte at offset 3 if available, else wait for more + if len(buf) < 4: + break + pkt_len = self._packet_length(buf) + if pkt_len is None or len(buf) < pkt_len: + break + pkt = buf[:pkt_len] + buf = buf[pkt_len:] + self.on_message(pkt, direction="RECV") + except Exception: + break + self.on_status("Disconnected") + + def _packet_length(self, buf): + """Best-effort length detection for AT protocol packets.""" + if len(buf) < 4: + return None + cat = buf[2] + typ = buf[3] + # Known fixed-length packets + if cat == 0x07: + if typ in (0x00, 0x01): return 6 # vol up/down command + if typ == 0x02: return 9 # vol report + if typ == 0x03: return 6 # unknown init cmd + if typ == 0x1e: return 27 # sound mode (AT + cmd + 0x15 + 0x00 + 20 text + chk) + if typ in (0x0b, 0x1d, 0x25): return 7 + if typ == 0x31: return 14 # firmware/version string + if cat == 0x00: + if typ == 0x04: + if len(buf) > 4 and buf[4] == 0x00: + return 6 # request + return 16 # response with sources + if typ == 0x06: + if len(buf) > 4 and buf[4] == 0x00: + return 6 # request + return 75 # status response + if typ == 0x07: + if len(buf) > 4 and buf[4] == 0x01: + return 7 # input select command + return 8 # input report + if typ == 0x09: return 7 + if typ == 0x0a: return 7 + if typ == 0x0b: return 7 + if typ == 0x0d: return 21 # device info with MAC + if typ == 0x0e: return 16 # device name + if typ == 0x13: return 7 + if cat == 0x0d: + if typ == 0x08: return 13 + if typ == 0x0d: return 6 + if cat == 0x81: + if typ == 0x04: return 7 + if typ == 0x32: return 119 # large volume table + if typ == 0x40: return 7 + if cat == 0x82: + if typ == 0x31: return 14 + # Fallback: read until next AT or 32 bytes + next_at = buf.find(b'AT', 2) + if next_at > 0: + return next_at + return min(32, len(buf)) + + +# ─── GUI ───────────────────────────────────────────────────────────────────── + +BG = "#0d0d0d" +PANEL = "#161616" +BORDER = "#2a2a2a" +ACCENT = "#e8c97a" # warm gold +ACCENT2 = "#4fc3f7" # cool blue +TEXT = "#f0ede6" +MUTED = "#666" +RED = "#e05252" +GREEN = "#4caf50" + +class DenonRemoteApp(tk.Tk): + def __init__(self): + super().__init__() + self.title("Denon AVR-S540BT Remote") + self.configure(bg=BG) + self.resizable(False, False) + + self._msg_queue = queue.Queue() + self._vol = tk.StringVar(value="—") + self._vol_raw = None # raw volume byte (0x00-0xFF) + self._muted = False + self._sound_mode = tk.StringVar(value="—") + self._status = tk.StringVar(value="Disconnected") + self._sources = {} # byte -> label + self._cur_input = None + self._devices = [] # list of dicts from find_denon_devices() + self._selected_device = None # index into _devices + self._unpaired_devices = [] # list of dicts from discover_denon_devices() + self._discovering = False + self._pairing = False + + self.bt = DenonBT( + on_message=self._on_bt_message, + on_status=self._on_bt_status, + ) + + self._build_ui() + self._poll_queue() + self._scan_devices() + + # ── UI construction ─────────────────────────────────────────────────────── + + def _build_ui(self): + self._fonts() + + # ── Header ──────────────────────────────────────────────────────────── + hdr = tk.Frame(self, bg=BG, pady=18) + hdr.pack(fill="x", padx=24) + + tk.Label(hdr, text="DENON", font=self.f_brand, + bg=BG, fg=ACCENT).pack(side="left") + tk.Label(hdr, text=" AVR-S540BT", font=self.f_model, + bg=BG, fg=MUTED).pack(side="left", padx=(2, 0)) + + # Status dot + self._status_dot = tk.Label(hdr, text="●", font=self.f_dot, + bg=BG, fg=RED) + self._status_dot.pack(side="right") + tk.Label(hdr, textvariable=self._status, font=self.f_small, + bg=BG, fg=MUTED).pack(side="right", padx=(0, 6)) + + self._divider() + + # ── Connection panel ────────────────────────────────────────────────── + conn = tk.Frame(self, bg=PANEL, padx=20, pady=16) + conn.pack(fill="x", padx=16, pady=(12, 0)) + + tk.Label(conn, text="DEVICE", font=self.f_label, + bg=PANEL, fg=MUTED).grid(row=0, column=0, sticky="w", + columnspan=3) + + self._device_frame = tk.Frame(conn, bg=PANEL) + self._device_frame.grid(row=1, column=0, pady=(4, 0), sticky="w") + + self._device_label = tk.Label( + self._device_frame, text="Scanning…", font=self.f_mono, + bg=PANEL, fg=MUTED, anchor="w" + ) + self._device_label.pack(side="left") + + btn_row = tk.Frame(conn, bg=PANEL) + btn_row.grid(row=0, column=2, sticky="e") + + self._scan_btn = self._btn( + btn_row, "SCAN", self._scan_devices, + bg=PANEL, fg=MUTED, font=self.f_small + ) + self._scan_btn.pack(side="left", padx=(0, 4)) + + self._scan_pair_btn = self._btn( + btn_row, "SCAN & PAIR", self._start_discovery, + bg=PANEL, fg=ACCENT2, font=self.f_small + ) + self._scan_pair_btn.pack(side="left") + + self._conn_btn = self._btn( + conn, "CONNECT", self._toggle_connect, + bg=ACCENT, fg=BG, font=self.f_btn + ) + self._conn_btn.grid(row=1, column=2, padx=(10, 0), pady=(4, 0), + ipady=6, ipadx=14) + + self._unpaired_frame = tk.Frame(conn, bg=PANEL) + self._unpaired_frame.grid(row=2, column=0, columnspan=3, + pady=(8, 0), sticky="w") + + self._divider() + + # ── Power panel ────────────────────────────────────────────────── + pwr_frame = tk.Frame(self, bg=BG, pady=6) + pwr_frame.pack(fill="x", padx=24) + + tk.Label(pwr_frame, text="POWER", font=self.f_label, + bg=BG, fg=MUTED).pack(side="left") + + self._btn( + pwr_frame, "OFF", lambda: self.bt.send(CMD_POWER_OFF), + bg=PANEL, fg=RED, font=self.f_btn, + ).pack(side="right", ipady=5, ipadx=14) + + self._btn( + pwr_frame, "ON", lambda: self.bt.send(CMD_POWER_ON), + bg=PANEL, fg=GREEN, font=self.f_btn, + ).pack(side="right", padx=(0, 6), ipady=5, ipadx=14) + + self._divider() + + # ── Volume panel ────────────────────────────────────────────────────── + vol_frame = tk.Frame(self, bg=BG, pady=8) + vol_frame.pack(fill="x", padx=24) + + tk.Label(vol_frame, text="VOLUME", font=self.f_label, + bg=BG, fg=MUTED).pack(anchor="w") + + vol_ctrl = tk.Frame(vol_frame, bg=BG) + vol_ctrl.pack(fill="x", pady=(8, 0)) + + self._vol_down_btn = self._btn( + vol_ctrl, "▼", lambda: self.bt.send(CMD_VOLUME_DOWN), + bg=PANEL, fg=TEXT, font=self.f_arrow, + width=4, activebackground=BORDER + ) + self._vol_down_btn.pack(side="left", ipady=12) + + tk.Label(vol_ctrl, textvariable=self._vol, font=self.f_vol, + bg=BG, fg=ACCENT, width=6).pack(side="left") + + self._vol_up_btn = self._btn( + vol_ctrl, "▲", lambda: self.bt.send(CMD_VOLUME_UP), + bg=PANEL, fg=TEXT, font=self.f_arrow, + width=4, activebackground=BORDER + ) + self._vol_up_btn.pack(side="left", ipady=12) + + self._mute_btn = self._btn( + vol_ctrl, "MUTE", self._toggle_mute, + bg=PANEL, fg=MUTED, font=self.f_btn, + activebackground=RED + ) + self._mute_btn.pack(side="right", ipady=6, ipadx=14) + + self._divider() + + # ── Sound mode ──────────────────────────────────────────────────────── + sm = tk.Frame(self, bg=BG, pady=6) + sm.pack(fill="x", padx=24) + + tk.Label(sm, text="SOUND MODE", font=self.f_label, + bg=BG, fg=MUTED).pack(side="left") + tk.Label(sm, textvariable=self._sound_mode, font=self.f_value, + bg=BG, fg=ACCENT2).pack(side="right") + + self._divider() + + # ── Input sources ───────────────────────────────────────────────────── + inp = tk.Frame(self, bg=BG, pady=8) + inp.pack(fill="x", padx=24) + + tk.Label(inp, text="INPUT", font=self.f_label, + bg=BG, fg=MUTED).pack(anchor="w") + + self._input_frame = tk.Frame(inp, bg=BG) + self._input_frame.pack(fill="x", pady=(8, 0)) + + # Populated dynamically when sources are received + self._input_buttons = {} + self._render_inputs(INPUT_SOURCES) + + self._divider() + + # ── Log ─────────────────────────────────────────────────────────────── + log_hdr = tk.Frame(self, bg=BG) + log_hdr.pack(fill="x", padx=24, pady=(4, 0)) + tk.Label(log_hdr, text="PACKET LOG", font=self.f_label, + bg=BG, fg=MUTED).pack(side="left") + self._btn(log_hdr, "CLEAR", self._clear_log, + bg=BG, fg=MUTED, font=self.f_small).pack(side="right") + + self._log = tk.Text( + self, height=8, font=self.f_mono, + bg="#0a0a0a", fg="#3a7a3a", insertbackground=ACCENT, + relief="flat", bd=0, state="disabled", + highlightthickness=1, highlightbackground=BORDER, + padx=8, pady=6 + ) + self._log.pack(fill="x", padx=16, pady=(4, 16)) + self._log.tag_config("sent", foreground="#4fc3f7") + self._log.tag_config("recv", foreground="#81c784") + self._log.tag_config("info", foreground=MUTED) + + def _fonts(self): + self.f_brand = tkfont.Font(family="Courier", size=18, weight="bold") + self.f_model = tkfont.Font(family="Courier", size=12) + self.f_label = tkfont.Font(family="Courier", size=8, weight="bold") + self.f_value = tkfont.Font(family="Courier", size=12) + self.f_vol = tkfont.Font(family="Courier", size=32, weight="bold") + self.f_btn = tkfont.Font(family="Courier", size=9, weight="bold") + self.f_arrow = tkfont.Font(family="Courier", size=14, weight="bold") + self.f_mono = tkfont.Font(family="Courier", size=9) + self.f_small = tkfont.Font(family="Courier", size=8) + self.f_dot = tkfont.Font(family="Courier", size=10) + + def _btn(self, parent, text, cmd, **kw): + defaults = dict( + text=text, command=cmd, + bg=PANEL, fg=TEXT, + font=self.f_btn, + relief="flat", bd=0, + cursor="hand2", + activeforeground=ACCENT, + activebackground=BORDER, + ) + defaults.update(kw) + return tk.Button(parent, **defaults) + + def _divider(self): + tk.Frame(self, bg=BORDER, height=1).pack( + fill="x", padx=16, pady=6) + + def _render_inputs(self, sources): + for w in self._input_frame.winfo_children(): + w.destroy() + self._input_buttons = {} + for code, label in sources.items(): + btn = self._btn( + self._input_frame, label, + lambda c=code: self._select_input(c), + bg=PANEL, fg=MUTED, font=self.f_btn, + ) + btn.pack(side="left", padx=(0, 6), ipady=5, ipadx=8) + self._input_buttons[code] = btn + if not sources: + tk.Label(self._input_frame, text="No sources discovered yet", + font=self.f_small, bg=BG, fg=MUTED).pack(side="left") + + def _toggle_mute(self): + if self._muted: + self.bt.send(CMD_MUTE_OFF) + else: + self.bt.send(CMD_MUTE_ON) + + def _update_mute_ui(self): + if self._muted: + self._mute_btn.configure(bg=RED, fg=TEXT, text="MUTED") + else: + self._mute_btn.configure(bg=PANEL, fg=MUTED, text="MUTE") + + def _select_input(self, code): + self.bt.send(cmd_select_input(code)) + self._highlight_input(code) + + def _highlight_input(self, code): + self._cur_input = code + for c, btn in self._input_buttons.items(): + if c == code: + btn.configure(fg=ACCENT, bg="#252010", + highlightthickness=1, + highlightbackground=ACCENT, + highlightcolor=ACCENT) + else: + btn.configure(fg=MUTED, bg=PANEL, + highlightthickness=0) + + def _clear_log(self): + self._log.configure(state="normal") + self._log.delete("1.0", "end") + self._log.configure(state="disabled") + + # ── Device discovery ───────────────────────────────────────────────────── + + def _scan_devices(self): + self._devices = find_denon_devices() + self._selected_device = 0 if self._devices else None + self._render_device_list() + + def _render_device_list(self): + for w in self._device_frame.winfo_children(): + w.destroy() + + if not HAS_DBUS: + tk.Label(self._device_frame, + text="Install python3-dbus for auto-detection", + font=self.f_small, bg=PANEL, fg=RED).pack(side="left") + return + + if not self._devices: + tk.Label(self._device_frame, + text="No paired Denon device — use SCAN & PAIR", + font=self.f_small, bg=PANEL, fg=MUTED).pack(side="left") + return + + if len(self._devices) == 1: + dev = self._devices[0] + tk.Label(self._device_frame, + text=f"{dev['name']} {dev['mac']}", + font=self.f_mono, bg=PANEL, fg=TEXT).pack(side="left") + else: + for i, dev in enumerate(self._devices): + is_sel = (i == self._selected_device) + btn = self._btn( + self._device_frame, + f"{dev['name']}", + lambda idx=i: self._pick_device(idx), + bg="#252010" if is_sel else PANEL, + fg=ACCENT if is_sel else MUTED, + font=self.f_btn, + ) + btn.pack(side="left", padx=(0, 6), ipady=3, ipadx=6) + + def _pick_device(self, idx): + self._selected_device = idx + self._render_device_list() + + # ── Bluetooth discovery & pairing ───────────────────────────────────────── + + def _start_discovery(self): + if not HAS_DBUS: + self._log_append("Install python3-dbus for Bluetooth scanning", "info") + return + if self._discovering: + return + self._discovering = True + self._unpaired_devices = [] + self._scan_pair_btn.configure(state="disabled", text="SCANNING…") + self._render_unpaired_list() + self._log_append("Starting Bluetooth discovery…", "info") + threading.Thread(target=self._discovery_worker, daemon=True).start() + + def _discovery_worker(self): + devices = discover_denon_devices(duration=8) + self._msg_queue.put(("discovery_done", devices)) + + def _handle_discovery_done(self, devices): + self._discovering = False + self._unpaired_devices = devices + self._scan_pair_btn.configure(state="normal", text="SCAN & PAIR") + self._render_unpaired_list() + count = len(devices) + if count == 0: + self._log_append("Discovery complete — no unpaired Denon devices found", "info") + else: + self._log_append(f"Discovery complete — found {count} unpaired Denon device(s)", "info") + + def _render_unpaired_list(self): + for w in self._unpaired_frame.winfo_children(): + w.destroy() + + if self._discovering: + tk.Label(self._unpaired_frame, + text="Scanning for Denon devices…", + font=self.f_small, bg=PANEL, fg=ACCENT2).pack(anchor="w") + return + + if not self._unpaired_devices: + return + + tk.Label(self._unpaired_frame, + text="UNPAIRED DEVICES", font=self.f_label, + bg=PANEL, fg=MUTED).pack(anchor="w", pady=(4, 2)) + + for dev in self._unpaired_devices: + row = tk.Frame(self._unpaired_frame, bg=PANEL) + row.pack(fill="x", pady=1) + + tk.Label(row, text=f"{dev['name']} {dev['mac']}", + font=self.f_small, bg=PANEL, fg=MUTED).pack(side="left") + + self._btn( + row, "PAIR", + lambda d=dev: self._start_pair(d), + bg=ACCENT2, fg=BG, font=self.f_small, + ).pack(side="right", ipadx=8, ipady=2) + + def _start_pair(self, device): + if self._pairing: + return + self._pairing = True + self._log_append(f"Pairing with {device['name']} ({device['mac']})…", "info") + for w in self._unpaired_frame.winfo_children(): + for child in w.winfo_children(): + if isinstance(child, tk.Button): + child.configure(state="disabled") + threading.Thread( + target=self._pair_worker, args=(device,), daemon=True + ).start() + + def _pair_worker(self, device): + success, error = pair_and_trust_device(device["path"]) + self._msg_queue.put(("pair_done", device, success, error)) + + def _handle_pair_done(self, device, success, error): + self._pairing = False + if success: + self._log_append(f"Paired and trusted: {device['name']}", "info") + self._unpaired_devices = [ + d for d in self._unpaired_devices if d["mac"] != device["mac"] + ] + self._render_unpaired_list() + self._scan_devices() + else: + self._log_append(f"Pairing failed: {error}", "info") + self._render_unpaired_list() + + # ── Connection ──────────────────────────────────────────────────────────── + + def _toggle_connect(self): + if self.bt.connected: + self.bt.disconnect() + self._conn_btn.configure(text="CONNECT", bg=ACCENT, fg=BG) + else: + if self._selected_device is None or not self._devices: + self._log_append("No paired Denon device — use SCAN & PAIR", "info") + return + mac = self._devices[self._selected_device]["mac"] + self._conn_btn.configure(text="DISCONNECT", bg=RED, fg=TEXT) + threading.Thread( + target=self.bt.connect, args=(mac,), daemon=True + ).start() + + # ── Message handling ────────────────────────────────────────────────────── + + def _on_bt_message(self, data: bytes, direction: str): + self._msg_queue.put(("msg", data, direction)) + + def _on_bt_status(self, msg: str): + self._msg_queue.put(("status", msg)) + + def _poll_queue(self): + try: + while True: + item = self._msg_queue.get_nowait() + if item[0] == "status": + self._handle_status(item[1]) + elif item[0] == "msg": + self._handle_message(item[1], item[2]) + elif item[0] == "discovery_done": + self._handle_discovery_done(item[1]) + elif item[0] == "pair_done": + self._handle_pair_done(item[1], item[2], item[3]) + except queue.Empty: + pass + self.after(50, self._poll_queue) + + def _handle_status(self, msg: str): + self._status.set(msg) + connected = msg.startswith("Connected") + self._status_dot.configure(fg=GREEN if connected else RED) + if not connected: + self._conn_btn.configure(text="CONNECT", bg=ACCENT, fg=BG) + + def _handle_message(self, data: bytes, direction: str): + hex_str = " ".join(f"{b:02x}" for b in data) + tag = "sent" if direction == "SENT" else "recv" + arrow = "→" if direction == "SENT" else "←" + self._log_append(f"{arrow} {hex_str}", tag) + + if len(data) < 3: + return + + cat = data[2] + typ = data[3] if len(data) > 3 else None + + # Volume report: AT 07 02 03 c5 [vol] ... + if cat == 0x07 and typ == 0x02 and len(data) >= 7: + if data[4] == 0x03 and data[5] == 0xc5: + vol = data[6] + self._vol_raw = vol + # Display as dB: raw/2 gives the display value shown on the AVR + vol_db = vol / 2.0 + self._vol.set(f"{vol_db:g}") + return + + # Mute state: AT 07 1d 01 [state] ... + if cat == 0x07 and typ == 0x1d and len(data) >= 6: + self._muted = (data[4] == 0x01 and data[5] == 0x01) + self._update_mute_ui() + return + + # Sound mode: AT 07 1e ... + if cat == 0x07 and typ == 0x1e and len(data) >= 6: + text_bytes = data[5:] + text = bytes(b for b in text_bytes if 32 <= b < 127).decode("ascii", errors="replace").strip() + if text: + self._sound_mode.set(text) + return + + # Input sources: AT 00 04 0a ... + if cat == 0x00 and typ == 0x04 and direction == "RECV" and len(data) >= 8: + raw = data[4:] + discovered = {} + for b in raw: + if b in INPUT_SOURCES: + discovered[b] = INPUT_SOURCES[b] + elif 0x40 <= b <= 0x7f: + discovered[b] = f"SRC-{b:02x}" + if discovered: + self._sources = discovered + self._render_inputs(discovered) + return + + # Current input: AT 00 07 02 [source] ... + if cat == 0x00 and typ == 0x07 and direction == "RECV" and len(data) >= 5: + if data[4] == 0x02 and len(data) >= 6: + self._highlight_input(data[5]) + return + + # Device name: AT 00 0e ... + if cat == 0x00 and typ == 0x0e and direction == "RECV" and len(data) >= 6: + name_bytes = data[5:] + name = bytes(b for b in name_bytes if 32 <= b < 127).decode("ascii", errors="replace").strip() + if name: + self._log_append(f" Device: {name}", "info") + return + + def _log_append(self, text: str, tag: str = ""): + self._log.configure(state="normal") + self._log.insert("end", text + "\n", tag) + self._log.see("end") + self._log.configure(state="disabled") + + +# ─── Entry point ───────────────────────────────────────────────────────────── + +if __name__ == "__main__": + app = DenonRemoteApp() + app.mainloop() \ No newline at end of file