420 lines
12 KiB
Python
420 lines
12 KiB
Python
"""Unit tests for the Pipeline class."""
|
|
|
|
import threading
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from hameter.pipeline import Pipeline
|
|
from hameter.meter import MeterReading
|
|
from hameter.config import (
|
|
HaMeterConfig,
|
|
GeneralConfig,
|
|
MqttConfig,
|
|
MeterConfig,
|
|
RateComponent,
|
|
)
|
|
from hameter.cost import CostResult
|
|
from hameter.state import AppState, CostState
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _make_config(meters=None):
|
|
if meters is None:
|
|
meters = [
|
|
MeterConfig(
|
|
id=12345,
|
|
protocol="scm",
|
|
name="Electric Meter",
|
|
unit_of_measurement="kWh",
|
|
cost_factors=[
|
|
RateComponent(name="energy", rate=0.12, type="per_unit"),
|
|
],
|
|
),
|
|
]
|
|
return HaMeterConfig(
|
|
general=GeneralConfig(),
|
|
mqtt=MqttConfig(host="localhost"),
|
|
meters=meters,
|
|
)
|
|
|
|
|
|
def _make_reading(meter_id=12345, raw=100000, calibrated=1000.0):
|
|
return MeterReading(
|
|
meter_id=meter_id,
|
|
protocol="SCM",
|
|
raw_consumption=raw,
|
|
calibrated_consumption=calibrated,
|
|
timestamp="2026-03-05T12:00:00Z",
|
|
raw_message={"ID": meter_id, "Consumption": raw},
|
|
)
|
|
|
|
|
|
_P = "hameter.pipeline"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@patch(f"{_P}.HaMeterMQTT")
|
|
@patch(f"{_P}.SubprocessManager")
|
|
class TestPipeline:
|
|
|
|
def test_pipeline_starts_and_shuts_down(self, MockSubMgr, MockMQTT):
|
|
"""Pipeline connects MQTT, starts subprocesses, and shuts down."""
|
|
shutdown = threading.Event()
|
|
app_state = AppState()
|
|
config = _make_config()
|
|
|
|
proc = MockSubMgr.return_value
|
|
mqtt = MockMQTT.return_value
|
|
proc.start.return_value = True
|
|
proc.is_healthy.return_value = True
|
|
proc.get_line.side_effect = lambda timeout=1.0: (shutdown.set(), None)[1]
|
|
|
|
Pipeline(config, shutdown, app_state).run()
|
|
|
|
mqtt.connect.assert_called_once()
|
|
proc.start.assert_called_once()
|
|
proc.stop.assert_called_once()
|
|
mqtt.disconnect.assert_called_once()
|
|
|
|
@patch(f"{_P}.parse_rtlamr_line")
|
|
def test_pipeline_processes_reading(self, mock_parse, MockSubMgr, MockMQTT):
|
|
"""A valid reading is parsed and published via MQTT."""
|
|
shutdown = threading.Event()
|
|
app_state = AppState()
|
|
config = _make_config()
|
|
|
|
proc = MockSubMgr.return_value
|
|
mqtt = MockMQTT.return_value
|
|
proc.start.return_value = True
|
|
proc.is_healthy.return_value = True
|
|
|
|
reading = _make_reading()
|
|
calls = [0]
|
|
|
|
def get_line(timeout=1.0):
|
|
calls[0] += 1
|
|
if calls[0] == 1:
|
|
return '{"Type":"SCM","Message":{"ID":12345}}'
|
|
shutdown.set()
|
|
return None
|
|
|
|
proc.get_line.side_effect = get_line
|
|
mock_parse.return_value = reading
|
|
|
|
Pipeline(config, shutdown, app_state).run()
|
|
|
|
mock_parse.assert_called_once()
|
|
mqtt.publish_reading.assert_called_once_with(reading)
|
|
|
|
@patch(f"{_P}.save_cost_state")
|
|
@patch(f"{_P}.parse_rtlamr_line")
|
|
def test_pipeline_cost_first_reading_sets_baseline(
|
|
self, mock_parse, mock_save, MockSubMgr, MockMQTT
|
|
):
|
|
"""First reading sets baseline, no cost published."""
|
|
shutdown = threading.Event()
|
|
app_state = AppState()
|
|
config = _make_config()
|
|
|
|
proc = MockSubMgr.return_value
|
|
mqtt = MockMQTT.return_value
|
|
proc.start.return_value = True
|
|
proc.is_healthy.return_value = True
|
|
|
|
reading = _make_reading(calibrated=1000.0)
|
|
calls = [0]
|
|
|
|
def get_line(timeout=1.0):
|
|
calls[0] += 1
|
|
if calls[0] == 1:
|
|
return '{"data":"x"}'
|
|
shutdown.set()
|
|
return None
|
|
|
|
proc.get_line.side_effect = get_line
|
|
mock_parse.return_value = reading
|
|
|
|
Pipeline(config, shutdown, app_state).run()
|
|
|
|
mqtt.publish_cost.assert_not_called()
|
|
cs = app_state.get_cost_state(12345)
|
|
assert cs is not None
|
|
assert cs.last_calibrated_reading == 1000.0
|
|
|
|
@patch(f"{_P}.calculate_incremental_cost")
|
|
@patch(f"{_P}.save_cost_state")
|
|
@patch(f"{_P}.parse_rtlamr_line")
|
|
def test_pipeline_cost_with_valid_delta(
|
|
self, mock_parse, mock_save, mock_calc, MockSubMgr, MockMQTT
|
|
):
|
|
"""With a baseline, a positive delta triggers cost calculation."""
|
|
shutdown = threading.Event()
|
|
app_state = AppState()
|
|
config = _make_config()
|
|
|
|
# Pre-set cost state with a baseline
|
|
app_state.update_cost_state(12345, CostState(
|
|
cumulative_cost=1.20,
|
|
last_calibrated_reading=1000.0,
|
|
billing_period_start="2026-03-01T00:00:00Z",
|
|
))
|
|
|
|
proc = MockSubMgr.return_value
|
|
mqtt = MockMQTT.return_value
|
|
proc.start.return_value = True
|
|
proc.is_healthy.return_value = True
|
|
|
|
reading = _make_reading(calibrated=1010.0) # delta = 10.0
|
|
calls = [0]
|
|
|
|
def get_line(timeout=1.0):
|
|
calls[0] += 1
|
|
if calls[0] == 1:
|
|
return '{"data":"x"}'
|
|
shutdown.set()
|
|
return None
|
|
|
|
proc.get_line.side_effect = get_line
|
|
mock_parse.return_value = reading
|
|
|
|
mock_calc.return_value = CostResult(
|
|
delta=10.0,
|
|
per_unit_cost=1.20,
|
|
component_costs=[],
|
|
total_incremental_cost=1.20,
|
|
)
|
|
|
|
Pipeline(config, shutdown, app_state).run()
|
|
|
|
mock_calc.assert_called_once()
|
|
mqtt.publish_cost.assert_called_once()
|
|
cs = app_state.get_cost_state(12345)
|
|
assert cs.cumulative_cost == pytest.approx(2.40)
|
|
mock_save.assert_called()
|
|
|
|
@patch(f"{_P}.save_cost_state")
|
|
@patch(f"{_P}.parse_rtlamr_line")
|
|
def test_pipeline_cost_skips_no_cost_factors(
|
|
self, mock_parse, mock_save, MockSubMgr, MockMQTT
|
|
):
|
|
"""Meter without cost_factors: no cost processing."""
|
|
shutdown = threading.Event()
|
|
app_state = AppState()
|
|
config = _make_config(meters=[
|
|
MeterConfig(id=99999, protocol="scm", name="Water",
|
|
unit_of_measurement="gal", cost_factors=[]),
|
|
])
|
|
|
|
proc = MockSubMgr.return_value
|
|
mqtt = MockMQTT.return_value
|
|
proc.start.return_value = True
|
|
proc.is_healthy.return_value = True
|
|
|
|
reading = _make_reading(meter_id=99999, calibrated=500.0)
|
|
calls = [0]
|
|
|
|
def get_line(timeout=1.0):
|
|
calls[0] += 1
|
|
if calls[0] == 1:
|
|
return '{"data":"x"}'
|
|
shutdown.set()
|
|
return None
|
|
|
|
proc.get_line.side_effect = get_line
|
|
mock_parse.return_value = reading
|
|
|
|
Pipeline(config, shutdown, app_state).run()
|
|
|
|
mqtt.publish_reading.assert_called_once()
|
|
mqtt.publish_cost.assert_not_called()
|
|
mock_save.assert_not_called()
|
|
|
|
@patch(f"{_P}.calculate_incremental_cost")
|
|
@patch(f"{_P}.save_cost_state")
|
|
@patch(f"{_P}.parse_rtlamr_line")
|
|
def test_pipeline_cost_skips_negative_delta(
|
|
self, mock_parse, mock_save, mock_calc, MockSubMgr, MockMQTT
|
|
):
|
|
"""Delta <= 0 skips cost calculation."""
|
|
shutdown = threading.Event()
|
|
app_state = AppState()
|
|
config = _make_config()
|
|
|
|
app_state.update_cost_state(12345, CostState(
|
|
cumulative_cost=5.0,
|
|
last_calibrated_reading=1000.0,
|
|
))
|
|
|
|
reading = _make_reading(calibrated=1000.0) # delta = 0
|
|
proc = MockSubMgr.return_value
|
|
mqtt = MockMQTT.return_value
|
|
proc.start.return_value = True
|
|
proc.is_healthy.return_value = True
|
|
|
|
calls = [0]
|
|
|
|
def get_line(timeout=1.0):
|
|
calls[0] += 1
|
|
if calls[0] == 1:
|
|
return '{"data":"x"}'
|
|
shutdown.set()
|
|
return None
|
|
|
|
proc.get_line.side_effect = get_line
|
|
mock_parse.return_value = reading
|
|
|
|
Pipeline(config, shutdown, app_state).run()
|
|
|
|
mock_calc.assert_not_called()
|
|
mqtt.publish_cost.assert_not_called()
|
|
|
|
@patch(f"{_P}.save_cost_state")
|
|
@patch(f"{_P}.parse_rtlamr_line")
|
|
def test_pipeline_cost_after_billing_reset(
|
|
self, mock_parse, mock_save, MockSubMgr, MockMQTT
|
|
):
|
|
"""After billing reset (last_calibrated_reading=None), sets baseline only."""
|
|
shutdown = threading.Event()
|
|
app_state = AppState()
|
|
config = _make_config()
|
|
|
|
app_state.update_cost_state(12345, CostState(
|
|
cumulative_cost=0.0,
|
|
last_calibrated_reading=None,
|
|
billing_period_start="2026-03-01T00:00:00Z",
|
|
))
|
|
|
|
reading = _make_reading(calibrated=1050.0)
|
|
proc = MockSubMgr.return_value
|
|
mqtt = MockMQTT.return_value
|
|
proc.start.return_value = True
|
|
proc.is_healthy.return_value = True
|
|
|
|
calls = [0]
|
|
|
|
def get_line(timeout=1.0):
|
|
calls[0] += 1
|
|
if calls[0] == 1:
|
|
return '{"data":"x"}'
|
|
shutdown.set()
|
|
return None
|
|
|
|
proc.get_line.side_effect = get_line
|
|
mock_parse.return_value = reading
|
|
|
|
Pipeline(config, shutdown, app_state).run()
|
|
|
|
mqtt.publish_cost.assert_not_called()
|
|
cs = app_state.get_cost_state(12345)
|
|
assert cs.last_calibrated_reading == 1050.0
|
|
|
|
@patch(f"{_P}.parse_rtlamr_line")
|
|
def test_pipeline_handles_restart_request(
|
|
self, mock_parse, MockSubMgr, MockMQTT
|
|
):
|
|
"""Restart request causes main loop to exit."""
|
|
shutdown = threading.Event()
|
|
app_state = AppState()
|
|
config = _make_config()
|
|
|
|
proc = MockSubMgr.return_value
|
|
mqtt = MockMQTT.return_value
|
|
proc.start.return_value = True
|
|
proc.is_healthy.return_value = True
|
|
|
|
calls = [0]
|
|
|
|
def get_line(timeout=1.0):
|
|
calls[0] += 1
|
|
if calls[0] == 1:
|
|
app_state.restart_requested.set()
|
|
return None
|
|
|
|
proc.get_line.side_effect = get_line
|
|
|
|
Pipeline(config, shutdown, app_state).run()
|
|
|
|
mock_parse.assert_not_called()
|
|
proc.stop.assert_called_once()
|
|
mqtt.disconnect.assert_called_once()
|
|
|
|
@patch(f"{_P}.parse_rtlamr_line")
|
|
def test_pipeline_handles_unhealthy_subprocess(
|
|
self, mock_parse, MockSubMgr, MockMQTT
|
|
):
|
|
"""Unhealthy subprocess triggers a restart attempt."""
|
|
shutdown = threading.Event()
|
|
app_state = AppState()
|
|
config = _make_config()
|
|
|
|
proc = MockSubMgr.return_value
|
|
mqtt = MockMQTT.return_value
|
|
proc.start.return_value = True
|
|
proc.restart.return_value = True
|
|
|
|
health = iter([False, True, True])
|
|
proc.is_healthy.side_effect = lambda: next(health, True)
|
|
|
|
calls = [0]
|
|
|
|
def get_line(timeout=1.0):
|
|
calls[0] += 1
|
|
if calls[0] >= 2:
|
|
shutdown.set()
|
|
return None
|
|
|
|
proc.get_line.side_effect = get_line
|
|
|
|
Pipeline(config, shutdown, app_state).run()
|
|
|
|
proc.restart.assert_called()
|
|
|
|
@patch(f"{_P}.calculate_incremental_cost")
|
|
@patch(f"{_P}.save_cost_state")
|
|
@patch(f"{_P}.parse_rtlamr_line")
|
|
def test_pipeline_saves_cost_state(
|
|
self, mock_parse, mock_save, mock_calc, MockSubMgr, MockMQTT
|
|
):
|
|
"""save_cost_state is called after a cost update."""
|
|
shutdown = threading.Event()
|
|
app_state = AppState()
|
|
config = _make_config()
|
|
|
|
app_state.update_cost_state(12345, CostState(
|
|
cumulative_cost=0.0,
|
|
last_calibrated_reading=900.0,
|
|
))
|
|
|
|
reading = _make_reading(calibrated=950.0)
|
|
proc = MockSubMgr.return_value
|
|
mqtt = MockMQTT.return_value
|
|
proc.start.return_value = True
|
|
proc.is_healthy.return_value = True
|
|
|
|
calls = [0]
|
|
|
|
def get_line(timeout=1.0):
|
|
calls[0] += 1
|
|
if calls[0] == 1:
|
|
return '{"data":"x"}'
|
|
shutdown.set()
|
|
return None
|
|
|
|
proc.get_line.side_effect = get_line
|
|
mock_parse.return_value = reading
|
|
mock_calc.return_value = CostResult(
|
|
delta=50.0, per_unit_cost=6.0,
|
|
component_costs=[], total_incremental_cost=6.0,
|
|
)
|
|
|
|
Pipeline(config, shutdown, app_state).run()
|
|
|
|
mock_save.assert_called()
|