152 lines
5.3 KiB
Python
152 lines
5.3 KiB
Python
"""Unit tests for the SubprocessManager class."""
|
|
|
|
import queue
|
|
import threading
|
|
import unittest
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
from hameter.subprocess_manager import SubprocessManager
|
|
from hameter.config import GeneralConfig
|
|
|
|
|
|
def _config():
|
|
return GeneralConfig(
|
|
device_id="0",
|
|
rtl_tcp_host="127.0.0.1",
|
|
rtl_tcp_port=1234,
|
|
rtlamr_extra_args=[],
|
|
)
|
|
|
|
|
|
def _mock_proc(pid=1000, alive=True, stdout_lines=None):
|
|
"""Create a mock subprocess.Popen."""
|
|
proc = MagicMock()
|
|
proc.pid = pid
|
|
proc.poll.return_value = None if alive else 1
|
|
proc.wait.return_value = 0
|
|
# SubprocessManager uses text=True, so lines are strings (not bytes)
|
|
if stdout_lines is not None:
|
|
proc.stdout = iter(stdout_lines)
|
|
else:
|
|
proc.stdout = iter([])
|
|
proc.stderr = MagicMock()
|
|
proc.stderr.read.return_value = ""
|
|
return proc
|
|
|
|
|
|
class TestSubprocessManager(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.shutdown = threading.Event()
|
|
self.mgr = SubprocessManager(_config(), self.shutdown)
|
|
|
|
@patch("time.sleep")
|
|
@patch("hameter.subprocess_manager.subprocess.Popen")
|
|
def test_start_success(self, mock_popen, mock_sleep):
|
|
"""start() returns True when both processes start OK."""
|
|
rtl = _mock_proc(pid=100, stdout_lines=["listening...\n"])
|
|
rtlamr = _mock_proc(pid=200)
|
|
mock_popen.side_effect = [rtl, rtlamr]
|
|
|
|
result = self.mgr.start([12345], ["scm"])
|
|
|
|
self.assertTrue(result)
|
|
self.assertEqual(mock_popen.call_count, 2)
|
|
|
|
@patch("time.sleep")
|
|
@patch("hameter.subprocess_manager.subprocess.Popen")
|
|
def test_start_rtl_tcp_not_found(self, mock_popen, mock_sleep):
|
|
"""start() returns False when rtl_tcp binary not found."""
|
|
mock_popen.side_effect = FileNotFoundError("not found")
|
|
|
|
result = self.mgr.start([12345], ["scm"])
|
|
|
|
self.assertFalse(result)
|
|
|
|
@patch("time.sleep")
|
|
@patch("hameter.subprocess_manager.os.killpg")
|
|
@patch("hameter.subprocess_manager.os.getpgid", side_effect=lambda p: p)
|
|
@patch("hameter.subprocess_manager.subprocess.Popen")
|
|
def test_stop_kills_processes(self, mock_popen, mock_getpgid, mock_killpg, mock_sleep):
|
|
"""stop() kills both subprocess groups."""
|
|
rtl = _mock_proc(pid=100, stdout_lines=["listening...\n"])
|
|
rtlamr = _mock_proc(pid=200)
|
|
mock_popen.side_effect = [rtl, rtlamr]
|
|
|
|
self.mgr.start([12345], ["scm"])
|
|
self.mgr.stop()
|
|
|
|
# killpg should be called for both processes
|
|
self.assertGreaterEqual(mock_killpg.call_count, 2)
|
|
|
|
@patch("time.sleep")
|
|
@patch("hameter.subprocess_manager.subprocess.Popen")
|
|
def test_is_healthy_both_running(self, mock_popen, mock_sleep):
|
|
"""is_healthy() True when both poll() return None."""
|
|
rtl = _mock_proc(pid=100, stdout_lines=["listening...\n"])
|
|
rtlamr = _mock_proc(pid=200)
|
|
mock_popen.side_effect = [rtl, rtlamr]
|
|
|
|
self.mgr.start([12345], ["scm"])
|
|
|
|
self.assertTrue(self.mgr.is_healthy())
|
|
|
|
@patch("time.sleep")
|
|
@patch("hameter.subprocess_manager.subprocess.Popen")
|
|
def test_is_healthy_dead_process(self, mock_popen, mock_sleep):
|
|
"""is_healthy() False when one process exits."""
|
|
rtl = _mock_proc(pid=100, stdout_lines=["listening...\n"])
|
|
rtlamr = _mock_proc(pid=200)
|
|
mock_popen.side_effect = [rtl, rtlamr]
|
|
|
|
self.mgr.start([12345], ["scm"])
|
|
rtlamr.poll.return_value = 1
|
|
|
|
self.assertFalse(self.mgr.is_healthy())
|
|
|
|
def test_get_line_from_queue(self):
|
|
"""get_line() returns queued data."""
|
|
self.mgr._output_queue.put("test line")
|
|
self.assertEqual(self.mgr.get_line(timeout=1.0), "test line")
|
|
|
|
def test_get_line_timeout(self):
|
|
"""get_line() returns None on empty queue."""
|
|
self.assertIsNone(self.mgr.get_line(timeout=0.05))
|
|
|
|
@patch("hameter.subprocess_manager.os.killpg")
|
|
@patch("hameter.subprocess_manager.os.getpgid", side_effect=lambda p: p)
|
|
@patch("hameter.subprocess_manager.subprocess.Popen")
|
|
def test_restart_with_backoff(self, mock_popen, mock_getpgid, mock_killpg):
|
|
"""restart() stops, waits, then starts again."""
|
|
procs = [
|
|
_mock_proc(pid=100, stdout_lines=["listening...\n"]),
|
|
_mock_proc(pid=200),
|
|
_mock_proc(pid=300, stdout_lines=["listening...\n"]),
|
|
_mock_proc(pid=400),
|
|
]
|
|
mock_popen.side_effect = procs
|
|
|
|
with patch("time.sleep"):
|
|
self.mgr.start([12345], ["scm"])
|
|
result = self.mgr.restart([12345], ["scm"])
|
|
|
|
self.assertTrue(result)
|
|
self.assertEqual(mock_popen.call_count, 4)
|
|
|
|
@patch("time.sleep")
|
|
@patch("hameter.subprocess_manager.subprocess.Popen")
|
|
def test_start_discovery_mode(self, mock_popen, mock_sleep):
|
|
"""start_discovery_mode() uses 'all' protocols, no filter."""
|
|
rtl = _mock_proc(pid=100, stdout_lines=["listening...\n"])
|
|
rtlamr = _mock_proc(pid=200)
|
|
mock_popen.side_effect = [rtl, rtlamr]
|
|
|
|
result = self.mgr.start_discovery_mode()
|
|
|
|
self.assertTrue(result)
|
|
rtlamr_call = mock_popen.call_args_list[1]
|
|
cmd = rtlamr_call[0][0]
|
|
cmd_str = " ".join(cmd)
|
|
self.assertIn("all", cmd_str.lower())
|
|
self.assertNotIn("filterid", cmd_str.lower())
|