Files
HAMeter/hameter/meter.py
2026-03-06 12:25:27 -05:00

93 lines
2.4 KiB
Python

"""Meter data model and rtlamr output parsing."""
import json
import logging
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional
from hameter.config import MeterConfig
logger = logging.getLogger(__name__)

# Field names used by each rtlamr message Type, as
# (field containing the meter ID, field containing the consumption value).
# Both public lookup tables below are derived from this single table so the
# two always cover exactly the same set of protocols.
_PROTOCOL_FIELDS: dict[str, tuple[str, str]] = {
    "SCM": ("ID", "Consumption"),
    "SCM+": ("EndpointID", "Consumption"),
    "IDM": ("ERTSerialNumber", "LastConsumptionCount"),
    "NetIDM": ("ERTSerialNumber", "LastConsumptionCount"),
    "R900": ("ID", "Consumption"),
    "R900BCD": ("ID", "Consumption"),
}

# Map rtlamr message Type to the field containing the meter ID.
PROTOCOL_ID_FIELDS: dict[str, str] = {
    msg_type: fields[0] for msg_type, fields in _PROTOCOL_FIELDS.items()
}

# Map rtlamr message Type to the field containing the consumption value.
PROTOCOL_CONSUMPTION_FIELDS: dict[str, str] = {
    msg_type: fields[1] for msg_type, fields in _PROTOCOL_FIELDS.items()
}
@dataclass
class MeterReading:
    """One consumption reading decoded from a single rtlamr message.

    Attributes:
        meter_id: Numeric ID of the meter the message came from.
        protocol: rtlamr message type the reading was decoded from
            (for example ``"SCM"`` or ``"IDM"``).
        raw_consumption: Consumption value as reported in the message.
        calibrated_consumption: Raw consumption scaled by the meter's
            configured multiplier.
        timestamp: Time string associated with the reading.
        raw_message: The message payload the reading was built from.
    """

    meter_id: int
    protocol: str
    raw_consumption: int
    calibrated_consumption: float
    timestamp: str
    raw_message: dict
def parse_rtlamr_line(
    line: str,
    meters: dict[int, MeterConfig],
) -> Optional[MeterReading]:
    """Parse a single JSON line from rtlamr stdout.

    Args:
        line: Raw JSON string from rtlamr.
        meters: Dict mapping meter ID to config. If empty, accept all meters
            (discovery mode).

    Returns:
        MeterReading if the line matches a configured meter (or any meter in
        discovery mode), otherwise None. None is also returned when the line
        is valid JSON but not a usable message: not a JSON object, an
        unsupported message Type, a non-object Message, or a Message lacking
        a meter ID or a consumption value.

    Raises:
        json.JSONDecodeError: If ``line`` is not valid JSON.
        ValueError: If the meter ID or consumption value is not numeric.
        TypeError: If the meter ID or consumption value has a type that
            cannot be converted to ``int``.
    """
    data = json.loads(line)
    # Valid JSON is not necessarily an object (e.g. ``[]`` or ``42``); such a
    # line cannot carry a reading, so ignore it instead of failing on .get().
    if not isinstance(data, dict):
        return None

    msg_type = data.get("Type", "")
    if not isinstance(msg_type, str):
        return None
    id_field = PROTOCOL_ID_FIELDS.get(msg_type)
    consumption_field = PROTOCOL_CONSUMPTION_FIELDS.get(msg_type)
    if not id_field or not consumption_field:
        return None

    message = data.get("Message", {})
    if not isinstance(message, dict):
        return None

    # A missing, null or zero ID all mean "no usable meter ID".
    meter_id = int(message.get(id_field) or 0)
    if meter_id == 0:
        return None

    # Filter: if meters dict is populated, only accept configured meters.
    # Done before the consumption value is touched so that a malformed
    # message from a meter we would ignore anyway cannot raise.
    if meters and meter_id not in meters:
        return None

    # A missing consumption field is not the same as a reading of zero; do
    # not fabricate a 0 reading for it.
    reported = message.get(consumption_field)
    if reported is None:
        return None
    raw_consumption = int(reported)

    meter_cfg = meters.get(meter_id)
    # Discovery mode has no config for the meter: report the value unscaled.
    multiplier = meter_cfg.multiplier if meter_cfg else 1.0
    calibrated = raw_consumption * multiplier

    # Prefer rtlamr's own timestamp; fall back to the current UTC time.
    timestamp = data.get("Time") or datetime.now(timezone.utc).isoformat()

    return MeterReading(
        meter_id=meter_id,
        protocol=msg_type,
        raw_consumption=raw_consumption,
        calibrated_consumption=round(calibrated, 4),
        timestamp=timestamp,
        raw_message=message,
    )