# Files
# HAMeter/tests/test_pipeline.py
# 2026-03-06 12:25:27 -05:00
#
# 420 lines
# 12 KiB
# Python

"""Unit tests for the Pipeline class."""
import threading
from unittest.mock import MagicMock, patch
import pytest
from hameter.pipeline import Pipeline
from hameter.meter import MeterReading
from hameter.config import (
HaMeterConfig,
GeneralConfig,
MqttConfig,
MeterConfig,
RateComponent,
)
from hameter.cost import CostResult
from hameter.state import AppState, CostState
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_config(meters=None):
    """Build a HaMeterConfig for tests.

    When *meters* is None, a single electric meter (id 12345, protocol
    "scm", unit "kWh") with one per-unit "energy" rate of 0.12 is used.
    Otherwise the caller's list is passed through unchanged.
    """
    meter_list = meters
    if meter_list is None:
        energy_rate = RateComponent(name="energy", rate=0.12, type="per_unit")
        electric_meter = MeterConfig(
            id=12345,
            protocol="scm",
            name="Electric Meter",
            unit_of_measurement="kWh",
            cost_factors=[energy_rate],
        )
        meter_list = [electric_meter]

    general = GeneralConfig()
    broker = MqttConfig(host="localhost")
    return HaMeterConfig(general=general, mqtt=broker, meters=meter_list)
def _make_reading(meter_id=12345, raw=100000, calibrated=1000.0):
    """Build a MeterReading with a fixed protocol and timestamp.

    *meter_id* and *raw* are also echoed into the reading's ``raw_message``
    dict under the "ID" and "Consumption" keys.
    """
    message = {"ID": meter_id, "Consumption": raw}
    reading = MeterReading(
        meter_id=meter_id,
        protocol="SCM",
        raw_consumption=raw,
        calibrated_consumption=calibrated,
        timestamp="2026-03-05T12:00:00Z",
        raw_message=message,
    )
    return reading
# Dotted path of the module under test; prefix for every @patch target in this file.
_P = "hameter.pipeline"
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
@patch(f"{_P}.HaMeterMQTT")
@patch(f"{_P}.SubprocessManager")
class TestPipeline:
    """Exercise ``Pipeline.run()`` with MQTT and the subprocess manager mocked.

    The class-level patches replace ``HaMeterMQTT`` and ``SubprocessManager``
    in the pipeline module for every test method. The resulting mocks arrive
    as the trailing ``MockSubMgr`` and ``MockMQTT`` arguments, after any
    mocks from method-level ``@patch`` decorators.
    """

    @staticmethod
    def _feed_lines(proc, shutdown, lines=()):
        """Script ``proc.get_line`` to return *lines* in order.

        Once *lines* is exhausted, every further call sets *shutdown* and
        returns ``None``. With the default empty *lines*, the very first
        call already sets *shutdown*.

        The leading underscore matters: ``patch`` used as a class decorator
        only wraps attributes whose names start with ``patch.TEST_PREFIX``,
        so this helper is called without any injected mock arguments.
        """
        remaining = iter(lines)

        def get_line(timeout=1.0):
            line = next(remaining, None)
            if line is None:
                shutdown.set()
            return line

        proc.get_line.side_effect = get_line

    def test_pipeline_starts_and_shuts_down(self, MockSubMgr, MockMQTT):
        """Pipeline connects MQTT, starts subprocesses, and shuts down."""
        shutdown = threading.Event()
        app_state = AppState()
        config = _make_config()

        proc = MockSubMgr.return_value
        mqtt = MockMQTT.return_value
        proc.start.return_value = True
        proc.is_healthy.return_value = True
        # No lines at all: the first poll requests shutdown.
        self._feed_lines(proc, shutdown)

        Pipeline(config, shutdown, app_state).run()

        mqtt.connect.assert_called_once()
        proc.start.assert_called_once()
        proc.stop.assert_called_once()
        mqtt.disconnect.assert_called_once()

    @patch(f"{_P}.parse_rtlamr_line")
    def test_pipeline_processes_reading(self, mock_parse, MockSubMgr, MockMQTT):
        """A valid reading is parsed and published via MQTT."""
        shutdown = threading.Event()
        app_state = AppState()
        config = _make_config()

        proc = MockSubMgr.return_value
        mqtt = MockMQTT.return_value
        proc.start.return_value = True
        proc.is_healthy.return_value = True

        reading = _make_reading()
        self._feed_lines(
            proc, shutdown, ['{"Type":"SCM","Message":{"ID":12345}}']
        )
        mock_parse.return_value = reading

        Pipeline(config, shutdown, app_state).run()

        mock_parse.assert_called_once()
        mqtt.publish_reading.assert_called_once_with(reading)

    @patch(f"{_P}.save_cost_state")
    @patch(f"{_P}.parse_rtlamr_line")
    def test_pipeline_cost_first_reading_sets_baseline(
        self, mock_parse, mock_save, MockSubMgr, MockMQTT
    ):
        """First reading sets baseline, no cost published."""
        shutdown = threading.Event()
        app_state = AppState()
        config = _make_config()

        proc = MockSubMgr.return_value
        mqtt = MockMQTT.return_value
        proc.start.return_value = True
        proc.is_healthy.return_value = True

        self._feed_lines(proc, shutdown, ['{"data":"x"}'])
        mock_parse.return_value = _make_reading(calibrated=1000.0)

        Pipeline(config, shutdown, app_state).run()

        mqtt.publish_cost.assert_not_called()
        cs = app_state.get_cost_state(12345)
        assert cs is not None
        assert cs.last_calibrated_reading == 1000.0

    @patch(f"{_P}.calculate_incremental_cost")
    @patch(f"{_P}.save_cost_state")
    @patch(f"{_P}.parse_rtlamr_line")
    def test_pipeline_cost_with_valid_delta(
        self, mock_parse, mock_save, mock_calc, MockSubMgr, MockMQTT
    ):
        """With a baseline, a positive delta triggers cost calculation."""
        shutdown = threading.Event()
        app_state = AppState()
        config = _make_config()

        # Pre-set cost state with a baseline
        app_state.update_cost_state(12345, CostState(
            cumulative_cost=1.20,
            last_calibrated_reading=1000.0,
            billing_period_start="2026-03-01T00:00:00Z",
        ))

        proc = MockSubMgr.return_value
        mqtt = MockMQTT.return_value
        proc.start.return_value = True
        proc.is_healthy.return_value = True

        self._feed_lines(proc, shutdown, ['{"data":"x"}'])
        mock_parse.return_value = _make_reading(calibrated=1010.0)  # delta = 10.0
        mock_calc.return_value = CostResult(
            delta=10.0,
            per_unit_cost=1.20,
            component_costs=[],
            total_incremental_cost=1.20,
        )

        Pipeline(config, shutdown, app_state).run()

        mock_calc.assert_called_once()
        mqtt.publish_cost.assert_called_once()
        cs = app_state.get_cost_state(12345)
        # 1.20 already accumulated + 1.20 incremental from the mocked result.
        assert cs.cumulative_cost == pytest.approx(2.40)
        mock_save.assert_called()

    @patch(f"{_P}.save_cost_state")
    @patch(f"{_P}.parse_rtlamr_line")
    def test_pipeline_cost_skips_no_cost_factors(
        self, mock_parse, mock_save, MockSubMgr, MockMQTT
    ):
        """Meter without cost_factors: no cost processing."""
        shutdown = threading.Event()
        app_state = AppState()
        config = _make_config(meters=[
            MeterConfig(id=99999, protocol="scm", name="Water",
                        unit_of_measurement="gal", cost_factors=[]),
        ])

        proc = MockSubMgr.return_value
        mqtt = MockMQTT.return_value
        proc.start.return_value = True
        proc.is_healthy.return_value = True

        self._feed_lines(proc, shutdown, ['{"data":"x"}'])
        mock_parse.return_value = _make_reading(meter_id=99999, calibrated=500.0)

        Pipeline(config, shutdown, app_state).run()

        mqtt.publish_reading.assert_called_once()
        mqtt.publish_cost.assert_not_called()
        mock_save.assert_not_called()

    @patch(f"{_P}.calculate_incremental_cost")
    @patch(f"{_P}.save_cost_state")
    @patch(f"{_P}.parse_rtlamr_line")
    def test_pipeline_cost_skips_negative_delta(
        self, mock_parse, mock_save, mock_calc, MockSubMgr, MockMQTT
    ):
        """A zero delta (reading equal to the baseline) skips cost calculation."""
        shutdown = threading.Event()
        app_state = AppState()
        config = _make_config()

        app_state.update_cost_state(12345, CostState(
            cumulative_cost=5.0,
            last_calibrated_reading=1000.0,
        ))

        proc = MockSubMgr.return_value
        mqtt = MockMQTT.return_value
        proc.start.return_value = True
        proc.is_healthy.return_value = True

        self._feed_lines(proc, shutdown, ['{"data":"x"}'])
        mock_parse.return_value = _make_reading(calibrated=1000.0)  # delta = 0

        Pipeline(config, shutdown, app_state).run()

        mock_calc.assert_not_called()
        mqtt.publish_cost.assert_not_called()

    @patch(f"{_P}.save_cost_state")
    @patch(f"{_P}.parse_rtlamr_line")
    def test_pipeline_cost_after_billing_reset(
        self, mock_parse, mock_save, MockSubMgr, MockMQTT
    ):
        """After billing reset (last_calibrated_reading=None), sets baseline only."""
        shutdown = threading.Event()
        app_state = AppState()
        config = _make_config()

        app_state.update_cost_state(12345, CostState(
            cumulative_cost=0.0,
            last_calibrated_reading=None,
            billing_period_start="2026-03-01T00:00:00Z",
        ))

        proc = MockSubMgr.return_value
        mqtt = MockMQTT.return_value
        proc.start.return_value = True
        proc.is_healthy.return_value = True

        self._feed_lines(proc, shutdown, ['{"data":"x"}'])
        mock_parse.return_value = _make_reading(calibrated=1050.0)

        Pipeline(config, shutdown, app_state).run()

        mqtt.publish_cost.assert_not_called()
        cs = app_state.get_cost_state(12345)
        assert cs.last_calibrated_reading == 1050.0

    @patch(f"{_P}.parse_rtlamr_line")
    def test_pipeline_handles_restart_request(
        self, mock_parse, MockSubMgr, MockMQTT
    ):
        """Restart request causes main loop to exit."""
        shutdown = threading.Event()
        app_state = AppState()
        config = _make_config()

        proc = MockSubMgr.return_value
        mqtt = MockMQTT.return_value
        proc.start.return_value = True
        proc.is_healthy.return_value = True

        polls = 0

        def get_line(timeout=1.0):
            # Request the restart on the first poll only; never yield a line
            # and never touch the shutdown event.
            nonlocal polls
            polls += 1
            if polls == 1:
                app_state.restart_requested.set()
            return None

        proc.get_line.side_effect = get_line

        Pipeline(config, shutdown, app_state).run()

        mock_parse.assert_not_called()
        proc.stop.assert_called_once()
        mqtt.disconnect.assert_called_once()

    @patch(f"{_P}.parse_rtlamr_line")
    def test_pipeline_handles_unhealthy_subprocess(
        self, mock_parse, MockSubMgr, MockMQTT
    ):
        """Unhealthy subprocess triggers a restart attempt."""
        shutdown = threading.Event()
        app_state = AppState()
        config = _make_config()

        proc = MockSubMgr.return_value
        proc.start.return_value = True
        proc.restart.return_value = True

        # First health check fails; every later one succeeds.
        health = iter([False, True, True])
        proc.is_healthy.side_effect = lambda: next(health, True)

        polls = 0

        def get_line(timeout=1.0):
            # Let one poll go by before requesting shutdown on the second.
            nonlocal polls
            polls += 1
            if polls >= 2:
                shutdown.set()
            return None

        proc.get_line.side_effect = get_line

        Pipeline(config, shutdown, app_state).run()

        proc.restart.assert_called()

    @patch(f"{_P}.calculate_incremental_cost")
    @patch(f"{_P}.save_cost_state")
    @patch(f"{_P}.parse_rtlamr_line")
    def test_pipeline_saves_cost_state(
        self, mock_parse, mock_save, mock_calc, MockSubMgr, MockMQTT
    ):
        """save_cost_state is called after a cost update."""
        shutdown = threading.Event()
        app_state = AppState()
        config = _make_config()

        app_state.update_cost_state(12345, CostState(
            cumulative_cost=0.0,
            last_calibrated_reading=900.0,
        ))

        proc = MockSubMgr.return_value
        proc.start.return_value = True
        proc.is_healthy.return_value = True

        self._feed_lines(proc, shutdown, ['{"data":"x"}'])
        mock_parse.return_value = _make_reading(calibrated=950.0)
        mock_calc.return_value = CostResult(
            delta=50.0, per_unit_cost=6.0,
            component_costs=[], total_incremental_cost=6.0,
        )

        Pipeline(config, shutdown, app_state).run()

        mock_save.assert_called()