Files
ckoch a5d2865eb2 Initial commit
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 10:59:47 -04:00

858 lines
33 KiB
Python

#!/usr/bin/env python3
"""
Denon AVR-S540BT Bluetooth Remote — Prototype
Uses RFCOMM channel 2 (BT SPP 2) as identified from btsnoop capture.
Run on Linux with: python3 denon_remote.py
"""
import socket
import threading
import tkinter as tk
from tkinter import font as tkfont
import queue
import struct
import time
try:
import dbus
HAS_DBUS = True
except ImportError:
HAS_DBUS = False
# ─── Bluetooth device discovery ──────────────────────────────────────────────
def find_denon_devices():
"""Query BlueZ D-Bus for paired devices with 'DENON' in the name."""
if not HAS_DBUS:
return []
try:
bus = dbus.SystemBus()
manager = dbus.Interface(
bus.get_object("org.bluez", "/"),
"org.freedesktop.DBus.ObjectManager"
)
objects = manager.GetManagedObjects()
results = []
for path, interfaces in objects.items():
if "org.bluez.Device1" in interfaces:
dev = interfaces["org.bluez.Device1"]
name = str(dev.get("Name", ""))
if "DENON" in name.upper():
results.append({
"mac": str(dev.get("Address")),
"name": name,
"paired": bool(dev.get("Paired", False)),
})
return results
except Exception:
return []
def find_bt_adapter():
"""Return the D-Bus object path of the first BlueZ adapter, or None."""
if not HAS_DBUS:
return None
try:
bus = dbus.SystemBus()
manager = dbus.Interface(
bus.get_object("org.bluez", "/"),
"org.freedesktop.DBus.ObjectManager"
)
objects = manager.GetManagedObjects()
for path, interfaces in objects.items():
if "org.bluez.Adapter1" in interfaces:
return str(path)
except Exception:
pass
return None
def discover_denon_devices(duration=8):
"""Run BT discovery for `duration` seconds; return unpaired Denon devices.
Returns list of dicts: {mac, name, path} where path is the D-Bus object path.
Must be called from a worker thread (blocks for `duration` seconds).
"""
if not HAS_DBUS:
return []
adapter_path = find_bt_adapter()
if adapter_path is None:
return []
try:
bus = dbus.SystemBus()
adapter = dbus.Interface(
bus.get_object("org.bluez", adapter_path),
"org.bluez.Adapter1"
)
try:
adapter.StartDiscovery()
except dbus.exceptions.DBusException as e:
if "InProgress" not in str(e):
return []
time.sleep(duration)
try:
adapter.StopDiscovery()
except Exception:
pass
manager = dbus.Interface(
bus.get_object("org.bluez", "/"),
"org.freedesktop.DBus.ObjectManager"
)
objects = manager.GetManagedObjects()
results = []
for path, interfaces in objects.items():
if "org.bluez.Device1" in interfaces:
dev = interfaces["org.bluez.Device1"]
name = str(dev.get("Name", ""))
paired = bool(dev.get("Paired", False))
if "DENON" in name.upper() and not paired:
results.append({
"mac": str(dev.get("Address")),
"name": name,
"path": str(path),
})
return results
except Exception:
return []
def pair_and_trust_device(device_path):
"""Pair and trust a BlueZ device. Blocks until pairing completes.
Returns (True, "") on success, (False, error_message) on failure.
"""
if not HAS_DBUS:
return (False, "D-Bus not available")
try:
bus = dbus.SystemBus()
device = dbus.Interface(
bus.get_object("org.bluez", device_path),
"org.bluez.Device1"
)
device.Pair()
props = dbus.Interface(
bus.get_object("org.bluez", device_path),
"org.freedesktop.DBus.Properties"
)
props.Set("org.bluez.Device1", "Trusted", dbus.Boolean(True))
return (True, "")
except dbus.exceptions.DBusException as e:
if "AlreadyExists" in str(e):
return (True, "")
return (False, str(e))
except Exception as e:
return (False, str(e))
# ─── Protocol constants ───────────────────────────────────────────────────────
RFCOMM_CHANNEL = 2
# Commands (PHONE -> AVR)
CMD_VOLUME_UP = bytes([0x41, 0x54, 0x07, 0x00, 0x00, 0x00])
CMD_VOLUME_DOWN = bytes([0x41, 0x54, 0x07, 0x01, 0x00, 0x00])
CMD_GET_SOURCES = bytes([0x41, 0x54, 0x00, 0x04, 0x00, 0x00])
CMD_GET_STATUS = bytes([0x41, 0x54, 0x00, 0x06, 0x00, 0x00])
CMD_MUTE_ON = bytes([0x41, 0x54, 0x07, 0x1D, 0x01, 0x01, 0xFE])
CMD_MUTE_OFF = bytes([0x41, 0x54, 0x07, 0x1D, 0x01, 0x00, 0xFF])
CMD_POWER_OFF = bytes([0x41, 0x54, 0x00, 0x0A, 0x01, 0x00, 0xFF])
CMD_POWER_ON = bytes([0x41, 0x54, 0x00, 0x0A, 0x01, 0x01, 0xFE])
def cmd_select_input(source_byte):
# AT 00 07 01 [source] [checksum_placeholder]
return bytes([0x41, 0x54, 0x00, 0x07, 0x01, source_byte, 0x00])
# Known input source codes observed in log (R,S,T,V,W bytes)
# Exact label mapping is approximate — adjust after testing
INPUT_SOURCES = {
0x40: "Bluetooth",
0x55: "Media Player", # 'U' — observed in source list, label approximate
0x52: "CBL/SAT", # 'R'
0x53: "DVD", # 'S'
0x54: "Blu-ray", # 'T'
0x56: "Game", # 'V'
0x57: "AUX", # 'W'
}
# ─── Bluetooth connection ─────────────────────────────────────────────────────
class DenonBT:
def __init__(self, on_message, on_status):
self.sock = None
self.connected = False
self.on_message = on_message # callback(bytes)
self.on_status = on_status # callback(str)
self._recv_thread = None
self._stop = threading.Event()
def connect(self, mac):
self._stop.clear()
try:
self.on_status("Connecting…")
s = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM)
s.settimeout(10)
s.connect((mac, RFCOMM_CHANNEL))
s.settimeout(None)
self.sock = s
self.connected = True
self.on_status(f"Connected to {mac}")
self._recv_thread = threading.Thread(target=self._recv_loop, daemon=True)
self._recv_thread.start()
# Request initial state
time.sleep(0.3)
self.send(CMD_GET_SOURCES)
self.send(CMD_GET_STATUS)
except Exception as e:
self.connected = False
self.on_status(f"Connection failed: {e}")
def disconnect(self):
self._stop.set()
self.connected = False
if self.sock:
try:
self.sock.close()
except Exception:
pass
self.sock = None
self.on_status("Disconnected")
def send(self, data: bytes):
if not self.connected or not self.sock:
self.on_status("Not connected")
return
try:
self.sock.sendall(data)
self.on_message(data, direction="SENT")
except Exception as e:
self.on_status(f"Send error: {e}")
self.disconnect()
def _recv_loop(self):
buf = b""
while not self._stop.is_set() and self.sock:
try:
chunk = self.sock.recv(256)
if not chunk:
break
buf += chunk
# Process all complete AT packets in buffer
while len(buf) >= 4:
idx = buf.find(b'AT')
if idx == -1:
buf = b""
break
if idx > 0:
buf = buf[idx:]
# Minimum packet is 4 bytes: AT + 2 command bytes
# Use length byte at offset 3 if available, else wait for more
if len(buf) < 4:
break
pkt_len = self._packet_length(buf)
if pkt_len is None or len(buf) < pkt_len:
break
pkt = buf[:pkt_len]
buf = buf[pkt_len:]
self.on_message(pkt, direction="RECV")
except Exception:
break
self.on_status("Disconnected")
def _packet_length(self, buf):
"""Best-effort length detection for AT protocol packets."""
if len(buf) < 4:
return None
cat = buf[2]
typ = buf[3]
# Known fixed-length packets
if cat == 0x07:
if typ in (0x00, 0x01): return 6 # vol up/down command
if typ == 0x02: return 9 # vol report
if typ == 0x03: return 6 # unknown init cmd
if typ == 0x1e: return 27 # sound mode (AT + cmd + 0x15 + 0x00 + 20 text + chk)
if typ in (0x0b, 0x1d, 0x25): return 7
if typ == 0x31: return 14 # firmware/version string
if cat == 0x00:
if typ == 0x04:
if len(buf) > 4 and buf[4] == 0x00:
return 6 # request
return 16 # response with sources
if typ == 0x06:
if len(buf) > 4 and buf[4] == 0x00:
return 6 # request
return 75 # status response
if typ == 0x07:
if len(buf) > 4 and buf[4] == 0x01:
return 7 # input select command
return 8 # input report
if typ == 0x09: return 7
if typ == 0x0a: return 7
if typ == 0x0b: return 7
if typ == 0x0d: return 21 # device info with MAC
if typ == 0x0e: return 16 # device name
if typ == 0x13: return 7
if cat == 0x0d:
if typ == 0x08: return 13
if typ == 0x0d: return 6
if cat == 0x81:
if typ == 0x04: return 7
if typ == 0x32: return 119 # large volume table
if typ == 0x40: return 7
if cat == 0x82:
if typ == 0x31: return 14
# Fallback: read until next AT or 32 bytes
next_at = buf.find(b'AT', 2)
if next_at > 0:
return next_at
return min(32, len(buf))
# ─── GUI ─────────────────────────────────────────────────────────────────────
BG = "#0d0d0d"
PANEL = "#161616"
BORDER = "#2a2a2a"
ACCENT = "#e8c97a" # warm gold
ACCENT2 = "#4fc3f7" # cool blue
TEXT = "#f0ede6"
MUTED = "#666"
RED = "#e05252"
GREEN = "#4caf50"
class DenonRemoteApp(tk.Tk):
def __init__(self):
super().__init__()
self.title("Denon AVR-S540BT Remote")
self.configure(bg=BG)
self.resizable(False, False)
self._msg_queue = queue.Queue()
self._vol = tk.StringVar(value="")
self._vol_raw = None # raw volume byte (0x00-0xFF)
self._muted = False
self._sound_mode = tk.StringVar(value="")
self._status = tk.StringVar(value="Disconnected")
self._sources = {} # byte -> label
self._cur_input = None
self._devices = [] # list of dicts from find_denon_devices()
self._selected_device = None # index into _devices
self._unpaired_devices = [] # list of dicts from discover_denon_devices()
self._discovering = False
self._pairing = False
self.bt = DenonBT(
on_message=self._on_bt_message,
on_status=self._on_bt_status,
)
self._build_ui()
self._poll_queue()
self._scan_devices()
# ── UI construction ───────────────────────────────────────────────────────
def _build_ui(self):
self._fonts()
# ── Header ────────────────────────────────────────────────────────────
hdr = tk.Frame(self, bg=BG, pady=18)
hdr.pack(fill="x", padx=24)
tk.Label(hdr, text="DENON", font=self.f_brand,
bg=BG, fg=ACCENT).pack(side="left")
tk.Label(hdr, text=" AVR-S540BT", font=self.f_model,
bg=BG, fg=MUTED).pack(side="left", padx=(2, 0))
# Status dot
self._status_dot = tk.Label(hdr, text="", font=self.f_dot,
bg=BG, fg=RED)
self._status_dot.pack(side="right")
tk.Label(hdr, textvariable=self._status, font=self.f_small,
bg=BG, fg=MUTED).pack(side="right", padx=(0, 6))
self._divider()
# ── Connection panel ──────────────────────────────────────────────────
conn = tk.Frame(self, bg=PANEL, padx=20, pady=16)
conn.pack(fill="x", padx=16, pady=(12, 0))
tk.Label(conn, text="DEVICE", font=self.f_label,
bg=PANEL, fg=MUTED).grid(row=0, column=0, sticky="w",
columnspan=3)
self._device_frame = tk.Frame(conn, bg=PANEL)
self._device_frame.grid(row=1, column=0, pady=(4, 0), sticky="w")
self._device_label = tk.Label(
self._device_frame, text="Scanning…", font=self.f_mono,
bg=PANEL, fg=MUTED, anchor="w"
)
self._device_label.pack(side="left")
btn_row = tk.Frame(conn, bg=PANEL)
btn_row.grid(row=0, column=2, sticky="e")
self._scan_btn = self._btn(
btn_row, "SCAN", self._scan_devices,
bg=PANEL, fg=MUTED, font=self.f_small
)
self._scan_btn.pack(side="left", padx=(0, 4))
self._scan_pair_btn = self._btn(
btn_row, "SCAN & PAIR", self._start_discovery,
bg=PANEL, fg=ACCENT2, font=self.f_small
)
self._scan_pair_btn.pack(side="left")
self._conn_btn = self._btn(
conn, "CONNECT", self._toggle_connect,
bg=ACCENT, fg=BG, font=self.f_btn
)
self._conn_btn.grid(row=1, column=2, padx=(10, 0), pady=(4, 0),
ipady=6, ipadx=14)
self._unpaired_frame = tk.Frame(conn, bg=PANEL)
self._unpaired_frame.grid(row=2, column=0, columnspan=3,
pady=(8, 0), sticky="w")
self._divider()
# ── Power panel ──────────────────────────────────────────────────
pwr_frame = tk.Frame(self, bg=BG, pady=6)
pwr_frame.pack(fill="x", padx=24)
tk.Label(pwr_frame, text="POWER", font=self.f_label,
bg=BG, fg=MUTED).pack(side="left")
self._btn(
pwr_frame, "OFF", lambda: self.bt.send(CMD_POWER_OFF),
bg=PANEL, fg=RED, font=self.f_btn,
).pack(side="right", ipady=5, ipadx=14)
self._btn(
pwr_frame, "ON", lambda: self.bt.send(CMD_POWER_ON),
bg=PANEL, fg=GREEN, font=self.f_btn,
).pack(side="right", padx=(0, 6), ipady=5, ipadx=14)
self._divider()
# ── Volume panel ──────────────────────────────────────────────────────
vol_frame = tk.Frame(self, bg=BG, pady=8)
vol_frame.pack(fill="x", padx=24)
tk.Label(vol_frame, text="VOLUME", font=self.f_label,
bg=BG, fg=MUTED).pack(anchor="w")
vol_ctrl = tk.Frame(vol_frame, bg=BG)
vol_ctrl.pack(fill="x", pady=(8, 0))
self._vol_down_btn = self._btn(
vol_ctrl, "", lambda: self.bt.send(CMD_VOLUME_DOWN),
bg=PANEL, fg=TEXT, font=self.f_arrow,
width=4, activebackground=BORDER
)
self._vol_down_btn.pack(side="left", ipady=12)
tk.Label(vol_ctrl, textvariable=self._vol, font=self.f_vol,
bg=BG, fg=ACCENT, width=6).pack(side="left")
self._vol_up_btn = self._btn(
vol_ctrl, "", lambda: self.bt.send(CMD_VOLUME_UP),
bg=PANEL, fg=TEXT, font=self.f_arrow,
width=4, activebackground=BORDER
)
self._vol_up_btn.pack(side="left", ipady=12)
self._mute_btn = self._btn(
vol_ctrl, "MUTE", self._toggle_mute,
bg=PANEL, fg=MUTED, font=self.f_btn,
activebackground=RED
)
self._mute_btn.pack(side="right", ipady=6, ipadx=14)
self._divider()
# ── Sound mode ────────────────────────────────────────────────────────
sm = tk.Frame(self, bg=BG, pady=6)
sm.pack(fill="x", padx=24)
tk.Label(sm, text="SOUND MODE", font=self.f_label,
bg=BG, fg=MUTED).pack(side="left")
tk.Label(sm, textvariable=self._sound_mode, font=self.f_value,
bg=BG, fg=ACCENT2).pack(side="right")
self._divider()
# ── Input sources ─────────────────────────────────────────────────────
inp = tk.Frame(self, bg=BG, pady=8)
inp.pack(fill="x", padx=24)
tk.Label(inp, text="INPUT", font=self.f_label,
bg=BG, fg=MUTED).pack(anchor="w")
self._input_frame = tk.Frame(inp, bg=BG)
self._input_frame.pack(fill="x", pady=(8, 0))
# Populated dynamically when sources are received
self._input_buttons = {}
self._render_inputs(INPUT_SOURCES)
self._divider()
# ── Log ───────────────────────────────────────────────────────────────
log_hdr = tk.Frame(self, bg=BG)
log_hdr.pack(fill="x", padx=24, pady=(4, 0))
tk.Label(log_hdr, text="PACKET LOG", font=self.f_label,
bg=BG, fg=MUTED).pack(side="left")
self._btn(log_hdr, "CLEAR", self._clear_log,
bg=BG, fg=MUTED, font=self.f_small).pack(side="right")
self._log = tk.Text(
self, height=8, font=self.f_mono,
bg="#0a0a0a", fg="#3a7a3a", insertbackground=ACCENT,
relief="flat", bd=0, state="disabled",
highlightthickness=1, highlightbackground=BORDER,
padx=8, pady=6
)
self._log.pack(fill="x", padx=16, pady=(4, 16))
self._log.tag_config("sent", foreground="#4fc3f7")
self._log.tag_config("recv", foreground="#81c784")
self._log.tag_config("info", foreground=MUTED)
def _fonts(self):
self.f_brand = tkfont.Font(family="Courier", size=18, weight="bold")
self.f_model = tkfont.Font(family="Courier", size=12)
self.f_label = tkfont.Font(family="Courier", size=8, weight="bold")
self.f_value = tkfont.Font(family="Courier", size=12)
self.f_vol = tkfont.Font(family="Courier", size=32, weight="bold")
self.f_btn = tkfont.Font(family="Courier", size=9, weight="bold")
self.f_arrow = tkfont.Font(family="Courier", size=14, weight="bold")
self.f_mono = tkfont.Font(family="Courier", size=9)
self.f_small = tkfont.Font(family="Courier", size=8)
self.f_dot = tkfont.Font(family="Courier", size=10)
def _btn(self, parent, text, cmd, **kw):
defaults = dict(
text=text, command=cmd,
bg=PANEL, fg=TEXT,
font=self.f_btn,
relief="flat", bd=0,
cursor="hand2",
activeforeground=ACCENT,
activebackground=BORDER,
)
defaults.update(kw)
return tk.Button(parent, **defaults)
def _divider(self):
tk.Frame(self, bg=BORDER, height=1).pack(
fill="x", padx=16, pady=6)
def _render_inputs(self, sources):
for w in self._input_frame.winfo_children():
w.destroy()
self._input_buttons = {}
for code, label in sources.items():
btn = self._btn(
self._input_frame, label,
lambda c=code: self._select_input(c),
bg=PANEL, fg=MUTED, font=self.f_btn,
)
btn.pack(side="left", padx=(0, 6), ipady=5, ipadx=8)
self._input_buttons[code] = btn
if not sources:
tk.Label(self._input_frame, text="No sources discovered yet",
font=self.f_small, bg=BG, fg=MUTED).pack(side="left")
def _toggle_mute(self):
if self._muted:
self.bt.send(CMD_MUTE_OFF)
else:
self.bt.send(CMD_MUTE_ON)
def _update_mute_ui(self):
if self._muted:
self._mute_btn.configure(bg=RED, fg=TEXT, text="MUTED")
else:
self._mute_btn.configure(bg=PANEL, fg=MUTED, text="MUTE")
def _select_input(self, code):
self.bt.send(cmd_select_input(code))
self._highlight_input(code)
def _highlight_input(self, code):
self._cur_input = code
for c, btn in self._input_buttons.items():
if c == code:
btn.configure(fg=ACCENT, bg="#252010",
highlightthickness=1,
highlightbackground=ACCENT,
highlightcolor=ACCENT)
else:
btn.configure(fg=MUTED, bg=PANEL,
highlightthickness=0)
def _clear_log(self):
self._log.configure(state="normal")
self._log.delete("1.0", "end")
self._log.configure(state="disabled")
# ── Device discovery ─────────────────────────────────────────────────────
def _scan_devices(self):
self._devices = find_denon_devices()
self._selected_device = 0 if self._devices else None
self._render_device_list()
def _render_device_list(self):
for w in self._device_frame.winfo_children():
w.destroy()
if not HAS_DBUS:
tk.Label(self._device_frame,
text="Install python3-dbus for auto-detection",
font=self.f_small, bg=PANEL, fg=RED).pack(side="left")
return
if not self._devices:
tk.Label(self._device_frame,
text="No paired Denon device — use SCAN & PAIR",
font=self.f_small, bg=PANEL, fg=MUTED).pack(side="left")
return
if len(self._devices) == 1:
dev = self._devices[0]
tk.Label(self._device_frame,
text=f"{dev['name']} {dev['mac']}",
font=self.f_mono, bg=PANEL, fg=TEXT).pack(side="left")
else:
for i, dev in enumerate(self._devices):
is_sel = (i == self._selected_device)
btn = self._btn(
self._device_frame,
f"{dev['name']}",
lambda idx=i: self._pick_device(idx),
bg="#252010" if is_sel else PANEL,
fg=ACCENT if is_sel else MUTED,
font=self.f_btn,
)
btn.pack(side="left", padx=(0, 6), ipady=3, ipadx=6)
def _pick_device(self, idx):
self._selected_device = idx
self._render_device_list()
# ── Bluetooth discovery & pairing ─────────────────────────────────────────
def _start_discovery(self):
if not HAS_DBUS:
self._log_append("Install python3-dbus for Bluetooth scanning", "info")
return
if self._discovering:
return
self._discovering = True
self._unpaired_devices = []
self._scan_pair_btn.configure(state="disabled", text="SCANNING…")
self._render_unpaired_list()
self._log_append("Starting Bluetooth discovery…", "info")
threading.Thread(target=self._discovery_worker, daemon=True).start()
def _discovery_worker(self):
devices = discover_denon_devices(duration=8)
self._msg_queue.put(("discovery_done", devices))
def _handle_discovery_done(self, devices):
self._discovering = False
self._unpaired_devices = devices
self._scan_pair_btn.configure(state="normal", text="SCAN & PAIR")
self._render_unpaired_list()
count = len(devices)
if count == 0:
self._log_append("Discovery complete — no unpaired Denon devices found", "info")
else:
self._log_append(f"Discovery complete — found {count} unpaired Denon device(s)", "info")
def _render_unpaired_list(self):
for w in self._unpaired_frame.winfo_children():
w.destroy()
if self._discovering:
tk.Label(self._unpaired_frame,
text="Scanning for Denon devices…",
font=self.f_small, bg=PANEL, fg=ACCENT2).pack(anchor="w")
return
if not self._unpaired_devices:
return
tk.Label(self._unpaired_frame,
text="UNPAIRED DEVICES", font=self.f_label,
bg=PANEL, fg=MUTED).pack(anchor="w", pady=(4, 2))
for dev in self._unpaired_devices:
row = tk.Frame(self._unpaired_frame, bg=PANEL)
row.pack(fill="x", pady=1)
tk.Label(row, text=f"{dev['name']} {dev['mac']}",
font=self.f_small, bg=PANEL, fg=MUTED).pack(side="left")
self._btn(
row, "PAIR",
lambda d=dev: self._start_pair(d),
bg=ACCENT2, fg=BG, font=self.f_small,
).pack(side="right", ipadx=8, ipady=2)
def _start_pair(self, device):
if self._pairing:
return
self._pairing = True
self._log_append(f"Pairing with {device['name']} ({device['mac']})…", "info")
for w in self._unpaired_frame.winfo_children():
for child in w.winfo_children():
if isinstance(child, tk.Button):
child.configure(state="disabled")
threading.Thread(
target=self._pair_worker, args=(device,), daemon=True
).start()
def _pair_worker(self, device):
success, error = pair_and_trust_device(device["path"])
self._msg_queue.put(("pair_done", device, success, error))
def _handle_pair_done(self, device, success, error):
self._pairing = False
if success:
self._log_append(f"Paired and trusted: {device['name']}", "info")
self._unpaired_devices = [
d for d in self._unpaired_devices if d["mac"] != device["mac"]
]
self._render_unpaired_list()
self._scan_devices()
else:
self._log_append(f"Pairing failed: {error}", "info")
self._render_unpaired_list()
# ── Connection ────────────────────────────────────────────────────────────
def _toggle_connect(self):
if self.bt.connected:
self.bt.disconnect()
self._conn_btn.configure(text="CONNECT", bg=ACCENT, fg=BG)
else:
if self._selected_device is None or not self._devices:
self._log_append("No paired Denon device — use SCAN & PAIR", "info")
return
mac = self._devices[self._selected_device]["mac"]
self._conn_btn.configure(text="DISCONNECT", bg=RED, fg=TEXT)
threading.Thread(
target=self.bt.connect, args=(mac,), daemon=True
).start()
# ── Message handling ──────────────────────────────────────────────────────
def _on_bt_message(self, data: bytes, direction: str):
self._msg_queue.put(("msg", data, direction))
def _on_bt_status(self, msg: str):
self._msg_queue.put(("status", msg))
def _poll_queue(self):
try:
while True:
item = self._msg_queue.get_nowait()
if item[0] == "status":
self._handle_status(item[1])
elif item[0] == "msg":
self._handle_message(item[1], item[2])
elif item[0] == "discovery_done":
self._handle_discovery_done(item[1])
elif item[0] == "pair_done":
self._handle_pair_done(item[1], item[2], item[3])
except queue.Empty:
pass
self.after(50, self._poll_queue)
def _handle_status(self, msg: str):
self._status.set(msg)
connected = msg.startswith("Connected")
self._status_dot.configure(fg=GREEN if connected else RED)
if not connected:
self._conn_btn.configure(text="CONNECT", bg=ACCENT, fg=BG)
def _handle_message(self, data: bytes, direction: str):
hex_str = " ".join(f"{b:02x}" for b in data)
tag = "sent" if direction == "SENT" else "recv"
arrow = "" if direction == "SENT" else ""
self._log_append(f"{arrow} {hex_str}", tag)
if len(data) < 3:
return
cat = data[2]
typ = data[3] if len(data) > 3 else None
# Volume report: AT 07 02 03 c5 [vol] ...
if cat == 0x07 and typ == 0x02 and len(data) >= 7:
if data[4] == 0x03 and data[5] == 0xc5:
vol = data[6]
self._vol_raw = vol
# Display as dB: raw/2 gives the display value shown on the AVR
vol_db = vol / 2.0
self._vol.set(f"{vol_db:g}")
return
# Mute state: AT 07 1d 01 [state] ...
if cat == 0x07 and typ == 0x1d and len(data) >= 6:
self._muted = (data[4] == 0x01 and data[5] == 0x01)
self._update_mute_ui()
return
# Sound mode: AT 07 1e ...
if cat == 0x07 and typ == 0x1e and len(data) >= 6:
text_bytes = data[5:]
text = bytes(b for b in text_bytes if 32 <= b < 127).decode("ascii", errors="replace").strip()
if text:
self._sound_mode.set(text)
return
# Input sources: AT 00 04 0a ...
if cat == 0x00 and typ == 0x04 and direction == "RECV" and len(data) >= 8:
raw = data[4:]
discovered = {}
for b in raw:
if b in INPUT_SOURCES:
discovered[b] = INPUT_SOURCES[b]
elif 0x40 <= b <= 0x7f:
discovered[b] = f"SRC-{b:02x}"
if discovered:
self._sources = discovered
self._render_inputs(discovered)
return
# Current input: AT 00 07 02 [source] ...
if cat == 0x00 and typ == 0x07 and direction == "RECV" and len(data) >= 5:
if data[4] == 0x02 and len(data) >= 6:
self._highlight_input(data[5])
return
# Device name: AT 00 0e ...
if cat == 0x00 and typ == 0x0e and direction == "RECV" and len(data) >= 6:
name_bytes = data[5:]
name = bytes(b for b in name_bytes if 32 <= b < 127).decode("ascii", errors="replace").strip()
if name:
self._log_append(f" Device: {name}", "info")
return
def _log_append(self, text: str, tag: str = ""):
self._log.configure(state="normal")
self._log.insert("end", text + "\n", tag)
self._log.see("end")
self._log.configure(state="disabled")
# ─── Entry point ─────────────────────────────────────────────────────────────
if __name__ == "__main__":
app = DenonRemoteApp()
app.mainloop()