Files
HAMeter/hameter/subprocess_manager.py
2026-03-06 12:25:27 -05:00

321 lines
11 KiB
Python

"""Subprocess lifecycle management for rtl_tcp and rtlamr."""
import logging
import os
import queue
import signal
import subprocess
import threading
import time
from typing import Optional
from hameter.config import GeneralConfig
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# How long (in seconds) to wait for rtl_tcp to print "listening..." before
# giving up on detecting the readiness marker.
RTL_TCP_STARTUP_TIMEOUT = 10
# How long (in seconds) to wait for rtlamr to produce its first output.
# NOTE(review): not referenced in the visible part of this module — confirm
# whether it is used elsewhere or is dead.
RTLAMR_STARTUP_TIMEOUT = 30
# Restart backoff used by SubprocessManager.restart(): while the number of
# consecutive restart attempts is below MAX_FAST_RETRIES the wait before the
# next start is FAST_RETRY_DELAY seconds; from then on it is SLOW_RETRY_DELAY
# seconds.
MAX_FAST_RETRIES = 3
FAST_RETRY_DELAY = 5
SLOW_RETRY_DELAY = 30
class SubprocessManager:
    """Manage the rtl_tcp and rtlamr subprocess lifecycle.

    Architecture:
      - rtl_tcp runs as a TCP server providing SDR samples.
      - rtlamr connects to rtl_tcp and outputs JSON lines to stdout.
      - A dedicated reader thread puts rtlamr stdout lines into a queue,
        which callers consume through get_line().
      - Every pipe opened with subprocess.PIPE is drained by a daemon
        thread for as long as the child keeps it open, so a child can
        never stall on a full OS pipe buffer.
      - Both processes are started in new sessions (process groups) for
        reliable cleanup via os.killpg().
    """

    # Number of most-recent output lines kept for startup diagnostics.
    _OUTPUT_TAIL_LINES = 5

    def __init__(self, config: "GeneralConfig", shutdown_event: threading.Event):
        """Store configuration; no subprocess is started here.

        Args:
            config: Settings object; this class reads ``rtl_tcp_host``,
                ``rtl_tcp_port``, ``device_id`` and ``rtlamr_extra_args``.
            shutdown_event: Set by the application to request shutdown;
                it interrupts the restart backoff and stops the reader.
        """
        self._config = config
        self._shutdown = shutdown_event
        self._rtl_tcp_proc: Optional[subprocess.Popen] = None
        self._rtlamr_proc: Optional[subprocess.Popen] = None
        self._output_queue: queue.Queue[str] = queue.Queue()
        self._reader_thread: Optional[threading.Thread] = None
        self._consecutive_failures = 0

    # ------------------------------------------------------------------ #
    # Public API
    # ------------------------------------------------------------------ #
    def start(
        self,
        meter_ids: list[int],
        protocols: list[str],
    ) -> bool:
        """Start rtl_tcp and rtlamr. Returns True on success.

        A successful start resets the consecutive-failure counter that
        restart() uses to choose its backoff delay.
        """
        if not self._launch(meter_ids, protocols):
            return False
        self._consecutive_failures = 0
        return True

    def start_discovery_mode(self) -> bool:
        """Start in discovery mode: no meter ID filter, all protocols."""
        return self._launch(meter_ids=[], protocols=["all"])

    def stop(self):
        """Stop all subprocesses and the reader thread."""
        # rtlamr first: it is the client of rtl_tcp.
        self._kill_process(self._rtlamr_proc, "rtlamr")
        self._rtlamr_proc = None
        self._kill_process(self._rtl_tcp_proc, "rtl_tcp")
        self._rtl_tcp_proc = None

        reader = self._reader_thread
        if reader is not None and reader.is_alive():
            reader.join(timeout=5)
            if reader.is_alive():
                logger.warning("Reader thread did not exit within timeout")
        self._reader_thread = None

        # Drain the output queue to prevent memory buildup.
        while True:
            try:
                self._output_queue.get_nowait()
            except queue.Empty:
                break

    def restart(
        self,
        meter_ids: list[int],
        protocols: list[str],
    ) -> bool:
        """Stop everything, wait with backoff, then start again.

        Returns False if shutdown was requested during the wait or if
        the subsequent start() failed.
        """
        self.stop()
        self._consecutive_failures += 1
        if self._consecutive_failures >= MAX_FAST_RETRIES:
            delay = SLOW_RETRY_DELAY
        else:
            delay = FAST_RETRY_DELAY
        logger.info(
            "Waiting %ds before restart (attempt %d)...",
            delay,
            self._consecutive_failures,
        )
        # Event.wait() returns True as soon as shutdown is requested, so
        # the backoff never delays process exit.
        if self._shutdown.wait(timeout=delay):
            return False  # Shutdown requested during wait.
        return self.start(meter_ids, protocols)

    def is_healthy(self) -> bool:
        """Check if both subprocesses are still running."""
        return (
            self._rtl_tcp_proc is not None
            and self._rtl_tcp_proc.poll() is None
            and self._rtlamr_proc is not None
            and self._rtlamr_proc.poll() is None
        )

    def get_line(self, timeout: float = 1.0) -> Optional[str]:
        """Return the next rtlamr stdout line, waiting up to ``timeout`` seconds.

        Returns None when no line became available within the timeout.
        """
        try:
            return self._output_queue.get(timeout=timeout)
        except queue.Empty:
            return None

    # ------------------------------------------------------------------ #
    # Internal helpers
    # ------------------------------------------------------------------ #
    def _launch(self, meter_ids: list[int], protocols: list[str]) -> bool:
        """Start rtl_tcp, then rtlamr, then the stdout reader thread.

        If rtlamr fails to start, the already-running rtl_tcp is killed
        so no orphan is left behind. Returns True when both are running.
        """
        if not self._start_rtl_tcp():
            return False
        if not self._start_rtlamr(meter_ids, protocols):
            self._kill_process(self._rtl_tcp_proc, "rtl_tcp")
            self._rtl_tcp_proc = None
            return False
        self._start_reader_thread()
        return True

    def _start_rtl_tcp(self) -> bool:
        """Start the rtl_tcp server and wait for readiness."""
        # Every argv element must be a string; coerce config values so a
        # numeric device index or port cannot raise TypeError here.
        cmd = [
            "rtl_tcp",
            "-a", str(self._config.rtl_tcp_host),
            "-p", str(self._config.rtl_tcp_port),
            "-d", str(self._config.device_id),
        ]
        logger.info("Starting rtl_tcp: %s", " ".join(cmd))
        try:
            self._rtl_tcp_proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                start_new_session=True,
            )
        except FileNotFoundError:
            logger.error("rtl_tcp binary not found. Is rtl-sdr installed?")
            return False
        except OSError as exc:
            # E.g. PermissionError: report it like any other start failure.
            logger.error("Failed to launch rtl_tcp: %s", exc)
            return False

        # Wait for the "listening..." line that indicates readiness.
        if self._wait_for_output(
            self._rtl_tcp_proc,
            "listening",
            RTL_TCP_STARTUP_TIMEOUT,
            "rtl_tcp",
        ):
            return True

        # Fallback: if process is still running, assume it started OK.
        # rtl_tcp may buffer its output or print to a different fd.
        if self._rtl_tcp_proc.poll() is None:
            logger.warning(
                "rtl_tcp did not print expected marker, but process is alive — continuing"
            )
            return True
        return False

    def _start_rtlamr(
        self,
        meter_ids: list[int],
        protocols: list[str],
    ) -> bool:
        """Start rtlamr connected to the running rtl_tcp server.

        Args:
            meter_ids: Meter IDs passed as ``-filterid``; an empty list
                means no filter.
            protocols: Message types, lower-cased, de-duplicated and
                sorted into ``-msgtype``.
        """
        msg_types = ",".join(sorted({p.lower() for p in protocols}))
        cmd = [
            "rtlamr",
            "-format=json",
            f"-server={self._config.rtl_tcp_host}:{self._config.rtl_tcp_port}",
            f"-msgtype={msg_types}",
            "-unique",
        ]
        if meter_ids:
            cmd.append(f"-filterid={','.join(str(m) for m in meter_ids)}")
        cmd.extend(str(arg) for arg in self._config.rtlamr_extra_args)
        logger.info("Starting rtlamr: %s", " ".join(cmd))
        try:
            self._rtlamr_proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                bufsize=1,
                start_new_session=True,
            )
        except FileNotFoundError:
            logger.error("rtlamr binary not found. Is rtlamr installed?")
            return False
        except OSError as exc:
            logger.error("Failed to launch rtlamr: %s", exc)
            return False

        # Give rtlamr a moment to connect to rtl_tcp and start.
        time.sleep(2)
        if self._rtlamr_proc.poll() is not None:
            stderr = ""
            if self._rtlamr_proc.stderr:
                stderr = self._rtlamr_proc.stderr.read()
            logger.error("rtlamr exited immediately: %s", stderr)
            return False

        # stderr is a PIPE: if nobody reads it the child blocks once the
        # OS pipe buffer fills up. Drain it for the life of the process.
        threading.Thread(
            target=self._drain_stream,
            args=(self._rtlamr_proc.stderr, "rtlamr"),
            name="rtlamr-stderr",
            daemon=True,
        ).start()

        logger.info("rtlamr started (PID %d)", self._rtlamr_proc.pid)
        return True

    def _start_reader_thread(self):
        """Start a background thread to read rtlamr stdout into the queue."""
        proc = self._rtlamr_proc
        if proc is None or proc.stdout is None:
            logger.error("Cannot start reader thread: rtlamr is not running")
            return
        # Hand the stream to the thread explicitly so a concurrent stop()
        # (which sets self._rtlamr_proc to None) cannot break the reader.
        self._reader_thread = threading.Thread(
            target=self._stdout_reader,
            args=(proc.stdout,),
            name="rtlamr-reader",
            daemon=True,
        )
        self._reader_thread.start()

    def _stdout_reader(self, stream=None):
        """Read rtlamr stdout line-by-line into the output queue.

        Args:
            stream: Text stream to read. Defaults to the stdout of the
                current rtlamr process when omitted.
        """
        try:
            if stream is None:
                proc = self._rtlamr_proc
                if proc is None or proc.stdout is None:
                    return
                stream = proc.stdout
            for line in stream:
                stripped = line.strip()
                if stripped:
                    self._output_queue.put(stripped)
                if self._shutdown.is_set():
                    break
        except (ValueError, OSError):
            pass  # Pipe closed during shutdown.
        finally:
            logger.debug("stdout reader thread exiting")

    @staticmethod
    def _drain_stream(stream, name: str):
        """Consume a child's text stream until EOF, logging lines at debug level."""
        if stream is None:
            return
        try:
            for line in stream:
                stripped = line.strip()
                if stripped:
                    logger.debug("%s: %s", name, stripped)
        except (ValueError, OSError):
            pass  # Pipe closed during shutdown.

    def _wait_for_output(
        self,
        proc: subprocess.Popen,
        marker: str,
        timeout: float,
        name: str,
    ) -> bool:
        """Wait for a marker string (case-insensitive) in a process's stdout.

        A daemon thread does the reading so this call never blocks longer
        than ``timeout`` seconds. The thread keeps consuming the stream
        after the marker is seen (or after the timeout) until EOF, so the
        child cannot stall on a full stdout pipe later on.

        Returns True if the marker was seen within the timeout.
        """
        found = threading.Event()
        # Set when the marker is seen OR the stream ends, whichever is first.
        finished = threading.Event()
        tail: list[str] = []
        keep = self._OUTPUT_TAIL_LINES
        needle = marker.lower()

        def _reader():
            try:
                for line in proc.stdout:
                    stripped = line.strip()
                    if not stripped:
                        continue
                    logger.debug("%s: %s", name, stripped)
                    if found.is_set():
                        continue  # Marker already seen: just keep draining.
                    tail.append(stripped)
                    del tail[:-keep]  # Bound memory: keep the last few lines only.
                    if needle in stripped.lower():
                        found.set()
                        finished.set()
            except (ValueError, OSError):
                pass
            finally:
                finished.set()

        threading.Thread(
            target=_reader,
            name=f"{name}-output",
            daemon=True,
        ).start()
        finished.wait(timeout=timeout)

        if found.is_set():
            logger.info("%s is ready", name)
            return True

        recent = "; ".join(tail[-keep:]) if tail else "(none)"
        if proc.poll() is not None:
            logger.error(
                "%s exited during startup. Output: %s",
                name,
                recent,
            )
        else:
            logger.warning(
                "%s: marker '%s' not seen within %ds. Output so far: %s",
                name, marker, int(timeout),
                recent,
            )
        return False

    @staticmethod
    def _kill_process(proc: Optional[subprocess.Popen], name: str):
        """Terminate a subprocess and its process group; never raises.

        Sends SIGTERM to the group, waits up to 5 seconds, then escalates
        to SIGKILL. Does nothing if ``proc`` is None or already exited.
        """
        if proc is None or proc.poll() is not None:
            return
        try:
            pgid = os.getpgid(proc.pid)
            logger.info("Sending SIGTERM to %s (pgid %d)", name, pgid)
            os.killpg(pgid, signal.SIGTERM)
            try:
                proc.wait(timeout=5)
                logger.info("%s terminated cleanly", name)
            except subprocess.TimeoutExpired:
                logger.warning(
                    "%s did not exit after SIGTERM, sending SIGKILL", name
                )
                os.killpg(pgid, signal.SIGKILL)
                proc.wait(timeout=3)
        except ProcessLookupError:
            pass  # Already dead.
        except Exception as e:
            # Deliberately broad: cleanup is best-effort and must not
            # abort the caller's shutdown sequence.
            logger.error("Error killing %s: %s", name, e)