"""Unit tests for the Pipeline class.""" import threading from unittest.mock import MagicMock, patch import pytest from hameter.pipeline import Pipeline from hameter.meter import MeterReading from hameter.config import ( HaMeterConfig, GeneralConfig, MqttConfig, MeterConfig, RateComponent, ) from hameter.cost import CostResult from hameter.state import AppState, CostState # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_config(meters=None): if meters is None: meters = [ MeterConfig( id=12345, protocol="scm", name="Electric Meter", unit_of_measurement="kWh", cost_factors=[ RateComponent(name="energy", rate=0.12, type="per_unit"), ], ), ] return HaMeterConfig( general=GeneralConfig(), mqtt=MqttConfig(host="localhost"), meters=meters, ) def _make_reading(meter_id=12345, raw=100000, calibrated=1000.0): return MeterReading( meter_id=meter_id, protocol="SCM", raw_consumption=raw, calibrated_consumption=calibrated, timestamp="2026-03-05T12:00:00Z", raw_message={"ID": meter_id, "Consumption": raw}, ) _P = "hameter.pipeline" # --------------------------------------------------------------------------- # Tests # --------------------------------------------------------------------------- @patch(f"{_P}.HaMeterMQTT") @patch(f"{_P}.SubprocessManager") class TestPipeline: def test_pipeline_starts_and_shuts_down(self, MockSubMgr, MockMQTT): """Pipeline connects MQTT, starts subprocesses, and shuts down.""" shutdown = threading.Event() app_state = AppState() config = _make_config() proc = MockSubMgr.return_value mqtt = MockMQTT.return_value proc.start.return_value = True proc.is_healthy.return_value = True proc.get_line.side_effect = lambda timeout=1.0: (shutdown.set(), None)[1] Pipeline(config, shutdown, app_state).run() mqtt.connect.assert_called_once() proc.start.assert_called_once() proc.stop.assert_called_once() mqtt.disconnect.assert_called_once() @patch(f"{_P}.parse_rtlamr_line") def test_pipeline_processes_reading(self, mock_parse, MockSubMgr, MockMQTT): """A valid reading is parsed and published via MQTT.""" shutdown = threading.Event() app_state = AppState() config = _make_config() proc = MockSubMgr.return_value mqtt = MockMQTT.return_value proc.start.return_value = True proc.is_healthy.return_value = True reading = _make_reading() calls = [0] def get_line(timeout=1.0): calls[0] += 1 if calls[0] == 1: return '{"Type":"SCM","Message":{"ID":12345}}' shutdown.set() return None proc.get_line.side_effect = get_line mock_parse.return_value = reading Pipeline(config, shutdown, app_state).run() mock_parse.assert_called_once() mqtt.publish_reading.assert_called_once_with(reading) @patch(f"{_P}.save_cost_state") @patch(f"{_P}.parse_rtlamr_line") def test_pipeline_cost_first_reading_sets_baseline( self, mock_parse, mock_save, MockSubMgr, MockMQTT ): """First reading sets baseline, no cost published.""" shutdown = threading.Event() app_state = AppState() config = _make_config() proc = MockSubMgr.return_value mqtt = MockMQTT.return_value proc.start.return_value = True proc.is_healthy.return_value = True reading = _make_reading(calibrated=1000.0) calls = [0] def get_line(timeout=1.0): calls[0] += 1 if calls[0] == 1: return '{"data":"x"}' shutdown.set() return None proc.get_line.side_effect = get_line mock_parse.return_value = reading Pipeline(config, shutdown, app_state).run() mqtt.publish_cost.assert_not_called() cs = app_state.get_cost_state(12345) assert cs is not None assert cs.last_calibrated_reading == 1000.0 @patch(f"{_P}.calculate_incremental_cost") @patch(f"{_P}.save_cost_state") @patch(f"{_P}.parse_rtlamr_line") def test_pipeline_cost_with_valid_delta( self, mock_parse, mock_save, mock_calc, MockSubMgr, MockMQTT ): """With a baseline, a positive delta triggers cost calculation.""" shutdown = threading.Event() app_state = AppState() config = _make_config() # Pre-set cost state with a baseline app_state.update_cost_state(12345, CostState( cumulative_cost=1.20, last_calibrated_reading=1000.0, billing_period_start="2026-03-01T00:00:00Z", )) proc = MockSubMgr.return_value mqtt = MockMQTT.return_value proc.start.return_value = True proc.is_healthy.return_value = True reading = _make_reading(calibrated=1010.0) # delta = 10.0 calls = [0] def get_line(timeout=1.0): calls[0] += 1 if calls[0] == 1: return '{"data":"x"}' shutdown.set() return None proc.get_line.side_effect = get_line mock_parse.return_value = reading mock_calc.return_value = CostResult( delta=10.0, per_unit_cost=1.20, component_costs=[], total_incremental_cost=1.20, ) Pipeline(config, shutdown, app_state).run() mock_calc.assert_called_once() mqtt.publish_cost.assert_called_once() cs = app_state.get_cost_state(12345) assert cs.cumulative_cost == pytest.approx(2.40) mock_save.assert_called() @patch(f"{_P}.save_cost_state") @patch(f"{_P}.parse_rtlamr_line") def test_pipeline_cost_skips_no_cost_factors( self, mock_parse, mock_save, MockSubMgr, MockMQTT ): """Meter without cost_factors: no cost processing.""" shutdown = threading.Event() app_state = AppState() config = _make_config(meters=[ MeterConfig(id=99999, protocol="scm", name="Water", unit_of_measurement="gal", cost_factors=[]), ]) proc = MockSubMgr.return_value mqtt = MockMQTT.return_value proc.start.return_value = True proc.is_healthy.return_value = True reading = _make_reading(meter_id=99999, calibrated=500.0) calls = [0] def get_line(timeout=1.0): calls[0] += 1 if calls[0] == 1: return '{"data":"x"}' shutdown.set() return None proc.get_line.side_effect = get_line mock_parse.return_value = reading Pipeline(config, shutdown, app_state).run() mqtt.publish_reading.assert_called_once() mqtt.publish_cost.assert_not_called() mock_save.assert_not_called() @patch(f"{_P}.calculate_incremental_cost") @patch(f"{_P}.save_cost_state") @patch(f"{_P}.parse_rtlamr_line") def test_pipeline_cost_skips_negative_delta( self, mock_parse, mock_save, mock_calc, MockSubMgr, MockMQTT ): """Delta <= 0 skips cost calculation.""" shutdown = threading.Event() app_state = AppState() config = _make_config() app_state.update_cost_state(12345, CostState( cumulative_cost=5.0, last_calibrated_reading=1000.0, )) reading = _make_reading(calibrated=1000.0) # delta = 0 proc = MockSubMgr.return_value mqtt = MockMQTT.return_value proc.start.return_value = True proc.is_healthy.return_value = True calls = [0] def get_line(timeout=1.0): calls[0] += 1 if calls[0] == 1: return '{"data":"x"}' shutdown.set() return None proc.get_line.side_effect = get_line mock_parse.return_value = reading Pipeline(config, shutdown, app_state).run() mock_calc.assert_not_called() mqtt.publish_cost.assert_not_called() @patch(f"{_P}.save_cost_state") @patch(f"{_P}.parse_rtlamr_line") def test_pipeline_cost_after_billing_reset( self, mock_parse, mock_save, MockSubMgr, MockMQTT ): """After billing reset (last_calibrated_reading=None), sets baseline only.""" shutdown = threading.Event() app_state = AppState() config = _make_config() app_state.update_cost_state(12345, CostState( cumulative_cost=0.0, last_calibrated_reading=None, billing_period_start="2026-03-01T00:00:00Z", )) reading = _make_reading(calibrated=1050.0) proc = MockSubMgr.return_value mqtt = MockMQTT.return_value proc.start.return_value = True proc.is_healthy.return_value = True calls = [0] def get_line(timeout=1.0): calls[0] += 1 if calls[0] == 1: return '{"data":"x"}' shutdown.set() return None proc.get_line.side_effect = get_line mock_parse.return_value = reading Pipeline(config, shutdown, app_state).run() mqtt.publish_cost.assert_not_called() cs = app_state.get_cost_state(12345) assert cs.last_calibrated_reading == 1050.0 @patch(f"{_P}.parse_rtlamr_line") def test_pipeline_handles_restart_request( self, mock_parse, MockSubMgr, MockMQTT ): """Restart request causes main loop to exit.""" shutdown = threading.Event() app_state = AppState() config = _make_config() proc = MockSubMgr.return_value mqtt = MockMQTT.return_value proc.start.return_value = True proc.is_healthy.return_value = True calls = [0] def get_line(timeout=1.0): calls[0] += 1 if calls[0] == 1: app_state.restart_requested.set() return None proc.get_line.side_effect = get_line Pipeline(config, shutdown, app_state).run() mock_parse.assert_not_called() proc.stop.assert_called_once() mqtt.disconnect.assert_called_once() @patch(f"{_P}.parse_rtlamr_line") def test_pipeline_handles_unhealthy_subprocess( self, mock_parse, MockSubMgr, MockMQTT ): """Unhealthy subprocess triggers a restart attempt.""" shutdown = threading.Event() app_state = AppState() config = _make_config() proc = MockSubMgr.return_value mqtt = MockMQTT.return_value proc.start.return_value = True proc.restart.return_value = True health = iter([False, True, True]) proc.is_healthy.side_effect = lambda: next(health, True) calls = [0] def get_line(timeout=1.0): calls[0] += 1 if calls[0] >= 2: shutdown.set() return None proc.get_line.side_effect = get_line Pipeline(config, shutdown, app_state).run() proc.restart.assert_called() @patch(f"{_P}.calculate_incremental_cost") @patch(f"{_P}.save_cost_state") @patch(f"{_P}.parse_rtlamr_line") def test_pipeline_saves_cost_state( self, mock_parse, mock_save, mock_calc, MockSubMgr, MockMQTT ): """save_cost_state is called after a cost update.""" shutdown = threading.Event() app_state = AppState() config = _make_config() app_state.update_cost_state(12345, CostState( cumulative_cost=0.0, last_calibrated_reading=900.0, )) reading = _make_reading(calibrated=950.0) proc = MockSubMgr.return_value mqtt = MockMQTT.return_value proc.start.return_value = True proc.is_healthy.return_value = True calls = [0] def get_line(timeout=1.0): calls[0] += 1 if calls[0] == 1: return '{"data":"x"}' shutdown.set() return None proc.get_line.side_effect = get_line mock_parse.return_value = reading mock_calc.return_value = CostResult( delta=50.0, per_unit_cost=6.0, component_costs=[], total_incremental_cost=6.0, ) Pipeline(config, shutdown, app_state).run() mock_save.assert_called()