Initial commit

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
ckoch
2026-06-06 10:59:47 -04:00
commit a5d2865eb2
10 changed files with 1800 additions and 0 deletions
+15
View File
@@ -0,0 +1,15 @@
# Python
__pycache__/
*.py[cod]
*.egg-info/
.venv/
venv/
env/
.env
# OS / editor
.DS_Store
*.swp
*~
.idea/
.vscode/
+142
View File
@@ -0,0 +1,142 @@
# Denon AVR-S540BT Bluetooth Remote — Project Context
## What We're Building
A custom Python application to control the Denon AVR-S540BT AV receiver over
Bluetooth, replicating the functionality of the official "Denon 500 Series Remote"
Android app without relying on it.
## How We Got Here
### Discovery
The AVR-S540BT has no network (no Ethernet, no WiFi). The only remote control
interface is Bluetooth, used by Denon's own "Denon 500 Series Remote" app.
Denon does not publish the Bluetooth control protocol.
### Reverse Engineering Method
1. Enabled Bluetooth HCI snoop log on a Pixel 10 Pro XL (Android Developer
Options → Enable Bluetooth HCI snoop log)
2. Used the official Denon app to perform actions: connect, volume up/down,
input selection, etc.
3. Extracted the log via `adb bugreport` (direct `adb pull` is blocked on
Android 15) and unzipped the btsnoop_hci.log from inside the zip
4. Parsed the binary BTSnoop v1 file in Python (no external dependencies)
and identified ACL data packets containing the string "AT" (0x41 0x54)
5. Decoded the packet structure
### Protocol Details
- **Transport**: Bluetooth Classic, RFCOMM (Serial Port Profile)
- **Channel**: 2 (service name "BT SPP 2", confirmed via SDP in the log)
- **Framing**: Each packet starts with bytes `41 54` ("AT"), followed by
binary command bytes. The trailing byte in raw HCI is an RFCOMM FCS
(Frame Check Sequence) added by the kernel — NOT part of the application
payload. When using `socket.AF_BLUETOOTH / BTPROTO_RFCOMM`, the kernel
handles framing automatically; you just read/write raw application data.
### Decoded Commands
#### Phone → AVR (commands)
| Action | Bytes (hex) | Notes |
|------------------|------------------------------|-----------------------------|
| Volume Up | `41 54 07 00 00 00` | |
| Volume Down | `41 54 07 01 00 00` | |
| Mute On | `41 54 07 1D 01 01 FE` | Discrete on |
| Mute Off | `41 54 07 1D 01 00 FF` | Discrete off |
| Power Off | `41 54 00 0A 01 00 FF` | |
| Power On | `41 54 00 0A 01 01 FE` | |
| Get Input Sources| `41 54 00 04 00 00` | |
| Get Status | `41 54 00 06 00 00` | |
| Select Input | `41 54 00 07 01 [src] [chk]` | chk = 0xFF XOR src |
| Mute Toggle* | `41 54 07 25 01 20 DF` | Used in handshake, not user |
#### AVR → Phone (responses)
| Response | Bytes (hex) | Notes |
|-------------------|------------------------------------------------------|------------------------------------------|
| Volume Report | `41 54 07 02 03 C5 [vol] 00 [chk]` | vol/2 = display value, chk = 0xFF-vol+1 |
| Mute State | `41 54 07 1D 01 [state] [chk]` | state: 01=muted, 00=unmuted |
| Sound Mode | `41 54 07 1E 15 00 [20 bytes text] [chk]` | ASCII, space-padded, 27 bytes total |
| Power Ack | `41 54 07 0B 01 00 FF` | Sent by AVR on power off |
| Input Sources | `41 54 00 04 0A 07 40 81 55 82 52 53 54 56 57 B1` | 0x81/0x82 are category separators |
| Current Input | `41 54 00 07 02 [src] 00 [chk]` | |
| Power Echo | `41 54 00 0A 01 [state] [chk]` | AVR echoes power state back |
| Status Keepalive | `41 54 00 0B 01 07 F8` | Sent by AVR after power on |
| Device Name | `41 54 00 0E 0A 00 [name bytes] [chk]` | Returned " DENON AV", 16 bytes total |
| Status Response | `41 54 00 06 45 ... [75 bytes]` | Full status dump |
#### Known Input Source Byte Codes
These were observed in the log. Exact label↔code mapping is approximate and
should be confirmed by testing:
| Byte | Label |
|------|-----------|
| 0x40 | Bluetooth |
| 0x55 | Media Player | ('U' — label approximate, needs confirmation)
| 0x52 | CBL/SAT |
| 0x53 | DVD |
| 0x54 | Blu-ray |
| 0x56 | Game |
| 0x57 | AUX |
### Second Capture (Session 2)
Actions performed: volume 20.5→21, mute on, mute off, volume down, mute on,
mute off, power off, power on, power off, power on.
**New findings:**
- **Mute** is discrete on/off (07 1D), not a toggle. The 07 25 command seen
in the first capture is part of the connection handshake, not user mute.
- **Power** uses 00 0A with 01 00 = off, 01 01 = on.
- After power on, the AVR sends a status keepalive (00 0B 01 07) and the
Denon app replays the mute-toggle handshake (07 25) before resuming.
- **Volume display mapping**: raw byte / 2 = display value (e.g., 0x29=41
→ display "20.5", 0x2a=42 → display "21").
- Source byte **0x55** ("Media Player"?) discovered in sources list.
- **0x81 / 0x82** in the sources response are category separators, not sources.
### Commands Not Yet Captured
The following still need to be reverse engineered:
- Individual sound mode selection (only the status report was seen)
- Tuner controls (band, tune up/down, preset)
- Quick Select buttons
## Current Codebase
### denon_remote.py
A Python 3 tkinter application (stdlib + optional python3-dbus for device
auto-detection).
**Key design decisions:**
- Uses `socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM`
(Linux built-in, no PyBluez needed)
- Receive loop runs in a background daemon thread
- All UI updates routed through a `queue.Queue` polled with `after(50,...)`
to keep tkinter thread-safe
- Packet length detection is best-effort heuristic based on known packet types;
unknown packets fall back to reading until the next `AT` header or 32 bytes
- Input source buttons are rendered dynamically from the AVR's own source list
response rather than being hard-coded
- Auto-detects paired Denon devices via BlueZ D-Bus (falls back gracefully
if python3-dbus is unavailable)
- Power on/off and discrete mute on/off commands implemented
- Mute state tracked and reflected in UI (button changes to red "MUTED")
- Volume displayed as the AVR's display value (raw_byte / 2)
**Known limitations / things to improve:**
- Packet length detection covers all observed packet types but unknown types
may still be mis-framed
- Input source label mapping is approximate (especially 0x55 "Media Player")
- No reconnect logic if the connection drops
- Sound mode selection not implemented (only displays current mode)
- The UI aesthetic uses Courier as a monospace font — works everywhere but
could use a proper monospace like JetBrains Mono or similar if available
## Environment
- **Dev machine**: Linux (Ubuntu/Debian), user ckoch on host Shinobu
- **Receiver**: Denon AVR-S540BT
- **Test phone**: Pixel 10 Pro XL (Android 15)
- **Tools used**: adb (34.0.4), Wireshark 4.2.2, Python 3.12
## Next Steps
1. Test power on/off and mute on/off against the real receiver
2. Confirm the 0x55 source label (likely "Media Player" or "USB")
3. Do a third btsnoop capture targeting: sound mode switching, tuner, quick
select
4. Add reconnect logic for dropped connections
5. Build out a more complete and polished application
+25
View File
@@ -0,0 +1,25 @@
"""Denon AVR Bluetooth integration for Home Assistant."""
from __future__ import annotations
import logging
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant
from .const import DOMAIN # noqa: F401
_LOGGER = logging.getLogger(__name__)
PLATFORMS = [Platform.MEDIA_PLAYER]
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up Denon BT from a config entry."""
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
+106
View File
@@ -0,0 +1,106 @@
"""Config flow for Denon AVR Bluetooth integration."""
from __future__ import annotations
import logging
import re
from typing import Any
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.data_entry_flow import FlowResult
from .const import DOMAIN
from .protocol import find_denon_devices
_LOGGER = logging.getLogger(__name__)
_MAC_RE = re.compile(r"^([0-9A-F]{2}:){5}[0-9A-F]{2}$", re.IGNORECASE)
MANUAL_SCHEMA = vol.Schema(
{
vol.Required("mac"): str,
vol.Optional("name", default="Denon AVR-S540BT"): str,
}
)
class DenonBTConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Denon AVR Bluetooth."""
VERSION = 1
def __init__(self) -> None:
self._discovered: list[dict[str, Any]] = []
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Entry point — try D-Bus discovery, fall back to manual."""
self._discovered = await self.hass.async_add_executor_job(
find_denon_devices
)
if self._discovered:
return await self.async_step_select()
return await self.async_step_manual()
# ── Device selection ─────────────────────────────────────────────────────
async def async_step_select(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Let the user pick a discovered device or go manual."""
if user_input is not None:
chosen = user_input["device"]
if chosen == "_manual":
return await self.async_step_manual()
for dev in self._discovered:
if dev["mac"] == chosen:
await self.async_set_unique_id(dev["mac"].upper())
self._abort_if_unique_id_configured()
return self.async_create_entry(
title=dev["name"],
data={"mac": dev["mac"], "name": dev["name"]},
)
options = {
dev["mac"]: f"{dev['name']} ({dev['mac']})"
for dev in self._discovered
}
options["_manual"] = "Enter MAC address manually\u2026"
return self.async_show_form(
step_id="select",
data_schema=vol.Schema(
{vol.Required("device"): vol.In(options)}
),
)
# ── Manual MAC entry ─────────────────────────────────────────────────────
async def async_step_manual(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle manual MAC address entry."""
errors: dict[str, str] = {}
if user_input is not None:
mac = user_input["mac"].strip().upper()
name = user_input.get("name", "Denon AVR")
if not _MAC_RE.match(mac):
errors["base"] = "invalid_mac"
else:
await self.async_set_unique_id(mac)
self._abort_if_unique_id_configured()
return self.async_create_entry(
title=name,
data={"mac": mac, "name": name},
)
return self.async_show_form(
step_id="manual",
data_schema=MANUAL_SCHEMA,
errors=errors,
)
+37
View File
@@ -0,0 +1,37 @@
"""Constants for the Denon AVR Bluetooth integration."""
from __future__ import annotations
from typing import Final
DOMAIN: Final = "denon_bt"
# Bluetooth RFCOMM
RFCOMM_CHANNEL: Final = 2
CONNECT_TIMEOUT: Final = 10
RECONNECT_INTERVAL: Final = 30 # max seconds between reconnect attempts
# Volume: raw byte / 2 = display value. 98 raw = display 49 (comfortable max).
VOLUME_PRACTICAL_MAX: Final = 98
# ── Protocol commands (Phone → AVR) ─────────────────────────────────────────
CMD_VOLUME_UP: Final = bytes([0x41, 0x54, 0x07, 0x00, 0x00, 0x00])
CMD_VOLUME_DOWN: Final = bytes([0x41, 0x54, 0x07, 0x01, 0x00, 0x00])
CMD_GET_SOURCES: Final = bytes([0x41, 0x54, 0x00, 0x04, 0x00, 0x00])
CMD_GET_STATUS: Final = bytes([0x41, 0x54, 0x00, 0x06, 0x00, 0x00])
CMD_MUTE_ON: Final = bytes([0x41, 0x54, 0x07, 0x1D, 0x01, 0x01, 0xFE])
CMD_MUTE_OFF: Final = bytes([0x41, 0x54, 0x07, 0x1D, 0x01, 0x00, 0xFF])
CMD_POWER_OFF: Final = bytes([0x41, 0x54, 0x00, 0x0A, 0x01, 0x00, 0xFF])
CMD_POWER_ON: Final = bytes([0x41, 0x54, 0x00, 0x0A, 0x01, 0x01, 0xFE])
# ── Known input source byte codes ───────────────────────────────────────────
DEFAULT_SOURCES: Final = {
0x40: "Bluetooth",
0x55: "Media Player",
0x52: "CBL/SAT",
0x53: "DVD",
0x54: "Blu-ray",
0x56: "Game",
0x57: "AUX",
}
+10
View File
@@ -0,0 +1,10 @@
{
"domain": "denon_bt",
"name": "Denon AVR Bluetooth",
"codeowners": [],
"config_flow": true,
"documentation": "https://github.com/ckoch/denon-bluetooth",
"iot_class": "local_push",
"requirements": [],
"version": "1.0.0"
}
+221
View File
@@ -0,0 +1,221 @@
"""Media player entity for Denon AVR Bluetooth."""
from __future__ import annotations
import logging
from typing import Any
from homeassistant.components.media_player import (
MediaPlayerDeviceClass,
MediaPlayerEntity,
MediaPlayerEntityFeature,
MediaPlayerState,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.event import async_call_later
from .const import (
CMD_MUTE_OFF,
CMD_MUTE_ON,
CMD_POWER_OFF,
CMD_POWER_ON,
CMD_VOLUME_DOWN,
CMD_VOLUME_UP,
DOMAIN,
RECONNECT_INTERVAL,
VOLUME_PRACTICAL_MAX,
)
from .protocol import DenonBTClient, DenonState, cmd_select_input
_LOGGER = logging.getLogger(__name__)
async def async_setup_entry(
hass: HomeAssistant,
entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up the Denon BT media player from a config entry."""
mac: str = entry.data["mac"]
name: str = entry.data.get("name", "Denon AVR")
async_add_entities([DenonBTMediaPlayer(hass, entry, mac, name)])
class DenonBTMediaPlayer(MediaPlayerEntity):
"""Representation of a Denon AVR-S540BT over Bluetooth RFCOMM."""
_attr_device_class = MediaPlayerDeviceClass.RECEIVER
_attr_has_entity_name = True
_attr_name = None
_attr_supported_features = (
MediaPlayerEntityFeature.VOLUME_STEP
| MediaPlayerEntityFeature.VOLUME_MUTE
| MediaPlayerEntityFeature.TURN_ON
| MediaPlayerEntityFeature.TURN_OFF
| MediaPlayerEntityFeature.SELECT_SOURCE
)
def __init__(
self,
hass: HomeAssistant,
entry: ConfigEntry,
mac: str,
name: str,
) -> None:
self._hass = hass
self._mac = mac
self._attr_unique_id = mac.replace(":", "").lower()
self._attr_device_info = {
"identifiers": {(DOMAIN, mac)},
"name": name,
"manufacturer": "Denon",
"model": "AVR-S540BT",
"connections": {("bluetooth", mac)},
}
self._client: DenonBTClient | None = None
self._state = DenonState()
self._reconnect_cancel: callback | None = None
self._reconnect_attempts = 0
# ── Lifecycle ────────────────────────────────────────────────────────────
async def async_added_to_hass(self) -> None:
self._client = DenonBTClient(
self._hass, self._mac, self._handle_state_update
)
await self._async_try_connect()
async def async_will_remove_from_hass(self) -> None:
self._cancel_reconnect()
if self._client:
await self._client.async_disconnect()
# ── State callback from protocol client ──────────────────────────────────
@callback
def _handle_state_update(self, state: DenonState) -> None:
"""Called on the event loop when the protocol client reports a change."""
self._state = state
self._attr_available = self._client.connected if self._client else False
if not self._attr_available:
self._schedule_reconnect()
else:
self._reconnect_attempts = 0
self._cancel_reconnect()
self.async_write_ha_state()
# ── Connection / reconnect ───────────────────────────────────────────────
async def _async_try_connect(self) -> None:
if self._client and await self._client.async_connect():
self._attr_available = True
self._reconnect_attempts = 0
_LOGGER.info("Connected to Denon AVR at %s", self._mac)
else:
self._attr_available = False
self._schedule_reconnect()
_LOGGER.warning(
"Could not connect to Denon AVR at %s", self._mac
)
def _schedule_reconnect(self) -> None:
if self._reconnect_cancel is not None:
return
delay = min(RECONNECT_INTERVAL, 5 * (self._reconnect_attempts + 1))
self._reconnect_cancel = async_call_later(
self._hass, delay, self._async_reconnect
)
self._reconnect_attempts += 1
_LOGGER.debug(
"Reconnect to %s in %ds (attempt %d)",
self._mac,
delay,
self._reconnect_attempts,
)
def _cancel_reconnect(self) -> None:
if self._reconnect_cancel:
self._reconnect_cancel()
self._reconnect_cancel = None
async def _async_reconnect(self, _now: Any = None) -> None:
self._reconnect_cancel = None
if self._client:
await self._client.async_disconnect()
await self._async_try_connect()
# ── State properties ─────────────────────────────────────────────────────
@property
def state(self) -> MediaPlayerState:
if not self._attr_available:
return MediaPlayerState.OFF
if self._state.power:
return MediaPlayerState.ON
return MediaPlayerState.OFF
@property
def volume_level(self) -> float | None:
if self._state.volume_raw is None:
return None
return min(1.0, self._state.volume_raw / VOLUME_PRACTICAL_MAX)
@property
def is_volume_muted(self) -> bool | None:
return self._state.muted
@property
def source(self) -> str | None:
return self._state.source
@property
def source_list(self) -> list[str]:
return list(self._state.sources.values())
@property
def extra_state_attributes(self) -> dict[str, Any]:
attrs: dict[str, Any] = {}
if self._state.sound_mode:
attrs["sound_mode"] = self._state.sound_mode
if self._state.volume_raw is not None:
attrs["volume_raw"] = self._state.volume_raw
attrs["volume_display"] = self._state.volume_raw / 2.0
return attrs
# ── Commands ─────────────────────────────────────────────────────────────
async def async_turn_on(self) -> None:
if self._client:
if not self._client.connected:
await self._async_try_connect()
await self._client.async_send(CMD_POWER_ON)
async def async_turn_off(self) -> None:
if self._client:
await self._client.async_send(CMD_POWER_OFF)
async def async_volume_up(self) -> None:
if self._client:
await self._client.async_send(CMD_VOLUME_UP)
async def async_volume_down(self) -> None:
if self._client:
await self._client.async_send(CMD_VOLUME_DOWN)
async def async_mute_volume(self, mute: bool) -> None:
if self._client:
await self._client.async_send(CMD_MUTE_ON if mute else CMD_MUTE_OFF)
async def async_select_source(self, source: str) -> None:
if not self._client:
return
for code, name in self._state.sources.items():
if name == source:
await self._client.async_send(cmd_select_input(code))
return
_LOGGER.warning("Unknown source: %s", source)
+359
View File
@@ -0,0 +1,359 @@
"""Denon AVR-S540BT Bluetooth RFCOMM protocol client."""
from __future__ import annotations
import logging
import socket
import threading
import time
from collections.abc import Callable
from dataclasses import dataclass, field
from typing import TYPE_CHECKING
from .const import (
CMD_GET_SOURCES,
CMD_GET_STATUS,
CONNECT_TIMEOUT,
DEFAULT_SOURCES,
RFCOMM_CHANNEL,
)
if TYPE_CHECKING:
from homeassistant.core import HomeAssistant
_LOGGER = logging.getLogger(__name__)
@dataclass
class DenonState:
"""Current state of the Denon AVR."""
power: bool = False
volume_raw: int | None = None
muted: bool = False
source: str | None = None
source_code: int | None = None
sound_mode: str | None = None
sources: dict[int, str] = field(default_factory=lambda: dict(DEFAULT_SOURCES))
def cmd_select_input(source_byte: int) -> bytes:
"""Build input selection command."""
return bytes([0x41, 0x54, 0x00, 0x07, 0x01, source_byte, 0x00])
class DenonBTClient:
"""Async-compatible Denon Bluetooth RFCOMM client for Home Assistant."""
def __init__(
self,
hass: HomeAssistant,
mac: str,
state_callback: Callable[[DenonState], None],
) -> None:
self._hass = hass
self._mac = mac
self._state_callback = state_callback
self._sock: socket.socket | None = None
self._connected = False
self._stop_event = threading.Event()
self._recv_thread: threading.Thread | None = None
self._state = DenonState()
self._lock = threading.Lock()
@property
def connected(self) -> bool:
return self._connected
@property
def state(self) -> DenonState:
return self._state
# ── Connection ───────────────────────────────────────────────────────────
async def async_connect(self) -> bool:
"""Connect to the AVR (blocking work runs in executor)."""
try:
await self._hass.async_add_executor_job(self._blocking_connect)
return True
except Exception as err:
_LOGGER.error("Connection to %s failed: %s", self._mac, err)
return False
def _blocking_connect(self) -> None:
self._stop_event.clear()
sock = socket.socket(
socket.AF_BLUETOOTH,
socket.SOCK_STREAM,
socket.BTPROTO_RFCOMM,
)
sock.settimeout(CONNECT_TIMEOUT)
sock.connect((self._mac, RFCOMM_CHANNEL))
sock.settimeout(None)
self._sock = sock
self._connected = True
self._recv_thread = threading.Thread(
target=self._recv_loop,
daemon=True,
name=f"denon_bt_recv_{self._mac}",
)
self._recv_thread.start()
# Give the AVR a moment, then request initial state.
time.sleep(0.3)
self._blocking_send(CMD_GET_SOURCES)
self._blocking_send(CMD_GET_STATUS)
async def async_disconnect(self) -> None:
await self._hass.async_add_executor_job(self._blocking_disconnect)
def _blocking_disconnect(self) -> None:
self._stop_event.set()
self._connected = False
if self._sock:
try:
self._sock.close()
except Exception:
pass
self._sock = None
# ── Send ─────────────────────────────────────────────────────────────────
async def async_send(self, data: bytes) -> bool:
if not self._connected:
return False
try:
await self._hass.async_add_executor_job(self._blocking_send, data)
return True
except Exception as err:
_LOGGER.error("Send to %s failed: %s", self._mac, err)
await self.async_disconnect()
return False
def _blocking_send(self, data: bytes) -> None:
if self._sock:
self._sock.sendall(data)
# ── Receive loop ─────────────────────────────────────────────────────────
def _recv_loop(self) -> None:
buf = b""
while not self._stop_event.is_set() and self._sock:
try:
chunk = self._sock.recv(256)
if not chunk:
break
buf += chunk
while len(buf) >= 4:
idx = buf.find(b"AT")
if idx == -1:
buf = b""
break
if idx > 0:
buf = buf[idx:]
if len(buf) < 4:
break
pkt_len = _packet_length(buf)
if pkt_len is None or len(buf) < pkt_len:
break
pkt = buf[:pkt_len]
buf = buf[pkt_len:]
self._handle_packet(pkt)
except Exception as err:
_LOGGER.debug("Recv loop ended: %s", err)
break
self._connected = False
self._notify_state_change()
# ── Packet handling ──────────────────────────────────────────────────────
def _handle_packet(self, data: bytes) -> None:
if len(data) < 4:
return
cat, typ = data[2], data[3]
changed = False
# Volume report: AT 07 02 03 C5 [vol] 00 [chk]
if cat == 0x07 and typ == 0x02 and len(data) >= 7:
if data[4] == 0x03 and data[5] == 0xC5:
with self._lock:
self._state.volume_raw = data[6]
self._state.power = True
changed = True
# Mute state: AT 07 1D 01 [state] [chk]
elif cat == 0x07 and typ == 0x1D and len(data) >= 6:
with self._lock:
self._state.muted = data[4] == 0x01 and data[5] == 0x01
changed = True
# Sound mode: AT 07 1E 15 00 [20 bytes text] [chk]
elif cat == 0x07 and typ == 0x1E and len(data) >= 6:
text = bytes(b for b in data[5:] if 32 <= b < 127).decode(
"ascii", errors="replace"
).strip()
if text:
with self._lock:
self._state.sound_mode = text
changed = True
# Input sources list: AT 00 04 0A 07 ...
elif cat == 0x00 and typ == 0x04 and len(data) >= 8:
discovered: dict[int, str] = {}
for b in data[4:]:
if b in DEFAULT_SOURCES:
discovered[b] = DEFAULT_SOURCES[b]
elif 0x40 <= b <= 0x7F:
discovered[b] = f"Input {b:02X}"
if discovered:
with self._lock:
self._state.sources = discovered
changed = True
# Current input: AT 00 07 02 [src] 00 [chk]
elif cat == 0x00 and typ == 0x07 and len(data) >= 6 and data[4] == 0x02:
code = data[5]
with self._lock:
self._state.source_code = code
self._state.source = self._state.sources.get(
code, f"Input {code:02X}"
)
changed = True
# Power echo: AT 00 0A 01 [state] [chk]
elif cat == 0x00 and typ == 0x0A and len(data) >= 6:
with self._lock:
self._state.power = data[4] == 0x01 and data[5] == 0x01
changed = True
# Power ack (off): AT 07 0B 01 00 FF
elif cat == 0x07 and typ == 0x0B and len(data) >= 6:
if data[4] == 0x01 and data[5] == 0x00:
with self._lock:
self._state.power = False
changed = True
if changed:
self._notify_state_change()
def _notify_state_change(self) -> None:
if self._hass and self._state_callback:
self._hass.loop.call_soon_threadsafe(
self._state_callback, self._state
)
# ── Packet length table (from denon_remote.py) ──────────────────────────────
def _packet_length(buf: bytes) -> int | None:
"""Determine expected packet length from the first bytes."""
if len(buf) < 4:
return None
cat = buf[2]
typ = buf[3]
if cat == 0x07:
if typ in (0x00, 0x01):
return 6
if typ == 0x02:
return 9
if typ == 0x03:
return 6
if typ == 0x1E:
return 27
if typ in (0x0B, 0x1D, 0x25):
return 7
if typ == 0x31:
return 14
if cat == 0x00:
if typ == 0x04:
if len(buf) > 4 and buf[4] == 0x00:
return 6
return 16
if typ == 0x06:
if len(buf) > 4 and buf[4] == 0x00:
return 6
return 75
if typ == 0x07:
if len(buf) > 4 and buf[4] == 0x01:
return 7
return 8
if typ == 0x09:
return 7
if typ == 0x0A:
return 7
if typ == 0x0B:
return 7
if typ == 0x0D:
return 21
if typ == 0x0E:
return 16
if typ == 0x13:
return 7
if cat == 0x0D:
if typ == 0x08:
return 13
if typ == 0x0D:
return 6
if cat == 0x81:
if typ == 0x04:
return 7
if typ == 0x32:
return 119
if typ == 0x40:
return 7
if cat == 0x82:
if typ == 0x31:
return 14
# Fallback: next AT header or 32 bytes max
next_at = buf.find(b"AT", 2)
if next_at > 0:
return next_at
return min(32, len(buf))
# ── D-Bus device discovery ───────────────────────────────────────────────────
def find_denon_devices() -> list[dict[str, str | bool]]:
"""Query BlueZ D-Bus for paired devices with 'DENON' in the name."""
try:
import dbus # noqa: PLC0415
bus = dbus.SystemBus()
manager = dbus.Interface(
bus.get_object("org.bluez", "/"),
"org.freedesktop.DBus.ObjectManager",
)
objects = manager.GetManagedObjects()
results: list[dict[str, str | bool]] = []
for _path, interfaces in objects.items():
if "org.bluez.Device1" in interfaces:
dev = interfaces["org.bluez.Device1"]
name = str(dev.get("Name", ""))
if "DENON" in name.upper():
results.append(
{
"mac": str(dev.get("Address")),
"name": name,
"paired": bool(dev.get("Paired", False)),
}
)
return results
except Exception as err:
_LOGGER.debug("D-Bus discovery failed: %s", err)
return []
@@ -0,0 +1,27 @@
{
"config": {
"step": {
"select": {
"title": "Select Denon AVR",
"description": "Select a discovered Denon device, or enter a MAC address manually.",
"data": {
"device": "Device"
}
},
"manual": {
"title": "Manual Configuration",
"description": "Enter the Bluetooth MAC address of your Denon AVR receiver.",
"data": {
"mac": "MAC Address",
"name": "Device Name"
}
}
},
"error": {
"invalid_mac": "Invalid MAC address. Use the format XX:XX:XX:XX:XX:XX."
},
"abort": {
"already_configured": "This device is already configured."
}
}
}
+858
View File
@@ -0,0 +1,858 @@
#!/usr/bin/env python3
"""
Denon AVR-S540BT Bluetooth Remote — Prototype
Uses RFCOMM channel 2 (BT SPP 2) as identified from btsnoop capture.
Run on Linux with: python3 denon_remote.py
"""
import socket
import threading
import tkinter as tk
from tkinter import font as tkfont
import queue
import struct
import time
try:
import dbus
HAS_DBUS = True
except ImportError:
HAS_DBUS = False
# ─── Bluetooth device discovery ──────────────────────────────────────────────
def find_denon_devices():
"""Query BlueZ D-Bus for paired devices with 'DENON' in the name."""
if not HAS_DBUS:
return []
try:
bus = dbus.SystemBus()
manager = dbus.Interface(
bus.get_object("org.bluez", "/"),
"org.freedesktop.DBus.ObjectManager"
)
objects = manager.GetManagedObjects()
results = []
for path, interfaces in objects.items():
if "org.bluez.Device1" in interfaces:
dev = interfaces["org.bluez.Device1"]
name = str(dev.get("Name", ""))
if "DENON" in name.upper():
results.append({
"mac": str(dev.get("Address")),
"name": name,
"paired": bool(dev.get("Paired", False)),
})
return results
except Exception:
return []
def find_bt_adapter():
"""Return the D-Bus object path of the first BlueZ adapter, or None."""
if not HAS_DBUS:
return None
try:
bus = dbus.SystemBus()
manager = dbus.Interface(
bus.get_object("org.bluez", "/"),
"org.freedesktop.DBus.ObjectManager"
)
objects = manager.GetManagedObjects()
for path, interfaces in objects.items():
if "org.bluez.Adapter1" in interfaces:
return str(path)
except Exception:
pass
return None
def discover_denon_devices(duration=8):
"""Run BT discovery for `duration` seconds; return unpaired Denon devices.
Returns list of dicts: {mac, name, path} where path is the D-Bus object path.
Must be called from a worker thread (blocks for `duration` seconds).
"""
if not HAS_DBUS:
return []
adapter_path = find_bt_adapter()
if adapter_path is None:
return []
try:
bus = dbus.SystemBus()
adapter = dbus.Interface(
bus.get_object("org.bluez", adapter_path),
"org.bluez.Adapter1"
)
try:
adapter.StartDiscovery()
except dbus.exceptions.DBusException as e:
if "InProgress" not in str(e):
return []
time.sleep(duration)
try:
adapter.StopDiscovery()
except Exception:
pass
manager = dbus.Interface(
bus.get_object("org.bluez", "/"),
"org.freedesktop.DBus.ObjectManager"
)
objects = manager.GetManagedObjects()
results = []
for path, interfaces in objects.items():
if "org.bluez.Device1" in interfaces:
dev = interfaces["org.bluez.Device1"]
name = str(dev.get("Name", ""))
paired = bool(dev.get("Paired", False))
if "DENON" in name.upper() and not paired:
results.append({
"mac": str(dev.get("Address")),
"name": name,
"path": str(path),
})
return results
except Exception:
return []
def pair_and_trust_device(device_path):
"""Pair and trust a BlueZ device. Blocks until pairing completes.
Returns (True, "") on success, (False, error_message) on failure.
"""
if not HAS_DBUS:
return (False, "D-Bus not available")
try:
bus = dbus.SystemBus()
device = dbus.Interface(
bus.get_object("org.bluez", device_path),
"org.bluez.Device1"
)
device.Pair()
props = dbus.Interface(
bus.get_object("org.bluez", device_path),
"org.freedesktop.DBus.Properties"
)
props.Set("org.bluez.Device1", "Trusted", dbus.Boolean(True))
return (True, "")
except dbus.exceptions.DBusException as e:
if "AlreadyExists" in str(e):
return (True, "")
return (False, str(e))
except Exception as e:
return (False, str(e))
# ─── Protocol constants ───────────────────────────────────────────────────────
RFCOMM_CHANNEL = 2
# Commands (PHONE -> AVR)
CMD_VOLUME_UP = bytes([0x41, 0x54, 0x07, 0x00, 0x00, 0x00])
CMD_VOLUME_DOWN = bytes([0x41, 0x54, 0x07, 0x01, 0x00, 0x00])
CMD_GET_SOURCES = bytes([0x41, 0x54, 0x00, 0x04, 0x00, 0x00])
CMD_GET_STATUS = bytes([0x41, 0x54, 0x00, 0x06, 0x00, 0x00])
CMD_MUTE_ON = bytes([0x41, 0x54, 0x07, 0x1D, 0x01, 0x01, 0xFE])
CMD_MUTE_OFF = bytes([0x41, 0x54, 0x07, 0x1D, 0x01, 0x00, 0xFF])
CMD_POWER_OFF = bytes([0x41, 0x54, 0x00, 0x0A, 0x01, 0x00, 0xFF])
CMD_POWER_ON = bytes([0x41, 0x54, 0x00, 0x0A, 0x01, 0x01, 0xFE])
def cmd_select_input(source_byte):
# AT 00 07 01 [source] [checksum_placeholder]
return bytes([0x41, 0x54, 0x00, 0x07, 0x01, source_byte, 0x00])
# Known input source codes observed in log (R,S,T,V,W bytes)
# Exact label mapping is approximate — adjust after testing
INPUT_SOURCES = {
0x40: "Bluetooth",
0x55: "Media Player", # 'U' — observed in source list, label approximate
0x52: "CBL/SAT", # 'R'
0x53: "DVD", # 'S'
0x54: "Blu-ray", # 'T'
0x56: "Game", # 'V'
0x57: "AUX", # 'W'
}
# ─── Bluetooth connection ─────────────────────────────────────────────────────
class DenonBT:
def __init__(self, on_message, on_status):
self.sock = None
self.connected = False
self.on_message = on_message # callback(bytes)
self.on_status = on_status # callback(str)
self._recv_thread = None
self._stop = threading.Event()
def connect(self, mac):
self._stop.clear()
try:
self.on_status("Connecting…")
s = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM)
s.settimeout(10)
s.connect((mac, RFCOMM_CHANNEL))
s.settimeout(None)
self.sock = s
self.connected = True
self.on_status(f"Connected to {mac}")
self._recv_thread = threading.Thread(target=self._recv_loop, daemon=True)
self._recv_thread.start()
# Request initial state
time.sleep(0.3)
self.send(CMD_GET_SOURCES)
self.send(CMD_GET_STATUS)
except Exception as e:
self.connected = False
self.on_status(f"Connection failed: {e}")
def disconnect(self):
self._stop.set()
self.connected = False
if self.sock:
try:
self.sock.close()
except Exception:
pass
self.sock = None
self.on_status("Disconnected")
def send(self, data: bytes):
if not self.connected or not self.sock:
self.on_status("Not connected")
return
try:
self.sock.sendall(data)
self.on_message(data, direction="SENT")
except Exception as e:
self.on_status(f"Send error: {e}")
self.disconnect()
def _recv_loop(self):
buf = b""
while not self._stop.is_set() and self.sock:
try:
chunk = self.sock.recv(256)
if not chunk:
break
buf += chunk
# Process all complete AT packets in buffer
while len(buf) >= 4:
idx = buf.find(b'AT')
if idx == -1:
buf = b""
break
if idx > 0:
buf = buf[idx:]
# Minimum packet is 4 bytes: AT + 2 command bytes
# Use length byte at offset 3 if available, else wait for more
if len(buf) < 4:
break
pkt_len = self._packet_length(buf)
if pkt_len is None or len(buf) < pkt_len:
break
pkt = buf[:pkt_len]
buf = buf[pkt_len:]
self.on_message(pkt, direction="RECV")
except Exception:
break
self.on_status("Disconnected")
def _packet_length(self, buf):
"""Best-effort length detection for AT protocol packets."""
if len(buf) < 4:
return None
cat = buf[2]
typ = buf[3]
# Known fixed-length packets
if cat == 0x07:
if typ in (0x00, 0x01): return 6 # vol up/down command
if typ == 0x02: return 9 # vol report
if typ == 0x03: return 6 # unknown init cmd
if typ == 0x1e: return 27 # sound mode (AT + cmd + 0x15 + 0x00 + 20 text + chk)
if typ in (0x0b, 0x1d, 0x25): return 7
if typ == 0x31: return 14 # firmware/version string
if cat == 0x00:
if typ == 0x04:
if len(buf) > 4 and buf[4] == 0x00:
return 6 # request
return 16 # response with sources
if typ == 0x06:
if len(buf) > 4 and buf[4] == 0x00:
return 6 # request
return 75 # status response
if typ == 0x07:
if len(buf) > 4 and buf[4] == 0x01:
return 7 # input select command
return 8 # input report
if typ == 0x09: return 7
if typ == 0x0a: return 7
if typ == 0x0b: return 7
if typ == 0x0d: return 21 # device info with MAC
if typ == 0x0e: return 16 # device name
if typ == 0x13: return 7
if cat == 0x0d:
if typ == 0x08: return 13
if typ == 0x0d: return 6
if cat == 0x81:
if typ == 0x04: return 7
if typ == 0x32: return 119 # large volume table
if typ == 0x40: return 7
if cat == 0x82:
if typ == 0x31: return 14
# Fallback: read until next AT or 32 bytes
next_at = buf.find(b'AT', 2)
if next_at > 0:
return next_at
return min(32, len(buf))
# ─── GUI ─────────────────────────────────────────────────────────────────────
BG = "#0d0d0d"
PANEL = "#161616"
BORDER = "#2a2a2a"
ACCENT = "#e8c97a" # warm gold
ACCENT2 = "#4fc3f7" # cool blue
TEXT = "#f0ede6"
MUTED = "#666"
RED = "#e05252"
GREEN = "#4caf50"
class DenonRemoteApp(tk.Tk):
def __init__(self):
super().__init__()
self.title("Denon AVR-S540BT Remote")
self.configure(bg=BG)
self.resizable(False, False)
self._msg_queue = queue.Queue()
self._vol = tk.StringVar(value="")
self._vol_raw = None # raw volume byte (0x00-0xFF)
self._muted = False
self._sound_mode = tk.StringVar(value="")
self._status = tk.StringVar(value="Disconnected")
self._sources = {} # byte -> label
self._cur_input = None
self._devices = [] # list of dicts from find_denon_devices()
self._selected_device = None # index into _devices
self._unpaired_devices = [] # list of dicts from discover_denon_devices()
self._discovering = False
self._pairing = False
self.bt = DenonBT(
on_message=self._on_bt_message,
on_status=self._on_bt_status,
)
self._build_ui()
self._poll_queue()
self._scan_devices()
# ── UI construction ───────────────────────────────────────────────────────
def _build_ui(self):
self._fonts()
# ── Header ────────────────────────────────────────────────────────────
hdr = tk.Frame(self, bg=BG, pady=18)
hdr.pack(fill="x", padx=24)
tk.Label(hdr, text="DENON", font=self.f_brand,
bg=BG, fg=ACCENT).pack(side="left")
tk.Label(hdr, text=" AVR-S540BT", font=self.f_model,
bg=BG, fg=MUTED).pack(side="left", padx=(2, 0))
# Status dot
self._status_dot = tk.Label(hdr, text="", font=self.f_dot,
bg=BG, fg=RED)
self._status_dot.pack(side="right")
tk.Label(hdr, textvariable=self._status, font=self.f_small,
bg=BG, fg=MUTED).pack(side="right", padx=(0, 6))
self._divider()
# ── Connection panel ──────────────────────────────────────────────────
conn = tk.Frame(self, bg=PANEL, padx=20, pady=16)
conn.pack(fill="x", padx=16, pady=(12, 0))
tk.Label(conn, text="DEVICE", font=self.f_label,
bg=PANEL, fg=MUTED).grid(row=0, column=0, sticky="w",
columnspan=3)
self._device_frame = tk.Frame(conn, bg=PANEL)
self._device_frame.grid(row=1, column=0, pady=(4, 0), sticky="w")
self._device_label = tk.Label(
self._device_frame, text="Scanning…", font=self.f_mono,
bg=PANEL, fg=MUTED, anchor="w"
)
self._device_label.pack(side="left")
btn_row = tk.Frame(conn, bg=PANEL)
btn_row.grid(row=0, column=2, sticky="e")
self._scan_btn = self._btn(
btn_row, "SCAN", self._scan_devices,
bg=PANEL, fg=MUTED, font=self.f_small
)
self._scan_btn.pack(side="left", padx=(0, 4))
self._scan_pair_btn = self._btn(
btn_row, "SCAN & PAIR", self._start_discovery,
bg=PANEL, fg=ACCENT2, font=self.f_small
)
self._scan_pair_btn.pack(side="left")
self._conn_btn = self._btn(
conn, "CONNECT", self._toggle_connect,
bg=ACCENT, fg=BG, font=self.f_btn
)
self._conn_btn.grid(row=1, column=2, padx=(10, 0), pady=(4, 0),
ipady=6, ipadx=14)
self._unpaired_frame = tk.Frame(conn, bg=PANEL)
self._unpaired_frame.grid(row=2, column=0, columnspan=3,
pady=(8, 0), sticky="w")
self._divider()
# ── Power panel ──────────────────────────────────────────────────
pwr_frame = tk.Frame(self, bg=BG, pady=6)
pwr_frame.pack(fill="x", padx=24)
tk.Label(pwr_frame, text="POWER", font=self.f_label,
bg=BG, fg=MUTED).pack(side="left")
self._btn(
pwr_frame, "OFF", lambda: self.bt.send(CMD_POWER_OFF),
bg=PANEL, fg=RED, font=self.f_btn,
).pack(side="right", ipady=5, ipadx=14)
self._btn(
pwr_frame, "ON", lambda: self.bt.send(CMD_POWER_ON),
bg=PANEL, fg=GREEN, font=self.f_btn,
).pack(side="right", padx=(0, 6), ipady=5, ipadx=14)
self._divider()
# ── Volume panel ──────────────────────────────────────────────────────
vol_frame = tk.Frame(self, bg=BG, pady=8)
vol_frame.pack(fill="x", padx=24)
tk.Label(vol_frame, text="VOLUME", font=self.f_label,
bg=BG, fg=MUTED).pack(anchor="w")
vol_ctrl = tk.Frame(vol_frame, bg=BG)
vol_ctrl.pack(fill="x", pady=(8, 0))
self._vol_down_btn = self._btn(
vol_ctrl, "", lambda: self.bt.send(CMD_VOLUME_DOWN),
bg=PANEL, fg=TEXT, font=self.f_arrow,
width=4, activebackground=BORDER
)
self._vol_down_btn.pack(side="left", ipady=12)
tk.Label(vol_ctrl, textvariable=self._vol, font=self.f_vol,
bg=BG, fg=ACCENT, width=6).pack(side="left")
self._vol_up_btn = self._btn(
vol_ctrl, "", lambda: self.bt.send(CMD_VOLUME_UP),
bg=PANEL, fg=TEXT, font=self.f_arrow,
width=4, activebackground=BORDER
)
self._vol_up_btn.pack(side="left", ipady=12)
self._mute_btn = self._btn(
vol_ctrl, "MUTE", self._toggle_mute,
bg=PANEL, fg=MUTED, font=self.f_btn,
activebackground=RED
)
self._mute_btn.pack(side="right", ipady=6, ipadx=14)
self._divider()
# ── Sound mode ────────────────────────────────────────────────────────
sm = tk.Frame(self, bg=BG, pady=6)
sm.pack(fill="x", padx=24)
tk.Label(sm, text="SOUND MODE", font=self.f_label,
bg=BG, fg=MUTED).pack(side="left")
tk.Label(sm, textvariable=self._sound_mode, font=self.f_value,
bg=BG, fg=ACCENT2).pack(side="right")
self._divider()
# ── Input sources ─────────────────────────────────────────────────────
inp = tk.Frame(self, bg=BG, pady=8)
inp.pack(fill="x", padx=24)
tk.Label(inp, text="INPUT", font=self.f_label,
bg=BG, fg=MUTED).pack(anchor="w")
self._input_frame = tk.Frame(inp, bg=BG)
self._input_frame.pack(fill="x", pady=(8, 0))
# Populated dynamically when sources are received
self._input_buttons = {}
self._render_inputs(INPUT_SOURCES)
self._divider()
# ── Log ───────────────────────────────────────────────────────────────
log_hdr = tk.Frame(self, bg=BG)
log_hdr.pack(fill="x", padx=24, pady=(4, 0))
tk.Label(log_hdr, text="PACKET LOG", font=self.f_label,
bg=BG, fg=MUTED).pack(side="left")
self._btn(log_hdr, "CLEAR", self._clear_log,
bg=BG, fg=MUTED, font=self.f_small).pack(side="right")
self._log = tk.Text(
self, height=8, font=self.f_mono,
bg="#0a0a0a", fg="#3a7a3a", insertbackground=ACCENT,
relief="flat", bd=0, state="disabled",
highlightthickness=1, highlightbackground=BORDER,
padx=8, pady=6
)
self._log.pack(fill="x", padx=16, pady=(4, 16))
self._log.tag_config("sent", foreground="#4fc3f7")
self._log.tag_config("recv", foreground="#81c784")
self._log.tag_config("info", foreground=MUTED)
def _fonts(self):
self.f_brand = tkfont.Font(family="Courier", size=18, weight="bold")
self.f_model = tkfont.Font(family="Courier", size=12)
self.f_label = tkfont.Font(family="Courier", size=8, weight="bold")
self.f_value = tkfont.Font(family="Courier", size=12)
self.f_vol = tkfont.Font(family="Courier", size=32, weight="bold")
self.f_btn = tkfont.Font(family="Courier", size=9, weight="bold")
self.f_arrow = tkfont.Font(family="Courier", size=14, weight="bold")
self.f_mono = tkfont.Font(family="Courier", size=9)
self.f_small = tkfont.Font(family="Courier", size=8)
self.f_dot = tkfont.Font(family="Courier", size=10)
def _btn(self, parent, text, cmd, **kw):
defaults = dict(
text=text, command=cmd,
bg=PANEL, fg=TEXT,
font=self.f_btn,
relief="flat", bd=0,
cursor="hand2",
activeforeground=ACCENT,
activebackground=BORDER,
)
defaults.update(kw)
return tk.Button(parent, **defaults)
def _divider(self):
tk.Frame(self, bg=BORDER, height=1).pack(
fill="x", padx=16, pady=6)
def _render_inputs(self, sources):
for w in self._input_frame.winfo_children():
w.destroy()
self._input_buttons = {}
for code, label in sources.items():
btn = self._btn(
self._input_frame, label,
lambda c=code: self._select_input(c),
bg=PANEL, fg=MUTED, font=self.f_btn,
)
btn.pack(side="left", padx=(0, 6), ipady=5, ipadx=8)
self._input_buttons[code] = btn
if not sources:
tk.Label(self._input_frame, text="No sources discovered yet",
font=self.f_small, bg=BG, fg=MUTED).pack(side="left")
def _toggle_mute(self):
if self._muted:
self.bt.send(CMD_MUTE_OFF)
else:
self.bt.send(CMD_MUTE_ON)
def _update_mute_ui(self):
if self._muted:
self._mute_btn.configure(bg=RED, fg=TEXT, text="MUTED")
else:
self._mute_btn.configure(bg=PANEL, fg=MUTED, text="MUTE")
def _select_input(self, code):
self.bt.send(cmd_select_input(code))
self._highlight_input(code)
def _highlight_input(self, code):
self._cur_input = code
for c, btn in self._input_buttons.items():
if c == code:
btn.configure(fg=ACCENT, bg="#252010",
highlightthickness=1,
highlightbackground=ACCENT,
highlightcolor=ACCENT)
else:
btn.configure(fg=MUTED, bg=PANEL,
highlightthickness=0)
def _clear_log(self):
self._log.configure(state="normal")
self._log.delete("1.0", "end")
self._log.configure(state="disabled")
# ── Device discovery ─────────────────────────────────────────────────────
def _scan_devices(self):
self._devices = find_denon_devices()
self._selected_device = 0 if self._devices else None
self._render_device_list()
def _render_device_list(self):
for w in self._device_frame.winfo_children():
w.destroy()
if not HAS_DBUS:
tk.Label(self._device_frame,
text="Install python3-dbus for auto-detection",
font=self.f_small, bg=PANEL, fg=RED).pack(side="left")
return
if not self._devices:
tk.Label(self._device_frame,
text="No paired Denon device — use SCAN & PAIR",
font=self.f_small, bg=PANEL, fg=MUTED).pack(side="left")
return
if len(self._devices) == 1:
dev = self._devices[0]
tk.Label(self._device_frame,
text=f"{dev['name']} {dev['mac']}",
font=self.f_mono, bg=PANEL, fg=TEXT).pack(side="left")
else:
for i, dev in enumerate(self._devices):
is_sel = (i == self._selected_device)
btn = self._btn(
self._device_frame,
f"{dev['name']}",
lambda idx=i: self._pick_device(idx),
bg="#252010" if is_sel else PANEL,
fg=ACCENT if is_sel else MUTED,
font=self.f_btn,
)
btn.pack(side="left", padx=(0, 6), ipady=3, ipadx=6)
def _pick_device(self, idx):
self._selected_device = idx
self._render_device_list()
# ── Bluetooth discovery & pairing ─────────────────────────────────────────
def _start_discovery(self):
if not HAS_DBUS:
self._log_append("Install python3-dbus for Bluetooth scanning", "info")
return
if self._discovering:
return
self._discovering = True
self._unpaired_devices = []
self._scan_pair_btn.configure(state="disabled", text="SCANNING…")
self._render_unpaired_list()
self._log_append("Starting Bluetooth discovery…", "info")
threading.Thread(target=self._discovery_worker, daemon=True).start()
def _discovery_worker(self):
devices = discover_denon_devices(duration=8)
self._msg_queue.put(("discovery_done", devices))
def _handle_discovery_done(self, devices):
self._discovering = False
self._unpaired_devices = devices
self._scan_pair_btn.configure(state="normal", text="SCAN & PAIR")
self._render_unpaired_list()
count = len(devices)
if count == 0:
self._log_append("Discovery complete — no unpaired Denon devices found", "info")
else:
self._log_append(f"Discovery complete — found {count} unpaired Denon device(s)", "info")
def _render_unpaired_list(self):
for w in self._unpaired_frame.winfo_children():
w.destroy()
if self._discovering:
tk.Label(self._unpaired_frame,
text="Scanning for Denon devices…",
font=self.f_small, bg=PANEL, fg=ACCENT2).pack(anchor="w")
return
if not self._unpaired_devices:
return
tk.Label(self._unpaired_frame,
text="UNPAIRED DEVICES", font=self.f_label,
bg=PANEL, fg=MUTED).pack(anchor="w", pady=(4, 2))
for dev in self._unpaired_devices:
row = tk.Frame(self._unpaired_frame, bg=PANEL)
row.pack(fill="x", pady=1)
tk.Label(row, text=f"{dev['name']} {dev['mac']}",
font=self.f_small, bg=PANEL, fg=MUTED).pack(side="left")
self._btn(
row, "PAIR",
lambda d=dev: self._start_pair(d),
bg=ACCENT2, fg=BG, font=self.f_small,
).pack(side="right", ipadx=8, ipady=2)
def _start_pair(self, device):
if self._pairing:
return
self._pairing = True
self._log_append(f"Pairing with {device['name']} ({device['mac']})…", "info")
for w in self._unpaired_frame.winfo_children():
for child in w.winfo_children():
if isinstance(child, tk.Button):
child.configure(state="disabled")
threading.Thread(
target=self._pair_worker, args=(device,), daemon=True
).start()
def _pair_worker(self, device):
success, error = pair_and_trust_device(device["path"])
self._msg_queue.put(("pair_done", device, success, error))
def _handle_pair_done(self, device, success, error):
self._pairing = False
if success:
self._log_append(f"Paired and trusted: {device['name']}", "info")
self._unpaired_devices = [
d for d in self._unpaired_devices if d["mac"] != device["mac"]
]
self._render_unpaired_list()
self._scan_devices()
else:
self._log_append(f"Pairing failed: {error}", "info")
self._render_unpaired_list()
# ── Connection ────────────────────────────────────────────────────────────
def _toggle_connect(self):
if self.bt.connected:
self.bt.disconnect()
self._conn_btn.configure(text="CONNECT", bg=ACCENT, fg=BG)
else:
if self._selected_device is None or not self._devices:
self._log_append("No paired Denon device — use SCAN & PAIR", "info")
return
mac = self._devices[self._selected_device]["mac"]
self._conn_btn.configure(text="DISCONNECT", bg=RED, fg=TEXT)
threading.Thread(
target=self.bt.connect, args=(mac,), daemon=True
).start()
# ── Message handling ──────────────────────────────────────────────────────
def _on_bt_message(self, data: bytes, direction: str):
self._msg_queue.put(("msg", data, direction))
def _on_bt_status(self, msg: str):
self._msg_queue.put(("status", msg))
def _poll_queue(self):
try:
while True:
item = self._msg_queue.get_nowait()
if item[0] == "status":
self._handle_status(item[1])
elif item[0] == "msg":
self._handle_message(item[1], item[2])
elif item[0] == "discovery_done":
self._handle_discovery_done(item[1])
elif item[0] == "pair_done":
self._handle_pair_done(item[1], item[2], item[3])
except queue.Empty:
pass
self.after(50, self._poll_queue)
def _handle_status(self, msg: str):
self._status.set(msg)
connected = msg.startswith("Connected")
self._status_dot.configure(fg=GREEN if connected else RED)
if not connected:
self._conn_btn.configure(text="CONNECT", bg=ACCENT, fg=BG)
def _handle_message(self, data: bytes, direction: str):
hex_str = " ".join(f"{b:02x}" for b in data)
tag = "sent" if direction == "SENT" else "recv"
arrow = "" if direction == "SENT" else ""
self._log_append(f"{arrow} {hex_str}", tag)
if len(data) < 3:
return
cat = data[2]
typ = data[3] if len(data) > 3 else None
# Volume report: AT 07 02 03 c5 [vol] ...
if cat == 0x07 and typ == 0x02 and len(data) >= 7:
if data[4] == 0x03 and data[5] == 0xc5:
vol = data[6]
self._vol_raw = vol
# Display as dB: raw/2 gives the display value shown on the AVR
vol_db = vol / 2.0
self._vol.set(f"{vol_db:g}")
return
# Mute state: AT 07 1d 01 [state] ...
if cat == 0x07 and typ == 0x1d and len(data) >= 6:
self._muted = (data[4] == 0x01 and data[5] == 0x01)
self._update_mute_ui()
return
# Sound mode: AT 07 1e ...
if cat == 0x07 and typ == 0x1e and len(data) >= 6:
text_bytes = data[5:]
text = bytes(b for b in text_bytes if 32 <= b < 127).decode("ascii", errors="replace").strip()
if text:
self._sound_mode.set(text)
return
# Input sources: AT 00 04 0a ...
if cat == 0x00 and typ == 0x04 and direction == "RECV" and len(data) >= 8:
raw = data[4:]
discovered = {}
for b in raw:
if b in INPUT_SOURCES:
discovered[b] = INPUT_SOURCES[b]
elif 0x40 <= b <= 0x7f:
discovered[b] = f"SRC-{b:02x}"
if discovered:
self._sources = discovered
self._render_inputs(discovered)
return
# Current input: AT 00 07 02 [source] ...
if cat == 0x00 and typ == 0x07 and direction == "RECV" and len(data) >= 5:
if data[4] == 0x02 and len(data) >= 6:
self._highlight_input(data[5])
return
# Device name: AT 00 0e ...
if cat == 0x00 and typ == 0x0e and direction == "RECV" and len(data) >= 6:
name_bytes = data[5:]
name = bytes(b for b in name_bytes if 32 <= b < 127).decode("ascii", errors="replace").strip()
if name:
self._log_append(f" Device: {name}", "info")
return
def _log_append(self, text: str, tag: str = ""):
self._log.configure(state="normal")
self._log.insert("end", text + "\n", tag)
self._log.see("end")
self._log.configure(state="disabled")
# ─── Entry point ─────────────────────────────────────────────────────────────
if __name__ == "__main__":
app = DenonRemoteApp()
app.mainloop()