"""Subprocess lifecycle management for rtl_tcp and rtlamr."""

import logging
import os
import queue
import signal
import subprocess
import threading
import time
from typing import Optional

from hameter.config import GeneralConfig

logger = logging.getLogger(__name__)

# How long to wait for rtl_tcp to print "listening..." before giving up.
RTL_TCP_STARTUP_TIMEOUT = 10
# How long to wait for rtlamr to produce its first output.
RTLAMR_STARTUP_TIMEOUT = 30
# Max consecutive restart failures before increasing backoff.
MAX_FAST_RETRIES = 3
FAST_RETRY_DELAY = 5
SLOW_RETRY_DELAY = 30

# Number of most recent startup output lines kept for diagnostics.
_STARTUP_TAIL_LINES = 5


class SubprocessManager:
    """Manages the rtl_tcp and rtlamr subprocess lifecycle.

    Architecture:
    - rtl_tcp runs as a TCP server providing SDR samples.
    - rtlamr connects to rtl_tcp and outputs JSON lines to stdout.
    - A dedicated reader thread puts stdout lines into a queue.
    - Every other pipe opened to a child (rtl_tcp output, rtlamr stderr)
      is drained by a daemon thread so the child can never block on a
      full pipe buffer.
    - Both processes are started in new sessions (process groups) for
      reliable cleanup via os.killpg().
    """

    def __init__(self, config: GeneralConfig, shutdown_event: threading.Event):
        """Store configuration; no subprocess is started here.

        Args:
            config: Settings read by this class: ``rtl_tcp_host``,
                ``rtl_tcp_port``, ``device_id`` and ``rtlamr_extra_args``.
            shutdown_event: Set by the owner to request shutdown; it
                interrupts the restart back-off and stops the reader.
        """
        self._config = config
        self._shutdown = shutdown_event
        self._rtl_tcp_proc: Optional[subprocess.Popen] = None
        self._rtlamr_proc: Optional[subprocess.Popen] = None
        self._output_queue: queue.Queue[str] = queue.Queue()
        self._reader_thread: Optional[threading.Thread] = None
        self._consecutive_failures = 0

    def start(
        self,
        meter_ids: list[int],
        protocols: list[str],
    ) -> bool:
        """Start rtl_tcp and rtlamr.

        Args:
            meter_ids: Meter IDs passed to rtlamr's ``-filterid``; an
                empty list disables the filter.
            protocols: Message types passed to rtlamr's ``-msgtype``.

        Returns:
            True on success. On failure nothing is left running.
        """
        return self._launch(meter_ids, protocols)

    def start_discovery_mode(self) -> bool:
        """Start in discovery mode: no meter ID filter, all protocols.

        Returns:
            True on success. On failure nothing is left running.
        """
        return self._launch(meter_ids=[], protocols=["all"])

    def stop(self):
        """Stop all subprocesses and the reader thread."""
        # rtlamr first: it is the client of rtl_tcp.
        self._kill_process(self._rtlamr_proc, "rtlamr")
        self._rtlamr_proc = None
        self._kill_process(self._rtl_tcp_proc, "rtl_tcp")
        self._rtl_tcp_proc = None

        if self._reader_thread and self._reader_thread.is_alive():
            self._reader_thread.join(timeout=5)
            if self._reader_thread.is_alive():
                logger.warning("Reader thread did not exit within timeout")
        self._reader_thread = None

        # Drain the output queue to prevent memory buildup.
        while not self._output_queue.empty():
            try:
                self._output_queue.get_nowait()
            except queue.Empty:
                break

    def restart(
        self,
        meter_ids: list[int],
        protocols: list[str],
    ) -> bool:
        """Stop everything, wait, then restart.

        The wait is FAST_RETRY_DELAY seconds until MAX_FAST_RETRIES
        consecutive failures have accumulated, SLOW_RETRY_DELAY after.

        Returns:
            True if the restart succeeded; False if it failed or if
            shutdown was requested during the wait.
        """
        self.stop()
        self._consecutive_failures += 1

        if self._consecutive_failures >= MAX_FAST_RETRIES:
            delay = SLOW_RETRY_DELAY
        else:
            delay = FAST_RETRY_DELAY

        logger.info(
            "Waiting %ds before restart (attempt %d)...",
            delay,
            self._consecutive_failures,
        )
        # Event.wait returns True as soon as shutdown is requested, so
        # the back-off never delays a shutdown.
        if self._shutdown.wait(timeout=delay):
            return False  # Shutdown requested during wait.

        return self.start(meter_ids, protocols)

    def is_healthy(self) -> bool:
        """Check if both subprocesses are still running."""
        return (
            self._rtl_tcp_proc is not None
            and self._rtl_tcp_proc.poll() is None
            and self._rtlamr_proc is not None
            and self._rtlamr_proc.poll() is None
        )

    def get_line(self, timeout: float = 1.0) -> Optional[str]:
        """Return the next rtlamr stdout line, waiting up to *timeout* seconds.

        Returns:
            The next stripped, non-empty line, or None if no line became
            available within *timeout*.
        """
        try:
            return self._output_queue.get(timeout=timeout)
        except queue.Empty:
            return None

    # ------------------------------------------------------------------ #
    # Internal helpers
    # ------------------------------------------------------------------ #

    def _launch(self, meter_ids: list[int], protocols: list[str]) -> bool:
        """Start rtl_tcp, rtlamr and the reader thread; shared by both start modes."""
        if not self._start_rtl_tcp():
            return False

        if not self._start_rtlamr(meter_ids, protocols):
            # Do not leave a server running without its client.
            self._kill_process(self._rtl_tcp_proc, "rtl_tcp")
            self._rtl_tcp_proc = None
            return False

        self._start_reader_thread()
        self._consecutive_failures = 0
        return True

    def _start_rtl_tcp(self) -> bool:
        """Start the rtl_tcp server and wait for readiness.

        Returns:
            True if the readiness marker was seen, or if it was not seen
            but the process is still alive after the timeout. False if
            the binary is missing or the process exited.
        """
        cmd = [
            "rtl_tcp",
            "-a", self._config.rtl_tcp_host,
            "-p", str(self._config.rtl_tcp_port),
            # str() so a numeric device index cannot break the argv list
            # or the log line below.
            "-d", str(self._config.device_id),
        ]
        logger.info("Starting rtl_tcp: %s", " ".join(cmd))

        try:
            self._rtl_tcp_proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                start_new_session=True,
            )
        except FileNotFoundError:
            logger.error("rtl_tcp binary not found. Is rtl-sdr installed?")
            return False

        # Wait for the "listening..." line that indicates readiness.
        # If we can't detect the marker, fall back to checking that the
        # process is still alive.
        if self._wait_for_output(
            self._rtl_tcp_proc,
            "listening",
            RTL_TCP_STARTUP_TIMEOUT,
            "rtl_tcp",
        ):
            return True

        # Fallback: if process is still running, assume it started OK.
        # rtl_tcp may buffer its output or print to a different fd.
        if self._rtl_tcp_proc.poll() is None:
            logger.warning(
                "rtl_tcp did not print expected marker, but process is alive — continuing"
            )
            return True

        return False

    def _start_rtlamr(
        self,
        meter_ids: list[int],
        protocols: list[str],
    ) -> bool:
        """Start rtlamr connected to the running rtl_tcp server.

        Args:
            meter_ids: IDs for ``-filterid``; omitted when empty.
            protocols: Message types; lower-cased, de-duplicated and
                sorted before being joined into ``-msgtype``.

        Returns:
            True if rtlamr is still running two seconds after launch.
        """
        msg_types = ",".join(sorted({p.lower() for p in protocols}))

        cmd = [
            "rtlamr",
            "-format=json",
            f"-server={self._config.rtl_tcp_host}:{self._config.rtl_tcp_port}",
            f"-msgtype={msg_types}",
            "-unique",
        ]

        if meter_ids:
            cmd.append(f"-filterid={','.join(str(m) for m in meter_ids)}")

        cmd.extend(self._config.rtlamr_extra_args)

        logger.info("Starting rtlamr: %s", " ".join(cmd))

        try:
            self._rtlamr_proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                bufsize=1,
                start_new_session=True,
            )
        except FileNotFoundError:
            logger.error("rtlamr binary not found. Is rtlamr installed?")
            return False

        # Give rtlamr a moment to connect to rtl_tcp and start.
        time.sleep(2)
        if self._rtlamr_proc.poll() is not None:
            stderr = ""
            if self._rtlamr_proc.stderr:
                stderr = self._rtlamr_proc.stderr.read()
            logger.error("rtlamr exited immediately: %s", stderr)
            return False

        # stderr is a pipe; if nobody reads it the child blocks once the
        # OS pipe buffer fills. Drain it for the life of the process.
        if self._rtlamr_proc.stderr is not None:
            threading.Thread(
                target=self._drain_stream,
                args=(self._rtlamr_proc.stderr, "rtlamr"),
                name="rtlamr-stderr",
                daemon=True,
            ).start()

        logger.info("rtlamr started (PID %d)", self._rtlamr_proc.pid)
        return True

    def _start_reader_thread(self):
        """Start a background thread to read rtlamr stdout into the queue."""
        # Bind the stream now: stop() may reset self._rtlamr_proc to None
        # before the new thread gets to run.
        proc = self._rtlamr_proc
        stream = proc.stdout if proc is not None else None
        self._reader_thread = threading.Thread(
            target=self._stdout_reader,
            args=(stream,),
            name="rtlamr-reader",
            daemon=True,
        )
        self._reader_thread.start()

    def _stdout_reader(self, stream=None):
        """Read rtlamr stdout line-by-line into the output queue.

        Args:
            stream: Text stream to read. When None, the stdout of the
                current rtlamr process is used, if there is one.

        Stripped, non-empty lines are queued. The loop ends at EOF, when
        the pipe is closed, or after a line is read while shutdown is set.
        """
        try:
            if stream is None:
                proc = self._rtlamr_proc
                stream = proc.stdout if proc is not None else None
            if stream is None:
                return  # Nothing to read (process already cleared).
            for line in stream:
                stripped = line.strip()
                if stripped:
                    self._output_queue.put(stripped)
                if self._shutdown.is_set():
                    break
        except (ValueError, OSError):
            pass  # Pipe closed during shutdown.
        finally:
            logger.debug("stdout reader thread exiting")

    @staticmethod
    def _drain_stream(stream, name: str):
        """Read *stream* until EOF, logging each non-empty line at DEBUG."""
        try:
            for line in stream:
                text = line.strip()
                if text:
                    logger.debug("%s stderr: %s", name, text)
        except (ValueError, OSError):
            pass  # Pipe closed during shutdown.

    def _wait_for_output(
        self,
        proc: subprocess.Popen,
        marker: str,
        timeout: float,
        name: str,
    ) -> bool:
        """Wait for a specific marker string in a process's stdout.

        Uses a background thread to avoid blocking on readline(). The
        thread keeps draining the pipe after this method returns so the
        child cannot stall on a full pipe buffer.

        Args:
            proc: Process whose ``stdout`` pipe is read.
            marker: Case-insensitive substring that signals readiness.
            timeout: Maximum seconds to wait.
            name: Process name used in log messages.

        Returns:
            True if the marker was seen within *timeout*, else False.
        """
        found = threading.Event()
        # Set when the wait can end: marker seen or reader finished.
        finished = threading.Event()
        tail: list[str] = []
        needle = marker.lower()

        def _reader():
            try:
                for line in proc.stdout:
                    stripped = line.strip()
                    if not stripped:
                        continue
                    logger.debug("%s: %s", name, stripped)
                    if found.is_set():
                        continue  # Startup done; only draining now.
                    tail.append(stripped)
                    if len(tail) > _STARTUP_TAIL_LINES:
                        del tail[0]
                    if needle in stripped.lower():
                        found.set()
                        finished.set()
            except (ValueError, OSError):
                pass  # Pipe closed during shutdown.
            finally:
                finished.set()

        reader = threading.Thread(
            target=_reader,
            name=f"{name}-output",
            daemon=True,
        )
        reader.start()
        finished.wait(timeout=timeout)

        if found.is_set():
            logger.info("%s is ready", name)
            return True

        recent = "; ".join(list(tail)) if tail else "(none)"
        if proc.poll() is not None:
            logger.error(
                "%s exited during startup. Output: %s",
                name,
                recent,
            )
        else:
            logger.warning(
                "%s: marker '%s' not seen within %ds. Output so far: %s",
                name,
                marker,
                int(timeout),
                recent,
            )
        return False

    @staticmethod
    def _kill_process(proc: Optional[subprocess.Popen], name: str):
        """Reliably terminate a subprocess and its process group.

        Sends SIGTERM to the group, waits up to 5 seconds, then sends
        SIGKILL and waits up to 3 more. Does nothing if *proc* is None
        or has already exited. Errors are logged, never raised.
        """
        if proc is None or proc.poll() is not None:
            return

        try:
            pgid = os.getpgid(proc.pid)
            logger.info("Sending SIGTERM to %s (pgid %d)", name, pgid)
            os.killpg(pgid, signal.SIGTERM)

            try:
                proc.wait(timeout=5)
                logger.info("%s terminated cleanly", name)
            except subprocess.TimeoutExpired:
                logger.warning(
                    "%s did not exit after SIGTERM, sending SIGKILL", name
                )
                os.killpg(pgid, signal.SIGKILL)
                proc.wait(timeout=3)
        except ProcessLookupError:
            pass  # Already dead.
        except Exception as e:
            # Best-effort cleanup: a failure here must not abort stop().
            logger.error("Error killing %s: %s", name, e)