a5d2865eb2
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
858 lines
33 KiB
Python
858 lines
33 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Denon AVR-S540BT Bluetooth Remote — Prototype
|
|
Uses RFCOMM channel 2 (BT SPP 2) as identified from btsnoop capture.
|
|
Run on Linux with: python3 denon_remote.py
|
|
"""
|
|
|
|
import socket
|
|
import threading
|
|
import tkinter as tk
|
|
from tkinter import font as tkfont
|
|
import queue
|
|
import struct
|
|
import time
|
|
|
|
# dbus-python is an optional dependency. HAS_DBUS records whether the import
# succeeded; the BlueZ helper functions in this file check it before touching
# D-Bus and return an empty/failed result when it is False.
try:
    import dbus
    HAS_DBUS = True
except ImportError:
    HAS_DBUS = False
|
|
|
|
# ─── Bluetooth device discovery ──────────────────────────────────────────────
|
|
|
|
def find_denon_devices():
    """Return the BlueZ-known devices whose name contains 'DENON'.

    Every object exported by BlueZ that implements ``org.bluez.Device1`` is
    inspected; the name match is case-insensitive. Devices are NOT filtered
    on their pairing state here -- the state is reported in the result.

    Returns:
        A list of dicts ``{"mac": str, "name": str, "paired": bool}``.
        The list is empty when dbus is unavailable or any D-Bus call fails.
    """
    if not HAS_DBUS:
        return []
    try:
        root = dbus.SystemBus().get_object("org.bluez", "/")
        object_manager = dbus.Interface(
            root, "org.freedesktop.DBus.ObjectManager"
        )
        found = []
        for ifaces in object_manager.GetManagedObjects().values():
            props = ifaces.get("org.bluez.Device1")
            if props is None:
                continue  # not a remote device (adapter, service, ...)
            label = str(props.get("Name", ""))
            if "DENON" not in label.upper():
                continue
            found.append({
                "mac": str(props.get("Address")),
                "name": label,
                "paired": bool(props.get("Paired", False)),
            })
        return found
    except Exception:
        # Best effort: a missing/stopped BlueZ daemon simply yields no devices.
        return []
|
|
|
|
def find_bt_adapter():
    """Locate a Bluetooth adapter exported by BlueZ.

    Returns:
        The D-Bus object path (``str``) of the first object implementing
        ``org.bluez.Adapter1``, or ``None`` when dbus is unavailable, no
        adapter is exported, or the D-Bus query fails.
    """
    if not HAS_DBUS:
        return None
    try:
        root = dbus.SystemBus().get_object("org.bluez", "/")
        object_manager = dbus.Interface(
            root, "org.freedesktop.DBus.ObjectManager"
        )
        managed = object_manager.GetManagedObjects()
        return next(
            (
                str(obj_path)
                for obj_path, ifaces in managed.items()
                if "org.bluez.Adapter1" in ifaces
            ),
            None,
        )
    except Exception:
        # Best effort: treat any D-Bus failure as "no adapter".
        return None
|
|
|
|
def discover_denon_devices(duration=8):
    """Scan for nearby Denon devices that are not yet paired.

    Starts discovery on the adapter returned by ``find_bt_adapter()``,
    sleeps for ``duration`` seconds, stops discovery, then lists every
    ``org.bluez.Device1`` object whose name contains 'DENON'
    (case-insensitive) and whose ``Paired`` property is false.

    This call blocks for ``duration`` seconds, so run it on a worker thread.

    Args:
        duration: How long to leave discovery running, in seconds.

    Returns:
        A list of dicts ``{"mac": str, "name": str, "path": str}`` where
        ``path`` is the device's D-Bus object path. Empty when dbus is
        unavailable, no adapter exists, discovery cannot be started, or any
        other D-Bus call fails.
    """
    if not HAS_DBUS:
        return []

    adapter_path = find_bt_adapter()
    if adapter_path is None:
        return []

    try:
        bus = dbus.SystemBus()
        adapter = dbus.Interface(
            bus.get_object("org.bluez", adapter_path),
            "org.bluez.Adapter1",
        )

        try:
            adapter.StartDiscovery()
        except dbus.exceptions.DBusException as err:
            # An "InProgress" error means a scan is already running, which is
            # just as good; anything else means we cannot scan at all.
            already_running = "InProgress" in str(err)
            if not already_running:
                return []

        time.sleep(duration)

        try:
            adapter.StopDiscovery()
        except Exception:
            pass  # failing to stop the scan must not discard the results

        object_manager = dbus.Interface(
            bus.get_object("org.bluez", "/"),
            "org.freedesktop.DBus.ObjectManager",
        )
        unpaired = []
        for obj_path, ifaces in object_manager.GetManagedObjects().items():
            props = ifaces.get("org.bluez.Device1")
            if props is None:
                continue
            label = str(props.get("Name", ""))
            is_paired = bool(props.get("Paired", False))
            if is_paired or "DENON" not in label.upper():
                continue
            unpaired.append({
                "mac": str(props.get("Address")),
                "name": label,
                "path": str(obj_path),
            })
        return unpaired
    except Exception:
        # Best effort: any D-Bus failure is reported as "nothing found".
        return []
|
|
|
|
def pair_and_trust_device(device_path):
    """Pair with a BlueZ device and mark it as trusted.

    Blocks until the ``Pair()`` D-Bus call returns, so call it from a worker
    thread.

    A device that is already paired (``Pair()`` fails with an error whose
    text contains "AlreadyExists") is not treated as a failure, and its
    ``Trusted`` property is still set. Previously that case returned success
    without ever trusting the device.

    Args:
        device_path: D-Bus object path of the ``org.bluez.Device1`` object.

    Returns:
        ``(True, "")`` on success, ``(False, error_message)`` on failure.
    """
    if not HAS_DBUS:
        return (False, "D-Bus not available")
    try:
        bus = dbus.SystemBus()
        device_obj = bus.get_object("org.bluez", device_path)

        device = dbus.Interface(device_obj, "org.bluez.Device1")
        try:
            device.Pair()
        except dbus.exceptions.DBusException as e:
            # Already paired is fine -- fall through and trust the device.
            if "AlreadyExists" not in str(e):
                raise

        props = dbus.Interface(device_obj, "org.freedesktop.DBus.Properties")
        props.Set("org.bluez.Device1", "Trusted", dbus.Boolean(True))

        return (True, "")
    except Exception as e:
        # Covers DBusException as well; the message is shown to the user.
        return (False, str(e))
|
|
|
|
# ─── Protocol constants ───────────────────────────────────────────────────────
|
|
|
|
# RFCOMM channel used for the control link (see module docstring).
RFCOMM_CHANNEL = 2

# Commands (PHONE -> AVR), written as hex dumps.
# Every command starts with 41 54, i.e. the ASCII characters "AT".
# NOTE(review): in the seven-byte commands the last byte equals 0xFF minus
# the byte before it (01 -> FE, 00 -> FF); this looks like a checksum, but
# the exact rule is not confirmed.
CMD_VOLUME_UP = bytes.fromhex("41 54 07 00 00 00")
CMD_VOLUME_DOWN = bytes.fromhex("41 54 07 01 00 00")
CMD_GET_SOURCES = bytes.fromhex("41 54 00 04 00 00")
CMD_GET_STATUS = bytes.fromhex("41 54 00 06 00 00")
CMD_MUTE_ON = bytes.fromhex("41 54 07 1D 01 01 FE")
CMD_MUTE_OFF = bytes.fromhex("41 54 07 1D 01 00 FF")
CMD_POWER_OFF = bytes.fromhex("41 54 00 0A 01 00 FF")
CMD_POWER_ON = bytes.fromhex("41 54 00 0A 01 01 FE")
|
|
|
|
def cmd_select_input(source_byte):
    """Build the 7-byte "select input" command for one source code.

    Layout: ``AT 00 07 01 <source> <checksum placeholder>``.

    Args:
        source_byte: Source code as an int in 0..255 (see INPUT_SOURCES).

    Returns:
        The command as ``bytes``. The final byte is always 0x00.

    Raises:
        ValueError: If ``source_byte`` is outside 0..255.
    """
    # NOTE(review): the last byte is a placeholder, not a computed checksum.
    # The other 7-byte commands in this file end in 0xFF minus their data
    # byte -- confirm against a capture whether the AVR needs that here too.
    header = b"AT\x00\x07\x01"
    return header + bytes([source_byte, 0x00])
|
|
|
|
# Input source codes seen in the capture log, keyed by the source byte.
# Each key is written as the ASCII character that byte corresponds to.
# The labels are approximate -- adjust after testing against the receiver.
# Insertion order is the order in which the default input buttons are drawn.
INPUT_SOURCES = {
    ord("@"): "Bluetooth",     # 0x40
    ord("U"): "Media Player",  # 0x55 -- observed in source list, label approximate
    ord("R"): "CBL/SAT",       # 0x52
    ord("S"): "DVD",           # 0x53
    ord("T"): "Blu-ray",       # 0x54
    ord("V"): "Game",          # 0x56
    ord("W"): "AUX",           # 0x57
}
|
|
|
|
# ─── Bluetooth connection ─────────────────────────────────────────────────────
|
|
|
|
class DenonBT:
    """RFCOMM link to the receiver.

    Opens the Bluetooth socket, sends raw command frames, and runs a
    background thread that splits the incoming byte stream into packets
    starting with the two bytes ``AT``.

    The callbacks are invoked on whichever thread is running at the time
    (the caller of ``connect``/``send``/``disconnect``, or the internal
    receive thread), so they must be safe to call from a non-GUI thread.

    Args:
        on_message: ``callback(data: bytes, direction=...)`` called with
            ``direction="SENT"`` after a successful send and
            ``direction="RECV"`` for every packet received.
        on_status: ``callback(text: str)`` for human-readable state changes.
    """

    # Packets whose total length depends only on the (category, type) bytes
    # at offsets 2 and 3. Values are total packet lengths in bytes.
    _FIXED_LENGTHS = {
        (0x07, 0x00): 6,    # vol up command
        (0x07, 0x01): 6,    # vol down command
        (0x07, 0x02): 9,    # vol report
        (0x07, 0x03): 6,    # unknown init cmd
        (0x07, 0x1e): 27,   # sound mode (AT + cmd + 0x15 + 0x00 + 20 text + chk)
        (0x07, 0x0b): 7,
        (0x07, 0x1d): 7,
        (0x07, 0x25): 7,
        (0x07, 0x31): 14,   # firmware/version string
        (0x00, 0x09): 7,
        (0x00, 0x0a): 7,
        (0x00, 0x0b): 7,
        (0x00, 0x0d): 21,   # device info with MAC
        (0x00, 0x0e): 16,   # device name
        (0x00, 0x13): 7,
        (0x0d, 0x08): 13,
        (0x0d, 0x0d): 6,
        (0x81, 0x04): 7,
        (0x81, 0x32): 119,  # large volume table
        (0x81, 0x40): 7,
        (0x82, 0x31): 14,
    }

    # Category 0x00 packets that come in two sizes, told apart by the byte at
    # offset 4: type -> (marker byte, length if marker matches, length otherwise)
    _VARIANT_LENGTHS = {
        0x04: (0x00, 6, 16),  # request / response with sources
        0x06: (0x00, 6, 75),  # request / status response
        0x07: (0x01, 7, 8),   # input select command / input report
    }

    def __init__(self, on_message, on_status):
        self.sock = None
        self.connected = False
        self.on_message = on_message  # callback(bytes, direction=str)
        self.on_status = on_status    # callback(str)
        self._recv_thread = None
        self._stop = threading.Event()

    def connect(self, mac):
        """Open the RFCOMM connection to ``mac`` and request initial state.

        Blocks for up to the 10 s connect timeout plus a short settle delay,
        so call it from a worker thread. Never raises: the outcome is
        reported through ``on_status`` ("Connected to ..." or
        "Connection failed: ...").
        """
        self._stop.clear()
        s = None
        try:
            self.on_status("Connecting…")
            s = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM)
            s.settimeout(10)
            s.connect((mac, RFCOMM_CHANNEL))
            s.settimeout(None)  # back to blocking mode for the receive loop
            self.sock = s
            self.connected = True
            self.on_status(f"Connected to {mac}")
            self._recv_thread = threading.Thread(target=self._recv_loop, daemon=True)
            self._recv_thread.start()
            # Request initial state
            time.sleep(0.3)
            self.send(CMD_GET_SOURCES)
            self.send(CMD_GET_STATUS)
        except Exception as e:
            self.connected = False
            self.sock = None
            if s is not None:
                # Do not leak the socket of a failed attempt.
                try:
                    s.close()
                except OSError:
                    pass
            self.on_status(f"Connection failed: {e}")

    def disconnect(self):
        """Close the link (if any) and report "Disconnected"."""
        self._stop.set()
        self.connected = False
        sock, self.sock = self.sock, None
        if sock:
            # shutdown() wakes a receive thread that is blocked in recv();
            # close() alone is not guaranteed to do that.
            try:
                sock.shutdown(socket.SHUT_RDWR)
            except OSError:
                pass
            try:
                sock.close()
            except Exception:
                pass
        self.on_status("Disconnected")

    def send(self, data: bytes):
        """Send one raw command frame.

        Reports "Not connected" and returns when there is no link. On a send
        error the error is reported and the link is torn down.
        """
        sock = self.sock
        if not self.connected or not sock:
            self.on_status("Not connected")
            return
        try:
            sock.sendall(data)
            self.on_message(data, direction="SENT")
        except Exception as e:
            self.on_status(f"Send error: {e}")
            self.disconnect()

    def _recv_loop(self):
        """Receive thread body: read, reassemble, and dispatch packets.

        Runs until ``disconnect()`` is called, the peer closes the link, or
        an error occurs. If the loop ends while this socket is still the
        active one, the link is torn down so ``connected`` never stays True
        for a dead socket.
        """
        sock = self.sock
        if sock is None:
            return
        buf = b""
        try:
            while not self._stop.is_set():
                chunk = sock.recv(256)
                if not chunk:
                    break  # peer closed the connection
                buf += chunk
                packets, buf = self._extract_packets(buf)
                for pkt in packets:
                    self.on_message(pkt, direction="RECV")
        except Exception as e:
            # Errors caused by our own disconnect() are expected; stay quiet.
            if not self._stop.is_set():
                self.on_status(f"Receive error: {e}")
        finally:
            # Only tear down if nobody else already did (or reconnected):
            # disconnect() replaces self.sock, so a stale thread is a no-op.
            if self.sock is sock:
                self.disconnect()

    def _extract_packets(self, buf):
        """Split ``buf`` into complete packets.

        Returns:
            ``(packets, remainder)`` where ``packets`` is a list of complete
            packets (each starting with ``AT``) and ``remainder`` holds the
            bytes that must wait for more data.
        """
        packets = []
        while len(buf) >= 4:
            idx = buf.find(b"AT")
            if idx == -1:
                # No header in the buffer: drop the noise, but keep a trailing
                # 'A' because it may be the first half of a split "AT".
                buf = buf[-1:] if buf.endswith(b"A") else b""
                break
            if idx > 0:
                buf = buf[idx:]  # discard bytes before the header
            # Minimum packet is 4 bytes: AT + 2 command bytes
            if len(buf) < 4:
                break
            pkt_len = self._packet_length(buf)
            if pkt_len is None or len(buf) < pkt_len:
                break  # wait for the rest of the packet
            packets.append(buf[:pkt_len])
            buf = buf[pkt_len:]
        return packets, buf

    def _packet_length(self, buf):
        """Best-effort length detection for AT protocol packets.

        Args:
            buf: Bytes starting at a packet's ``AT`` header.

        Returns:
            The total packet length in bytes, or ``None`` when fewer than
            four bytes are available. Unknown packet types fall back to the
            offset of the next ``AT`` in the buffer, or else to whatever is
            buffered, capped at 32 bytes.
        """
        if len(buf) < 4:
            return None
        cat, typ = buf[2], buf[3]

        if cat == 0x00 and typ in self._VARIANT_LENGTHS:
            marker, matched_len, other_len = self._VARIANT_LENGTHS[typ]
            if len(buf) > 4 and buf[4] == marker:
                return matched_len
            return other_len

        # Known fixed-length packets
        fixed = self._FIXED_LENGTHS.get((cat, typ))
        if fixed is not None:
            return fixed

        # Fallback: read until next AT or 32 bytes
        next_at = buf.find(b"AT", 2)
        if next_at > 0:
            return next_at
        return min(32, len(buf))
|
|
|
|
|
|
# ─── GUI ─────────────────────────────────────────────────────────────────────
|
|
|
|
# Colour palette for the Tk widgets (Tk colour strings).
BG = "#0d0d0d"  # window background
PANEL = "#161616"  # connection panel and default button background
BORDER = "#2a2a2a"  # divider lines and pressed-button background
ACCENT = "#e8c97a" # warm gold
ACCENT2 = "#4fc3f7" # cool blue
TEXT = "#f0ede6"  # default button text
MUTED = "#666"  # section labels and inactive text
RED = "#e05252"  # status dot when not connected, power-off, muted state
GREEN = "#4caf50"  # status dot when connected, power-on
|
|
|
|
class DenonRemoteApp(tk.Tk):
    """Main window of the remote.

    Worker threads (Bluetooth connect/receive, discovery, pairing) never
    touch widgets directly. They put tuples on ``_msg_queue``, which the Tk
    thread drains every 50 ms in ``_poll_queue``:

        ("status", text)
        ("msg", data, direction)
        ("discovery_done", devices)
        ("pair_done", device, success, error)
    """

    def __init__(self):
        super().__init__()
        self.title("Denon AVR-S540BT Remote")
        self.configure(bg=BG)
        self.resizable(False, False)

        self._msg_queue = queue.Queue()
        self._vol = tk.StringVar(value="—")
        self._vol_raw = None  # raw volume byte (0x00-0xFF)
        self._muted = False
        self._sound_mode = tk.StringVar(value="—")
        self._status = tk.StringVar(value="Disconnected")
        self._sources = {}  # byte -> label
        self._cur_input = None
        self._devices = []  # list of dicts from find_denon_devices()
        self._selected_device = None  # index into _devices
        self._unpaired_devices = []  # list of dicts from discover_denon_devices()
        self._discovering = False
        self._pairing = False
        self._connecting = False  # True while a connect thread is pending

        self.bt = DenonBT(
            on_message=self._on_bt_message,
            on_status=self._on_bt_status,
        )

        self._build_ui()
        self._poll_queue()
        self._scan_devices()

    # ── UI construction ───────────────────────────────────────────────────────

    def _build_ui(self):
        """Create all widgets, top to bottom."""
        self._fonts()

        # ── Header ────────────────────────────────────────────────────────────
        hdr = tk.Frame(self, bg=BG, pady=18)
        hdr.pack(fill="x", padx=24)

        tk.Label(hdr, text="DENON", font=self.f_brand,
                 bg=BG, fg=ACCENT).pack(side="left")
        tk.Label(hdr, text=" AVR-S540BT", font=self.f_model,
                 bg=BG, fg=MUTED).pack(side="left", padx=(2, 0))

        # Status dot
        self._status_dot = tk.Label(hdr, text="●", font=self.f_dot,
                                    bg=BG, fg=RED)
        self._status_dot.pack(side="right")
        tk.Label(hdr, textvariable=self._status, font=self.f_small,
                 bg=BG, fg=MUTED).pack(side="right", padx=(0, 6))

        self._divider()

        # ── Connection panel ──────────────────────────────────────────────────
        conn = tk.Frame(self, bg=PANEL, padx=20, pady=16)
        conn.pack(fill="x", padx=16, pady=(12, 0))

        tk.Label(conn, text="DEVICE", font=self.f_label,
                 bg=PANEL, fg=MUTED).grid(row=0, column=0, sticky="w",
                                          columnspan=3)

        self._device_frame = tk.Frame(conn, bg=PANEL)
        self._device_frame.grid(row=1, column=0, pady=(4, 0), sticky="w")

        self._device_label = tk.Label(
            self._device_frame, text="Scanning…", font=self.f_mono,
            bg=PANEL, fg=MUTED, anchor="w"
        )
        self._device_label.pack(side="left")

        btn_row = tk.Frame(conn, bg=PANEL)
        btn_row.grid(row=0, column=2, sticky="e")

        self._scan_btn = self._btn(
            btn_row, "SCAN", self._scan_devices,
            bg=PANEL, fg=MUTED, font=self.f_small
        )
        self._scan_btn.pack(side="left", padx=(0, 4))

        self._scan_pair_btn = self._btn(
            btn_row, "SCAN & PAIR", self._start_discovery,
            bg=PANEL, fg=ACCENT2, font=self.f_small
        )
        self._scan_pair_btn.pack(side="left")

        self._conn_btn = self._btn(
            conn, "CONNECT", self._toggle_connect,
            bg=ACCENT, fg=BG, font=self.f_btn
        )
        self._conn_btn.grid(row=1, column=2, padx=(10, 0), pady=(4, 0),
                            ipady=6, ipadx=14)

        self._unpaired_frame = tk.Frame(conn, bg=PANEL)
        self._unpaired_frame.grid(row=2, column=0, columnspan=3,
                                  pady=(8, 0), sticky="w")

        self._divider()

        # ── Power panel ──────────────────────────────────────────────────
        pwr_frame = tk.Frame(self, bg=BG, pady=6)
        pwr_frame.pack(fill="x", padx=24)

        tk.Label(pwr_frame, text="POWER", font=self.f_label,
                 bg=BG, fg=MUTED).pack(side="left")

        self._btn(
            pwr_frame, "OFF", lambda: self.bt.send(CMD_POWER_OFF),
            bg=PANEL, fg=RED, font=self.f_btn,
        ).pack(side="right", ipady=5, ipadx=14)

        self._btn(
            pwr_frame, "ON", lambda: self.bt.send(CMD_POWER_ON),
            bg=PANEL, fg=GREEN, font=self.f_btn,
        ).pack(side="right", padx=(0, 6), ipady=5, ipadx=14)

        self._divider()

        # ── Volume panel ──────────────────────────────────────────────────────
        vol_frame = tk.Frame(self, bg=BG, pady=8)
        vol_frame.pack(fill="x", padx=24)

        tk.Label(vol_frame, text="VOLUME", font=self.f_label,
                 bg=BG, fg=MUTED).pack(anchor="w")

        vol_ctrl = tk.Frame(vol_frame, bg=BG)
        vol_ctrl.pack(fill="x", pady=(8, 0))

        self._vol_down_btn = self._btn(
            vol_ctrl, "▼", lambda: self.bt.send(CMD_VOLUME_DOWN),
            bg=PANEL, fg=TEXT, font=self.f_arrow,
            width=4, activebackground=BORDER
        )
        self._vol_down_btn.pack(side="left", ipady=12)

        tk.Label(vol_ctrl, textvariable=self._vol, font=self.f_vol,
                 bg=BG, fg=ACCENT, width=6).pack(side="left")

        self._vol_up_btn = self._btn(
            vol_ctrl, "▲", lambda: self.bt.send(CMD_VOLUME_UP),
            bg=PANEL, fg=TEXT, font=self.f_arrow,
            width=4, activebackground=BORDER
        )
        self._vol_up_btn.pack(side="left", ipady=12)

        self._mute_btn = self._btn(
            vol_ctrl, "MUTE", self._toggle_mute,
            bg=PANEL, fg=MUTED, font=self.f_btn,
            activebackground=RED
        )
        self._mute_btn.pack(side="right", ipady=6, ipadx=14)

        self._divider()

        # ── Sound mode ────────────────────────────────────────────────────────
        sm = tk.Frame(self, bg=BG, pady=6)
        sm.pack(fill="x", padx=24)

        tk.Label(sm, text="SOUND MODE", font=self.f_label,
                 bg=BG, fg=MUTED).pack(side="left")
        tk.Label(sm, textvariable=self._sound_mode, font=self.f_value,
                 bg=BG, fg=ACCENT2).pack(side="right")

        self._divider()

        # ── Input sources ─────────────────────────────────────────────────────
        inp = tk.Frame(self, bg=BG, pady=8)
        inp.pack(fill="x", padx=24)

        tk.Label(inp, text="INPUT", font=self.f_label,
                 bg=BG, fg=MUTED).pack(anchor="w")

        self._input_frame = tk.Frame(inp, bg=BG)
        self._input_frame.pack(fill="x", pady=(8, 0))

        # Populated dynamically when sources are received
        self._input_buttons = {}
        self._render_inputs(INPUT_SOURCES)

        self._divider()

        # ── Log ───────────────────────────────────────────────────────────────
        log_hdr = tk.Frame(self, bg=BG)
        log_hdr.pack(fill="x", padx=24, pady=(4, 0))
        tk.Label(log_hdr, text="PACKET LOG", font=self.f_label,
                 bg=BG, fg=MUTED).pack(side="left")
        self._btn(log_hdr, "CLEAR", self._clear_log,
                  bg=BG, fg=MUTED, font=self.f_small).pack(side="right")

        self._log = tk.Text(
            self, height=8, font=self.f_mono,
            bg="#0a0a0a", fg="#3a7a3a", insertbackground=ACCENT,
            relief="flat", bd=0, state="disabled",
            highlightthickness=1, highlightbackground=BORDER,
            padx=8, pady=6
        )
        self._log.pack(fill="x", padx=16, pady=(4, 16))
        self._log.tag_config("sent", foreground="#4fc3f7")
        self._log.tag_config("recv", foreground="#81c784")
        self._log.tag_config("info", foreground=MUTED)

    def _fonts(self):
        """Create the named fonts used by the widgets."""
        self.f_brand = tkfont.Font(family="Courier", size=18, weight="bold")
        self.f_model = tkfont.Font(family="Courier", size=12)
        self.f_label = tkfont.Font(family="Courier", size=8, weight="bold")
        self.f_value = tkfont.Font(family="Courier", size=12)
        self.f_vol = tkfont.Font(family="Courier", size=32, weight="bold")
        self.f_btn = tkfont.Font(family="Courier", size=9, weight="bold")
        self.f_arrow = tkfont.Font(family="Courier", size=14, weight="bold")
        self.f_mono = tkfont.Font(family="Courier", size=9)
        self.f_small = tkfont.Font(family="Courier", size=8)
        self.f_dot = tkfont.Font(family="Courier", size=10)

    def _btn(self, parent, text, cmd, **kw):
        """Create (but do not place) a flat button; ``kw`` overrides the defaults."""
        defaults = dict(
            text=text, command=cmd,
            bg=PANEL, fg=TEXT,
            font=self.f_btn,
            relief="flat", bd=0,
            cursor="hand2",
            activeforeground=ACCENT,
            activebackground=BORDER,
        )
        defaults.update(kw)
        return tk.Button(parent, **defaults)

    def _divider(self):
        """Pack a 1-pixel horizontal separator line into the window."""
        tk.Frame(self, bg=BORDER, height=1).pack(
            fill="x", padx=16, pady=6)

    def _render_inputs(self, sources):
        """Rebuild the input buttons from ``sources`` (source byte -> label)."""
        for w in self._input_frame.winfo_children():
            w.destroy()
        self._input_buttons = {}
        for code, label in sources.items():
            btn = self._btn(
                self._input_frame, label,
                lambda c=code: self._select_input(c),  # bind code per button
                bg=PANEL, fg=MUTED, font=self.f_btn,
            )
            btn.pack(side="left", padx=(0, 6), ipady=5, ipadx=8)
            self._input_buttons[code] = btn
        if not sources:
            tk.Label(self._input_frame, text="No sources discovered yet",
                     font=self.f_small, bg=BG, fg=MUTED).pack(side="left")

    def _toggle_mute(self):
        """Send the opposite of the last mute state seen in the packet stream."""
        if self._muted:
            self.bt.send(CMD_MUTE_OFF)
        else:
            self.bt.send(CMD_MUTE_ON)

    def _update_mute_ui(self):
        """Restyle the mute button to match ``_muted``."""
        if self._muted:
            self._mute_btn.configure(bg=RED, fg=TEXT, text="MUTED")
        else:
            self._mute_btn.configure(bg=PANEL, fg=MUTED, text="MUTE")

    def _select_input(self, code):
        """Send the select-input command and highlight the button immediately."""
        self.bt.send(cmd_select_input(code))
        self._highlight_input(code)

    def _highlight_input(self, code):
        """Record ``code`` as the current input and highlight only its button."""
        self._cur_input = code
        for c, btn in self._input_buttons.items():
            if c == code:
                btn.configure(fg=ACCENT, bg="#252010",
                              highlightthickness=1,
                              highlightbackground=ACCENT,
                              highlightcolor=ACCENT)
            else:
                btn.configure(fg=MUTED, bg=PANEL,
                              highlightthickness=0)

    def _clear_log(self):
        """Empty the packet log."""
        self._log.configure(state="normal")
        self._log.delete("1.0", "end")
        self._log.configure(state="disabled")

    # ── Device discovery ─────────────────────────────────────────────────────

    def _scan_devices(self):
        """Refresh the device list from BlueZ and select the first entry."""
        self._devices = find_denon_devices()
        self._selected_device = 0 if self._devices else None
        self._render_device_list()

    def _render_device_list(self):
        """Redraw the device area: a hint, a single label, or one button per device."""
        for w in self._device_frame.winfo_children():
            w.destroy()

        if not HAS_DBUS:
            tk.Label(self._device_frame,
                     text="Install python3-dbus for auto-detection",
                     font=self.f_small, bg=PANEL, fg=RED).pack(side="left")
            return

        if not self._devices:
            tk.Label(self._device_frame,
                     text="No paired Denon device — use SCAN & PAIR",
                     font=self.f_small, bg=PANEL, fg=MUTED).pack(side="left")
            return

        if len(self._devices) == 1:
            dev = self._devices[0]
            tk.Label(self._device_frame,
                     text=f"{dev['name']} {dev['mac']}",
                     font=self.f_mono, bg=PANEL, fg=TEXT).pack(side="left")
        else:
            for i, dev in enumerate(self._devices):
                is_sel = (i == self._selected_device)
                btn = self._btn(
                    self._device_frame,
                    f"{dev['name']}",
                    lambda idx=i: self._pick_device(idx),
                    bg="#252010" if is_sel else PANEL,
                    fg=ACCENT if is_sel else MUTED,
                    font=self.f_btn,
                )
                btn.pack(side="left", padx=(0, 6), ipady=3, ipadx=6)

    def _pick_device(self, idx):
        """Select the device at index ``idx`` and redraw the list."""
        self._selected_device = idx
        self._render_device_list()

    # ── Bluetooth discovery & pairing ─────────────────────────────────────────

    def _start_discovery(self):
        """Start a background discovery scan unless one is already running."""
        if not HAS_DBUS:
            self._log_append("Install python3-dbus for Bluetooth scanning", "info")
            return
        if self._discovering:
            return
        self._discovering = True
        self._unpaired_devices = []
        self._scan_pair_btn.configure(state="disabled", text="SCANNING…")
        self._render_unpaired_list()
        self._log_append("Starting Bluetooth discovery…", "info")
        threading.Thread(target=self._discovery_worker, daemon=True).start()

    def _discovery_worker(self):
        """Worker thread: run the blocking scan and queue the result."""
        devices = discover_denon_devices(duration=8)
        self._msg_queue.put(("discovery_done", devices))

    def _handle_discovery_done(self, devices):
        """Tk thread: show the scan result and re-enable the scan button."""
        self._discovering = False
        self._unpaired_devices = devices
        self._scan_pair_btn.configure(state="normal", text="SCAN & PAIR")
        self._render_unpaired_list()
        count = len(devices)
        if count == 0:
            self._log_append("Discovery complete — no unpaired Denon devices found", "info")
        else:
            self._log_append(f"Discovery complete — found {count} unpaired Denon device(s)", "info")

    def _render_unpaired_list(self):
        """Redraw the unpaired-device rows (or the scanning hint)."""
        for w in self._unpaired_frame.winfo_children():
            w.destroy()

        if self._discovering:
            tk.Label(self._unpaired_frame,
                     text="Scanning for Denon devices…",
                     font=self.f_small, bg=PANEL, fg=ACCENT2).pack(anchor="w")
            return

        if not self._unpaired_devices:
            return

        tk.Label(self._unpaired_frame,
                 text="UNPAIRED DEVICES", font=self.f_label,
                 bg=PANEL, fg=MUTED).pack(anchor="w", pady=(4, 2))

        for dev in self._unpaired_devices:
            row = tk.Frame(self._unpaired_frame, bg=PANEL)
            row.pack(fill="x", pady=1)

            tk.Label(row, text=f"{dev['name']} {dev['mac']}",
                     font=self.f_small, bg=PANEL, fg=MUTED).pack(side="left")

            self._btn(
                row, "PAIR",
                lambda d=dev: self._start_pair(d),  # bind device per row
                bg=ACCENT2, fg=BG, font=self.f_small,
            ).pack(side="right", ipadx=8, ipady=2)

    def _start_pair(self, device):
        """Start pairing with ``device`` in the background (one at a time)."""
        if self._pairing:
            return
        self._pairing = True
        self._log_append(f"Pairing with {device['name']} ({device['mac']})…", "info")
        # Disable every PAIR button while the attempt is running.
        for w in self._unpaired_frame.winfo_children():
            for child in w.winfo_children():
                if isinstance(child, tk.Button):
                    child.configure(state="disabled")
        threading.Thread(
            target=self._pair_worker, args=(device,), daemon=True
        ).start()

    def _pair_worker(self, device):
        """Worker thread: run the blocking pair call and queue the result."""
        success, error = pair_and_trust_device(device["path"])
        self._msg_queue.put(("pair_done", device, success, error))

    def _handle_pair_done(self, device, success, error):
        """Tk thread: log the pairing outcome and refresh both device lists."""
        self._pairing = False
        if success:
            self._log_append(f"Paired and trusted: {device['name']}", "info")
            self._unpaired_devices = [
                d for d in self._unpaired_devices if d["mac"] != device["mac"]
            ]
            self._render_unpaired_list()
            self._scan_devices()
        else:
            self._log_append(f"Pairing failed: {error}", "info")
            self._render_unpaired_list()  # re-creates enabled PAIR buttons

    # ── Connection ────────────────────────────────────────────────────────────

    def _toggle_connect(self):
        """Disconnect if connected; otherwise connect to the selected device.

        The connect itself runs on a worker thread. Only one attempt may be
        pending at a time, so a second click while connecting is ignored.
        """
        if self.bt.connected:
            self.bt.disconnect()
            self._conn_btn.configure(text="CONNECT", bg=ACCENT, fg=BG)
        elif self._connecting:
            self._log_append("Connection attempt already in progress", "info")
        else:
            if self._selected_device is None or not self._devices:
                self._log_append("No paired Denon device — use SCAN & PAIR", "info")
                return
            mac = self._devices[self._selected_device]["mac"]
            self._connecting = True
            self._conn_btn.configure(text="DISCONNECT", bg=RED, fg=TEXT)
            threading.Thread(
                target=self.bt.connect, args=(mac,), daemon=True
            ).start()

    # ── Message handling ──────────────────────────────────────────────────────

    def _on_bt_message(self, data: bytes, direction: str):
        """Bluetooth callback (any thread): queue a packet for the Tk thread."""
        self._msg_queue.put(("msg", data, direction))

    def _on_bt_status(self, msg: str):
        """Bluetooth callback (any thread): queue a status text for the Tk thread."""
        self._msg_queue.put(("status", msg))

    def _poll_queue(self):
        """Drain the worker queue on the Tk thread, then reschedule in 50 ms.

        Rescheduling happens in ``finally`` so that one failing handler
        cannot stop all future UI updates.
        """
        try:
            while True:
                item = self._msg_queue.get_nowait()
                if item[0] == "status":
                    self._handle_status(item[1])
                elif item[0] == "msg":
                    self._handle_message(item[1], item[2])
                elif item[0] == "discovery_done":
                    self._handle_discovery_done(item[1])
                elif item[0] == "pair_done":
                    self._handle_pair_done(item[1], item[2], item[3])
        except queue.Empty:
            pass
        finally:
            self.after(50, self._poll_queue)

    def _handle_status(self, msg: str):
        """Show a status text and keep the dot and connect button in sync."""
        self._status.set(msg)
        connected = msg.startswith("Connected")
        if msg.startswith(("Connected", "Connection failed")):
            self._connecting = False  # the pending attempt has finished
        self._status_dot.configure(fg=GREEN if connected else RED)
        if connected:
            self._conn_btn.configure(text="DISCONNECT", bg=RED, fg=TEXT)
        elif not self._connecting:
            # While an attempt is pending ("Connecting…" etc.) the button is
            # left as _toggle_connect set it.
            self._conn_btn.configure(text="CONNECT", bg=ACCENT, fg=BG)

    def _handle_message(self, data: bytes, direction: str):
        """Log one packet as hex and update the UI for the packet types we know.

        ``data[2]`` and ``data[3]`` are matched against the known packet
        kinds below; anything else is only logged.
        """
        hex_str = " ".join(f"{b:02x}" for b in data)
        tag = "sent" if direction == "SENT" else "recv"
        arrow = "→" if direction == "SENT" else "←"
        self._log_append(f"{arrow} {hex_str}", tag)

        if len(data) < 3:
            return

        cat = data[2]
        typ = data[3] if len(data) > 3 else None

        # Volume report: AT 07 02 03 c5 [vol] ...
        if cat == 0x07 and typ == 0x02 and len(data) >= 7:
            if data[4] == 0x03 and data[5] == 0xc5:
                vol = data[6]
                self._vol_raw = vol
                # Display as dB: raw/2 gives the display value shown on the AVR
                vol_db = vol / 2.0
                self._vol.set(f"{vol_db:g}")
            return

        # Mute state: AT 07 1d 01 [state] ...
        if cat == 0x07 and typ == 0x1d and len(data) >= 6:
            self._muted = (data[4] == 0x01 and data[5] == 0x01)
            self._update_mute_ui()
            return

        # Sound mode: AT 07 1e ...
        if cat == 0x07 and typ == 0x1e and len(data) >= 6:
            text_bytes = data[5:]
            # Keep printable ASCII only.
            text = bytes(b for b in text_bytes if 32 <= b < 127).decode("ascii", errors="replace").strip()
            if text:
                self._sound_mode.set(text)
            return

        # Input sources: AT 00 04 0a ...
        if cat == 0x00 and typ == 0x04 and direction == "RECV" and len(data) >= 8:
            raw = data[4:]
            discovered = {}
            for b in raw:
                if b in INPUT_SOURCES:
                    discovered[b] = INPUT_SOURCES[b]
                elif 0x40 <= b <= 0x7f:
                    discovered[b] = f"SRC-{b:02x}"
            if discovered:
                self._sources = discovered
                self._render_inputs(discovered)
                # The buttons were just recreated; restore the highlight of
                # the input we already know to be active.
                if self._cur_input is not None:
                    self._highlight_input(self._cur_input)
            return

        # Current input: AT 00 07 02 [source] ...
        if cat == 0x00 and typ == 0x07 and direction == "RECV" and len(data) >= 5:
            if data[4] == 0x02 and len(data) >= 6:
                self._highlight_input(data[5])
            return

        # Device name: AT 00 0e ...
        if cat == 0x00 and typ == 0x0e and direction == "RECV" and len(data) >= 6:
            name_bytes = data[5:]
            name = bytes(b for b in name_bytes if 32 <= b < 127).decode("ascii", errors="replace").strip()
            if name:
                self._log_append(f" Device: {name}", "info")
            return

    def _log_append(self, text: str, tag: str = ""):
        """Append one line to the (read-only) packet log and scroll to it."""
        self._log.configure(state="normal")
        self._log.insert("end", text + "\n", tag)
        self._log.see("end")
        self._log.configure(state="disabled")
|
|
|
|
|
|
# ─── Entry point ─────────────────────────────────────────────────────────────
|
|
|
|
# Script entry point: build the window and run the Tk event loop until the
# window is closed.
if __name__ == "__main__":
    DenonRemoteApp().mainloop()