"""Meter data model and rtlamr output parsing."""

import json
import logging
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

from hameter.config import MeterConfig

logger = logging.getLogger(__name__)

# Map rtlamr message Type to the field containing the meter ID.
PROTOCOL_ID_FIELDS: dict[str, str] = {
    "SCM": "ID",
    "SCM+": "EndpointID",
    "IDM": "ERTSerialNumber",
    "NetIDM": "ERTSerialNumber",
    "R900": "ID",
    "R900BCD": "ID",
}

# Map rtlamr message Type to the field containing the consumption value.
# NOTE(review): confirm against rtlamr's NetIDM JSON output that its
# consumption field is really named "LastConsumptionCount"; a field that is
# absent from the message is read as 0 by parse_rtlamr_line.
PROTOCOL_CONSUMPTION_FIELDS: dict[str, str] = {
    "SCM": "Consumption",
    "SCM+": "Consumption",
    "IDM": "LastConsumptionCount",
    "NetIDM": "LastConsumptionCount",
    "R900": "Consumption",
    "R900BCD": "Consumption",
}


@dataclass
class MeterReading:
    """A single meter reading extracted from one rtlamr JSON message.

    Attributes:
        meter_id: Meter ID taken from the protocol-specific ID field.
        protocol: The message's "Type" value (a key of PROTOCOL_ID_FIELDS).
        raw_consumption: Consumption value as reported in the message.
        calibrated_consumption: raw_consumption multiplied by the configured
            meter's multiplier (1.0 when the meter has no config), rounded
            to 4 decimal places.
        timestamp: The message's "Time" value, or the current UTC time in
            ISO 8601 format when the message carries none.
        raw_message: The "Message" object of the rtlamr JSON payload.
    """

    meter_id: int
    protocol: str
    raw_consumption: int
    calibrated_consumption: float
    timestamp: str
    raw_message: dict


def parse_rtlamr_line(
    line: str,
    meters: dict[int, MeterConfig],
) -> Optional[MeterReading]:
    """Parse a single JSON line from rtlamr stdout.

    Args:
        line: Raw JSON string from rtlamr.
        meters: Dict mapping meter ID to config. If empty, accept all
            meters (discovery mode).

    Returns:
        MeterReading if the line matches a configured meter (or any meter
        in discovery mode), otherwise None. None is also returned when the
        payload is not a JSON object, when its "Type" is not a known
        protocol, when its "Message" is not a JSON object, or when the
        meter ID is missing or 0.

    Raises:
        json.JSONDecodeError: If ``line`` is not valid JSON.
        ValueError: If the ID or consumption value cannot be converted
            to an int.
        TypeError: If the ID or consumption value has a type that int()
            does not accept (e.g. JSON null).
    """
    data = json.loads(line)

    # json.loads may return any JSON value (list, number, string, null).
    # Only objects can carry a reading; anything else used to crash with
    # AttributeError on the .get() calls below.
    if not isinstance(data, dict):
        return None

    msg_type = data.get("Type", "")
    message = data.get("Message", {})

    # A non-string Type could be unhashable (list/dict) and break the table
    # lookups; a non-object Message has no fields to read.
    if not isinstance(msg_type, str) or not isinstance(message, dict):
        return None

    id_field = PROTOCOL_ID_FIELDS.get(msg_type)
    consumption_field = PROTOCOL_CONSUMPTION_FIELDS.get(msg_type)
    if not id_field or not consumption_field:
        # Unknown or unsupported protocol.
        return None

    # A missing ID defaults to 0, which is treated as "no usable ID".
    meter_id = int(message.get(id_field, 0))
    if meter_id == 0:
        return None

    raw_consumption = int(message.get(consumption_field, 0))

    # Filter: if meters dict is populated, only accept configured meters.
    if meters and meter_id not in meters:
        return None

    # In discovery mode (or for a meter without config) no scaling applies.
    meter_cfg = meters.get(meter_id)
    multiplier = meter_cfg.multiplier if meter_cfg else 1.0
    calibrated = raw_consumption * multiplier

    # Prefer rtlamr's own timestamp; fall back to "now" in UTC.
    timestamp = data.get("Time") or datetime.now(timezone.utc).isoformat()

    return MeterReading(
        meter_id=meter_id,
        protocol=msg_type,
        raw_consumption=raw_consumption,
        calibrated_consumption=round(calibrated, 4),
        timestamp=timestamp,
        raw_message=message,
    )