Files
HAMeter/tests/test_subprocess.py
2026-03-06 12:25:27 -05:00

152 lines
5.3 KiB
Python

"""Unit tests for the SubprocessManager class."""
import queue
import threading
import unittest
from unittest.mock import MagicMock, patch
from hameter.subprocess_manager import SubprocessManager
from hameter.config import GeneralConfig
def _config():
    """Build the GeneralConfig used to construct the manager under test.

    The settings are fixed: device "0", an rtl_tcp endpoint of
    127.0.0.1:1234, and no extra rtlamr arguments.
    """
    settings = {
        "device_id": "0",
        "rtl_tcp_host": "127.0.0.1",
        "rtl_tcp_port": 1234,
        "rtlamr_extra_args": [],
    }
    return GeneralConfig(**settings)
def _mock_proc(pid=1000, alive=True, stdout_lines=None):
"""Create a mock subprocess.Popen."""
proc = MagicMock()
proc.pid = pid
proc.poll.return_value = None if alive else 1
proc.wait.return_value = 0
# SubprocessManager uses text=True, so lines are strings (not bytes)
if stdout_lines is not None:
proc.stdout = iter(stdout_lines)
else:
proc.stdout = iter([])
proc.stderr = MagicMock()
proc.stderr.read.return_value = ""
return proc
class TestSubprocessManager(unittest.TestCase):
    """Tests for SubprocessManager with ``subprocess.Popen`` replaced by mocks."""

    def setUp(self):
        """Create a fresh shutdown event and manager for every test."""
        stop_event = threading.Event()
        self.shutdown = stop_event
        self.mgr = SubprocessManager(_config(), stop_event)

    @staticmethod
    def _arm_popen(mock_popen):
        """Queue one mock process pair on *mock_popen* and return the pair.

        The first mock (pid 100) emits a single "listening..." stdout line;
        the second (pid 200) has empty stdout.
        """
        rtl = _mock_proc(pid=100, stdout_lines=["listening...\n"])
        rtlamr = _mock_proc(pid=200)
        mock_popen.side_effect = [rtl, rtlamr]
        return rtl, rtlamr

    @patch("time.sleep")
    @patch("hameter.subprocess_manager.subprocess.Popen")
    def test_start_success(self, mock_popen, mock_sleep):
        """start() reports success and calls Popen exactly twice."""
        self._arm_popen(mock_popen)

        started = self.mgr.start([12345], ["scm"])

        self.assertTrue(started)
        self.assertEqual(mock_popen.call_count, 2)

    @patch("time.sleep")
    @patch("hameter.subprocess_manager.subprocess.Popen")
    def test_start_rtl_tcp_not_found(self, mock_popen, mock_sleep):
        """start() reports failure when Popen raises FileNotFoundError."""
        mock_popen.side_effect = FileNotFoundError("not found")

        started = self.mgr.start([12345], ["scm"])

        self.assertFalse(started)

    @patch("time.sleep")
    @patch("hameter.subprocess_manager.os.killpg")
    @patch("hameter.subprocess_manager.os.getpgid", side_effect=lambda p: p)
    @patch("hameter.subprocess_manager.subprocess.Popen")
    def test_stop_kills_processes(
        self, mock_popen, mock_getpgid, mock_killpg, mock_sleep
    ):
        """stop() after start() issues at least two killpg calls."""
        self._arm_popen(mock_popen)
        self.mgr.start([12345], ["scm"])

        self.mgr.stop()

        # One killpg per started process is the minimum expected.
        self.assertGreaterEqual(mock_killpg.call_count, 2)

    @patch("time.sleep")
    @patch("hameter.subprocess_manager.subprocess.Popen")
    def test_is_healthy_both_running(self, mock_popen, mock_sleep):
        """is_healthy() is true while both poll() calls return None."""
        self._arm_popen(mock_popen)
        self.mgr.start([12345], ["scm"])

        self.assertTrue(self.mgr.is_healthy())

    @patch("time.sleep")
    @patch("hameter.subprocess_manager.subprocess.Popen")
    def test_is_healthy_dead_process(self, mock_popen, mock_sleep):
        """is_healthy() is false once one process reports an exit code."""
        _, rtlamr = self._arm_popen(mock_popen)
        self.mgr.start([12345], ["scm"])

        # Simulate the second process exiting after a successful start.
        rtlamr.poll.return_value = 1

        self.assertFalse(self.mgr.is_healthy())

    def test_get_line_from_queue(self):
        """get_line() hands back an item placed on the output queue."""
        self.mgr._output_queue.put("test line")

        line = self.mgr.get_line(timeout=1.0)

        self.assertEqual(line, "test line")

    def test_get_line_timeout(self):
        """get_line() yields None when nothing arrives before the timeout."""
        line = self.mgr.get_line(timeout=0.05)

        self.assertIsNone(line)

    @patch("hameter.subprocess_manager.os.killpg")
    @patch("hameter.subprocess_manager.os.getpgid", side_effect=lambda p: p)
    @patch("hameter.subprocess_manager.subprocess.Popen")
    def test_restart_with_backoff(self, mock_popen, mock_getpgid, mock_killpg):
        """restart() after start() succeeds and brings Popen calls to four."""
        initial_pair = [
            _mock_proc(pid=100, stdout_lines=["listening...\n"]),
            _mock_proc(pid=200),
        ]
        replacement_pair = [
            _mock_proc(pid=300, stdout_lines=["listening...\n"]),
            _mock_proc(pid=400),
        ]
        mock_popen.side_effect = initial_pair + replacement_pair

        # NOTE(review): the reviewed copy had lost its indentation; restart()
        # is assumed to run under the time.sleep patch together with start()
        # -- confirm against the repository version.
        with patch("time.sleep"):
            self.mgr.start([12345], ["scm"])
            restarted = self.mgr.restart([12345], ["scm"])

        self.assertTrue(restarted)
        self.assertEqual(mock_popen.call_count, 4)

    @patch("time.sleep")
    @patch("hameter.subprocess_manager.subprocess.Popen")
    def test_start_discovery_mode(self, mock_popen, mock_sleep):
        """start_discovery_mode() launches with "all" and without "filterid"."""
        self._arm_popen(mock_popen)

        self.assertTrue(self.mgr.start_discovery_mode())

        # The second Popen call is taken to be the rtlamr launch; its first
        # positional argument is the command list.
        argv = mock_popen.call_args_list[1][0][0]
        command_line = " ".join(argv).lower()
        self.assertIn("all", command_line)
        self.assertNotIn("filterid", command_line)