"""Main pipeline: ties subprocess management, parsing, and MQTT together."""

import json
import logging
import threading
from dataclasses import asdict
from typing import Optional

from hameter.config import HaMeterConfig, MeterConfig
from hameter.cost import calculate_incremental_cost
from hameter.cost_state import save_cost_state
from hameter.meter import MeterReading, parse_rtlamr_line
from hameter.mqtt_client import HaMeterMQTT
from hameter.state import CostState, PipelineStatus
from hameter.subprocess_manager import SubprocessManager

logger = logging.getLogger(__name__)


class Pipeline:
    """Orchestrates the full meter-reading pipeline.

    Wires together the subprocess manager (which yields raw rtlamr output
    lines), the line parser, the MQTT publisher and, when supplied, the
    shared application state used for status, readings and cost tracking.
    """

    def __init__(
        self,
        config: HaMeterConfig,
        shutdown_event: threading.Event,
        app_state=None,
    ):
        """Build the pipeline's collaborators from *config*.

        Args:
            config: Full application configuration (general, MQTT, meters).
            shutdown_event: Set by the owner to request a full shutdown.
            app_state: Optional shared state object. When ``None`` the
                pipeline still publishes readings but skips status updates,
                reading history and cost tracking.
        """
        self._config = config
        self._shutdown = shutdown_event
        self._state = app_state
        self._meters_by_id = {m.id: m for m in config.meters}
        self._meter_ids = list(self._meters_by_id.keys())
        # De-duplicate while keeping config order so the protocol list handed
        # to the subprocess manager is the same on every run.
        self._protocols = list(dict.fromkeys(m.protocol for m in config.meters))
        self._proc_mgr = SubprocessManager(config.general, shutdown_event)
        self._mqtt = HaMeterMQTT(config.mqtt, config.meters)

    def run(self) -> None:
        """Block until shutdown or restart request.

        Connects to MQTT, starts the subprocesses (with retries) and then
        runs the main read/parse/publish loop. Subprocesses and the MQTT
        connection are always torn down on the way out.
        """
        try:
            # Announce the attempt before making it, so the status is accurate
            # while the connection is still in progress.
            if self._state:
                self._state.set_status(
                    PipelineStatus.STARTING, "Connecting to MQTT..."
                )
            try:
                self._mqtt.connect()
            except OSError as e:
                logger.error("MQTT connection failed: %s", e)
                if self._state:
                    self._state.set_status(
                        PipelineStatus.ERROR, f"MQTT connection failed: {e}"
                    )
                return

            if not self._start_with_retries():
                if self._state:
                    self._state.set_status(
                        PipelineStatus.ERROR, "Failed to start subprocesses"
                    )
                logger.error("Failed to start subprocesses after retries, exiting")
                return

            if self._state:
                self._state.set_status(PipelineStatus.RUNNING)
            self._main_loop()
        finally:
            self._shutdown_all()

    def _start_with_retries(self, max_attempts: int = 5) -> bool:
        """Try to start subprocesses up to max_attempts times.

        Returns:
            True once the subprocess manager reports a successful start;
            False if every attempt failed or a shutdown/stop was requested
            before or between attempts.
        """
        for attempt in range(1, max_attempts + 1):
            if self._shutdown.is_set():
                return False
            if self._should_stop():
                return False

            logger.info(
                "Starting subprocesses (attempt %d/%d)", attempt, max_attempts
            )
            if self._proc_mgr.start(self._meter_ids, self._protocols):
                return True

            if attempt < max_attempts:
                logger.warning("Startup failed, retrying...")
                # Back off on the shutdown event rather than sleeping, so a
                # shutdown request interrupts the wait immediately.
                if self._shutdown.wait(timeout=5):
                    return False
        return False

    def _main_loop(self) -> None:
        """Read lines from rtlamr, parse, and publish."""
        logger.info("Pipeline running — listening for meter readings")
        while not self._shutdown.is_set():
            # Check for restart/discovery request from web UI.
            if self._should_stop():
                logger.info("Received request to stop pipeline")
                break

            # Health check.
            if not self._proc_mgr.is_healthy():
                logger.warning("Subprocess died, attempting restart")
                if not self._proc_mgr.restart(self._meter_ids, self._protocols):
                    logger.error("Restart failed, exiting main loop")
                    break
                continue

            # Poll with a 1s timeout so the shutdown/stop/health checks above
            # are re-evaluated regularly; None means no line was available.
            line = self._proc_mgr.get_line(timeout=1.0)
            if line is None:
                continue

            try:
                reading = parse_rtlamr_line(line, self._meters_by_id)
                if reading:
                    self._mqtt.publish_reading(reading)
                    if self._state:
                        self._state.record_reading(reading)
                    meter_cfg = self._meters_by_id.get(reading.meter_id)
                    if meter_cfg and meter_cfg.cost_factors:
                        self._process_cost(reading, meter_cfg)
            except json.JSONDecodeError:
                # Must precede the ValueError clause: JSONDecodeError is a
                # ValueError subclass and would otherwise be logged as a
                # warning instead of quietly at debug level.
                logger.debug("Non-JSON line from rtlamr: %.200s", line)
            except (KeyError, ValueError, TypeError) as e:
                logger.warning("Error processing line: %s", e)
            except Exception:
                # One bad line must not take the whole pipeline down.
                logger.exception("Unexpected error processing line")

    def _process_cost(self, reading: MeterReading, meter_cfg: MeterConfig) -> None:
        """Calculate and record incremental cost for a reading.

        Does nothing when the pipeline has no application state, since cost
        state is stored there. The first reading for a meter, and the first
        reading after a billing-period reset, only establish the baseline.
        """
        if self._state is None:
            # Cost state lives in the app state; without it there is nothing
            # to read the baseline from or write the result to.
            return

        cost_state = self._state.get_cost_state(reading.meter_id)

        if cost_state is None:
            # First reading for this meter — initialize baseline, no cost yet.
            new_state = CostState(
                last_calibrated_reading=reading.calibrated_consumption,
                last_updated=reading.timestamp,
                billing_period_start=reading.timestamp,
            )
            self._state.update_cost_state(reading.meter_id, new_state)
            return

        if cost_state.last_calibrated_reading is None:
            # After a billing period reset — this reading sets the baseline.
            new_state = CostState(
                cumulative_cost=cost_state.cumulative_cost,
                last_calibrated_reading=reading.calibrated_consumption,
                billing_period_start=cost_state.billing_period_start,
                last_updated=reading.timestamp,
                fixed_charges_applied=cost_state.fixed_charges_applied,
            )
            self._state.update_cost_state(reading.meter_id, new_state)
            return

        delta = reading.calibrated_consumption - cost_state.last_calibrated_reading
        if delta <= 0:
            # No new consumption (duplicate reading or meter rollover).
            # NOTE(review): the baseline is left untouched here, so after a
            # genuine rollover no cost accrues until readings exceed the old
            # baseline again — confirm this is intended.
            return

        result = calculate_incremental_cost(delta, meter_cfg.cost_factors)
        new_cumulative = round(
            cost_state.cumulative_cost + result.total_incremental_cost, 4
        )

        new_state = CostState(
            cumulative_cost=new_cumulative,
            last_calibrated_reading=reading.calibrated_consumption,
            billing_period_start=cost_state.billing_period_start,
            last_updated=reading.timestamp,
            fixed_charges_applied=cost_state.fixed_charges_applied,
        )
        self._state.update_cost_state(reading.meter_id, new_state)
        self._mqtt.publish_cost(reading.meter_id, new_cumulative)
        self._save_all_cost_states()

        # Log the updated total, not the pre-increment value held in
        # cost_state.
        logger.debug(
            "Cost update: meter=%d delta=%.4f incremental=$%.4f cumulative=$%.4f",
            reading.meter_id,
            delta,
            result.total_incremental_cost,
            new_cumulative,
        )

    def _save_all_cost_states(self) -> None:
        """Persist cost state for all meters to disk.

        Persistence failures are logged and swallowed so that a failed save
        does not interrupt reading processing.
        """
        if self._state is None:
            return
        states = self._state.get_cost_states()
        serialized = {str(mid): asdict(cs) for mid, cs in states.items()}
        try:
            save_cost_state(serialized)
        except Exception:
            logger.exception("Failed to persist cost state")

    def _should_stop(self) -> bool:
        """Check if the web UI has requested a restart or discovery."""
        if self._state is None:
            return False
        return (
            self._state.restart_requested.is_set()
            or self._state.discovery_requested.is_set()
        )

    def _shutdown_all(self) -> None:
        """Clean shutdown of subprocesses and MQTT."""
        logger.info("Shutting down pipeline...")
        try:
            self._proc_mgr.stop()
        finally:
            # Disconnect from MQTT even if stopping the subprocesses raised.
            self._mqtt.disconnect()
        logger.info("Pipeline shutdown complete")