Files
HAMeter/hameter/pipeline.py
2026-03-06 12:25:27 -05:00

199 lines
7.7 KiB
Python

"""Main pipeline: ties subprocess management, parsing, and MQTT together."""
import json
import logging
import threading
from dataclasses import asdict
from typing import Optional
from hameter.config import HaMeterConfig, MeterConfig
from hameter.cost import calculate_incremental_cost
from hameter.cost_state import save_cost_state
from hameter.meter import MeterReading, parse_rtlamr_line
from hameter.mqtt_client import HaMeterMQTT
from hameter.state import CostState, PipelineStatus
from hameter.subprocess_manager import SubprocessManager
# Module-level logger named after this module; used by Pipeline for all
# of its diagnostic output.
logger = logging.getLogger(__name__)
class Pipeline:
    """Orchestrates the full meter-reading pipeline.

    Connects the MQTT client, starts the subprocesses through
    ``SubprocessManager``, then loops reading lines, parsing them into
    meter readings, publishing them over MQTT and, for meters that have
    cost factors configured, tracking the accumulated cost.

    Args:
        config: Application configuration; its ``meters``, ``general`` and
            ``mqtt`` attributes are used.
        shutdown_event: Event that stops the pipeline once it is set.
        app_state: Optional shared state object. When it is ``None``,
            status reporting, reading recording, cost tracking and
            restart/discovery requests are all skipped.
    """

    def __init__(
        self,
        config: HaMeterConfig,
        shutdown_event: threading.Event,
        app_state=None,
    ):
        self._config = config
        self._shutdown = shutdown_event
        self._state = app_state
        self._meters_by_id = {m.id: m for m in config.meters}
        self._meter_ids = list(self._meters_by_id.keys())
        # De-duplicate protocols while keeping first-seen order, so the
        # list handed to the subprocess manager is stable between runs.
        self._protocols = list(dict.fromkeys(m.protocol for m in config.meters))
        self._proc_mgr = SubprocessManager(config.general, shutdown_event)
        self._mqtt = HaMeterMQTT(config.mqtt, config.meters)

    def run(self) -> None:
        """Block until shutdown or restart request.

        Connects to MQTT, starts the subprocesses (with retries) and then
        runs the main loop. Subprocesses and the MQTT client are always
        shut down on the way out, including after a failure.
        """
        try:
            try:
                self._mqtt.connect()
            except OSError as e:
                # Log as well as updating the state, so the failure is
                # visible even when no app state is attached.
                logger.error("MQTT connection failed: %s", e)
                if self._state:
                    self._state.set_status(
                        PipelineStatus.ERROR, f"MQTT connection failed: {e}"
                    )
                return

            if self._state:
                self._state.set_status(
                    PipelineStatus.STARTING, "Connecting to MQTT..."
                )

            if not self._start_with_retries():
                if self._state:
                    self._state.set_status(
                        PipelineStatus.ERROR, "Failed to start subprocesses"
                    )
                logger.error("Failed to start subprocesses after retries, exiting")
                return

            if self._state:
                self._state.set_status(PipelineStatus.RUNNING)
            self._main_loop()
        finally:
            self._shutdown_all()

    def _start_with_retries(self, max_attempts: int = 5) -> bool:
        """Try to start subprocesses up to ``max_attempts`` times.

        Waits up to 5 seconds between attempts and gives up early when
        shutdown is signalled or a stop is requested through the app state.

        Returns:
            True once the subprocess manager reports a successful start,
            False if every attempt failed or the pipeline was asked to stop.
        """
        for attempt in range(1, max_attempts + 1):
            if self._shutdown.is_set():
                return False
            if self._should_stop():
                return False

            logger.info("Starting subprocesses (attempt %d/%d)", attempt, max_attempts)
            if self._proc_mgr.start(self._meter_ids, self._protocols):
                return True

            if attempt < max_attempts:
                logger.warning("Startup failed, retrying...")
                # Event.wait returns True when shutdown was set while waiting.
                if self._shutdown.wait(timeout=5):
                    return False
        return False

    def _main_loop(self) -> None:
        """Read lines from rtlamr, parse, and publish.

        Runs until shutdown is signalled, a stop is requested through the
        app state, or a subprocess restart fails.
        """
        logger.info("Pipeline running — listening for meter readings")
        while not self._shutdown.is_set():
            # Check for restart/discovery request from web UI.
            if self._should_stop():
                logger.info("Received request to stop pipeline")
                break

            # Health check.
            if not self._proc_mgr.is_healthy():
                logger.warning("Subprocess died, attempting restart")
                if not self._proc_mgr.restart(self._meter_ids, self._protocols):
                    logger.error("Restart failed, exiting main loop")
                    break
                continue

            # Poll for the next line with a 1s timeout so the loop
            # conditions above are re-checked regularly.
            line = self._proc_mgr.get_line(timeout=1.0)
            if line is None:
                continue

            try:
                reading = parse_rtlamr_line(line, self._meters_by_id)
                if reading:
                    self._mqtt.publish_reading(reading)
                    if self._state:
                        self._state.record_reading(reading)
                        # Cost tracking lives in the app state, so it is
                        # only attempted when a state object is attached.
                        meter_cfg = self._meters_by_id.get(reading.meter_id)
                        if meter_cfg and meter_cfg.cost_factors:
                            self._process_cost(reading, meter_cfg)
            except json.JSONDecodeError:
                logger.debug("Non-JSON line from rtlamr: %.200s", line)
            except (KeyError, ValueError, TypeError) as e:
                logger.warning("Error processing line: %s", e)
            except Exception:
                # Keep the loop alive on unforeseen errors; log the traceback.
                logger.exception("Unexpected error processing line")

    def _process_cost(self, reading: MeterReading, meter_cfg: MeterConfig) -> None:
        """Calculate and record incremental cost for a reading.

        The first reading for a meter, and the first reading after its
        baseline was cleared, only establish the baseline. Later readings
        are billed on the increase since the previous baseline; the new
        cumulative cost is stored, published and persisted.

        Does nothing when no app state is attached.
        """
        if self._state is None:
            return

        cost_state = self._state.get_cost_state(reading.meter_id)

        if cost_state is None:
            # First reading for this meter — initialize baseline, no cost yet.
            new_state = CostState(
                last_calibrated_reading=reading.calibrated_consumption,
                last_updated=reading.timestamp,
                billing_period_start=reading.timestamp,
            )
            self._state.update_cost_state(reading.meter_id, new_state)
            return

        if cost_state.last_calibrated_reading is None:
            # After a billing period reset — this reading sets the baseline.
            new_state = CostState(
                cumulative_cost=cost_state.cumulative_cost,
                last_calibrated_reading=reading.calibrated_consumption,
                billing_period_start=cost_state.billing_period_start,
                last_updated=reading.timestamp,
                fixed_charges_applied=cost_state.fixed_charges_applied,
            )
            self._state.update_cost_state(reading.meter_id, new_state)
            return

        delta = reading.calibrated_consumption - cost_state.last_calibrated_reading
        if delta <= 0:
            # No new consumption (duplicate reading or meter rollover):
            # nothing is billed and the stored baseline is left untouched.
            return

        result = calculate_incremental_cost(delta, meter_cfg.cost_factors)
        new_cumulative = round(
            cost_state.cumulative_cost + result.total_incremental_cost, 4
        )
        new_state = CostState(
            cumulative_cost=new_cumulative,
            last_calibrated_reading=reading.calibrated_consumption,
            billing_period_start=cost_state.billing_period_start,
            last_updated=reading.timestamp,
            fixed_charges_applied=cost_state.fixed_charges_applied,
        )
        self._state.update_cost_state(reading.meter_id, new_state)
        self._mqtt.publish_cost(reading.meter_id, new_cumulative)
        self._save_all_cost_states()

        # Report the cumulative value that was just stored and published,
        # not the value from before this update.
        logger.debug(
            "Cost update: meter=%d delta=%.4f incremental=$%.4f cumulative=$%.4f",
            reading.meter_id,
            delta,
            result.total_incremental_cost,
            new_cumulative,
        )

    def _save_all_cost_states(self) -> None:
        """Persist cost state for all meters to disk.

        Persistence failures are logged and otherwise ignored, so a
        failed save does not interrupt reading processing. Does nothing
        when no app state is attached.
        """
        if self._state is None:
            return

        states = self._state.get_cost_states()
        # Meter ids are converted to strings for use as keys in the
        # saved mapping.
        serialized = {str(mid): asdict(cs) for mid, cs in states.items()}
        try:
            save_cost_state(serialized)
        except Exception:
            logger.exception("Failed to persist cost state")

    def _should_stop(self) -> bool:
        """Check if the web UI has requested a restart or discovery.

        Returns:
            False when no app state is attached; otherwise True if either
            the restart or the discovery request event is set.
        """
        if self._state is None:
            return False
        return (
            self._state.restart_requested.is_set()
            or self._state.discovery_requested.is_set()
        )

    def _shutdown_all(self) -> None:
        """Clean shutdown of subprocesses and MQTT."""
        logger.info("Shutting down pipeline...")
        self._proc_mgr.stop()
        self._mqtt.disconnect()
        logger.info("Pipeline shutdown complete")