Files
HAMeter/hameter/mqtt_client.py
2026-03-06 12:25:27 -05:00

253 lines
9.3 KiB
Python

"""MQTT client with Home Assistant auto-discovery support."""
import json
import logging
from typing import Optional
import paho.mqtt.client as mqtt
from paho.mqtt.enums import CallbackAPIVersion
from hameter import __version__
from hameter.config import MeterConfig, MqttConfig
from hameter.meter import MeterReading
logger = logging.getLogger(__name__)
class HaMeterMQTT:
    """Manages MQTT connection, HA discovery, and meter state publishing.

    Topics used, relative to ``config.base_topic``:

    * ``<base>/status`` -- availability, ``"online"`` or ``"offline"``.
    * ``<base>/<meter_id>/state`` -- JSON reading payload.
    * ``<base>/<meter_id>/cost`` -- JSON cumulative-cost payload.

    Home Assistant discovery configs are published to
    ``<ha_autodiscovery_topic>/sensor/hameter_<meter_id>/<entity>/config``.
    Every message is published with QoS 1 and the retain flag set.
    """

    def __init__(self, config: MqttConfig, meters: list[MeterConfig]):
        """Build the paho client and register will, credentials and callbacks.

        The connection itself is opened by :meth:`connect`.

        Args:
            config: Broker address, credentials and topic settings.
            meters: Meters to announce via discovery and publish readings for.
        """
        self._config = config
        self._meters = meters
        # Lookup by meter id, used to find the unit when logging a reading.
        self._meters_by_id: dict[int, MeterConfig] = {m.id: m for m in meters}

        self._client = mqtt.Client(
            callback_api_version=CallbackAPIVersion.VERSION2,
            client_id=config.client_id,
        )

        # Last Will: broker publishes "offline" if we disconnect unexpectedly.
        self._client.will_set(
            topic=f"{config.base_topic}/status",
            payload="offline",
            qos=1,
            retain=True,
        )

        if config.user:
            self._client.username_pw_set(config.user, config.password)

        self._client.on_connect = self._on_connect
        self._client.on_disconnect = self._on_disconnect
        self._client.on_message = self._on_message

    def connect(self) -> None:
        """Connect to the MQTT broker and start the network loop.

        Raises:
            OSError: If the broker is unreachable.
        """
        logger.info(
            "Connecting to MQTT broker at %s:%d",
            self._config.host,
            self._config.port,
        )
        try:
            self._client.connect(self._config.host, self._config.port, keepalive=60)
        except OSError as e:
            logger.error("Failed to connect to MQTT broker: %s", e)
            raise
        self._client.loop_start()

    def disconnect(self) -> None:
        """Publish offline status and cleanly disconnect.

        Teardown is best-effort: each step is attempted even when an earlier
        one raises, so a failed offline publish can no longer leave the
        network loop running or the connection open. Failures are logged as
        warnings and never propagated to the caller.
        """
        teardown_steps = (
            lambda: self._client.publish(
                self._status_topic, "offline", qos=1, retain=True,
            ),
            self._client.loop_stop,
            self._client.disconnect,
        )
        for step in teardown_steps:
            try:
                step()
            except Exception as e:  # shutdown boundary: log and keep going
                logger.warning("Error during MQTT disconnect: %s", e)

    def publish_online(self) -> None:
        """Publish online availability status."""
        self._client.publish(self._status_topic, "online", qos=1, retain=True)

    def publish_reading(self, reading: MeterReading) -> None:
        """Publish a meter reading to the meter's state topic.

        Args:
            reading: Reading whose calibrated and raw consumption, timestamp
                and protocol are serialised into the JSON state payload.
        """
        self._publish_json(
            f"{self._config.base_topic}/{reading.meter_id}/state",
            {
                "reading": reading.calibrated_consumption,
                "raw_reading": reading.raw_consumption,
                "timestamp": reading.timestamp,
                "protocol": reading.protocol,
            },
        )

        # The unit is only used for the log line; unknown meters log without it.
        meter_cfg = self._meters_by_id.get(reading.meter_id)
        unit = meter_cfg.unit_of_measurement if meter_cfg else ""
        logger.info(
            "Published: meter=%d raw=%d calibrated=%.4f %s",
            reading.meter_id,
            reading.raw_consumption,
            reading.calibrated_consumption,
            unit,
        )

    def publish_cost(self, meter_id: int, cumulative_cost: float) -> None:
        """Publish cumulative cost for a meter.

        Args:
            meter_id: Meter whose cost topic is written.
            cumulative_cost: Running total; rounded to two decimals in the
                published payload.
        """
        self._publish_json(
            f"{self._config.base_topic}/{meter_id}/cost",
            {"cost": round(cumulative_cost, 2)},
        )
        logger.debug("Published cost: meter=%d cost=$%.2f", meter_id, cumulative_cost)

    # ------------------------------------------------------------------ #
    # Internal helpers
    # ------------------------------------------------------------------ #

    @property
    def _status_topic(self) -> str:
        """Availability topic shared by the will, online and offline messages."""
        return f"{self._config.base_topic}/status"

    def _publish_json(self, topic: str, payload: dict) -> None:
        """Serialise *payload* as JSON and publish it retained at QoS 1."""
        self._client.publish(topic, json.dumps(payload), qos=1, retain=True)

    # ------------------------------------------------------------------ #
    # HA Auto-Discovery
    # ------------------------------------------------------------------ #

    def _publish_discovery(self) -> None:
        """Publish HA MQTT auto-discovery config for each meter.

        Does nothing when ``ha_autodiscovery`` is disabled in the config.
        """
        if not self._config.ha_autodiscovery:
            return

        disco = self._config.ha_autodiscovery_topic
        for meter in self._meters:
            device_id = f"hameter_{meter.id}"
            for entity, entity_config in self._discovery_entities(meter, device_id):
                self._publish_json(
                    f"{disco}/sensor/{device_id}/{entity}/config",
                    entity_config,
                )
            logger.info("Published HA discovery for meter %d (%s)", meter.id, meter.name)

    def _discovery_entities(self, meter, device_id: str) -> list:
        """Build the discovery configs for one meter.

        Args:
            meter: Meter configuration to describe.
            device_id: Device identifier shared by all of the meter's entities.

        Returns:
            ``(entity, config)`` pairs in publish order: ``reading``,
            ``last_seen``, ``raw`` and, only when the meter has
            ``cost_factors``, ``cost``.
        """
        base = self._config.base_topic
        state_topic = f"{base}/{meter.id}/state"
        availability = {
            "availability_topic": f"{base}/status",
            "payload_available": "online",
            "payload_not_available": "offline",
        }
        # Only the reading sensor carries the full device description; the
        # other entities reference the device by identifier and name.
        device_ref = {"identifiers": [device_id], "name": meter.name}

        # --- Calibrated reading sensor ---
        reading = {
            "name": f"{meter.name} Reading",
            "unique_id": f"{device_id}_reading",
            "state_topic": state_topic,
            "value_template": "{{ value_json.reading }}",
            "unit_of_measurement": meter.unit_of_measurement,
            "state_class": meter.state_class,
            "icon": meter.icon,
            "device": {
                "identifiers": [device_id],
                "name": meter.name,
                "manufacturer": "HAMeter",
                "model": f"ERT {meter.protocol.upper()}",
                "sw_version": __version__,
                "serial_number": str(meter.id),
            },
            **availability,
        }
        if meter.device_class:
            reading["device_class"] = meter.device_class

        # --- Last Seen sensor ---
        last_seen = {
            "name": f"{meter.name} Last Seen",
            "unique_id": f"{device_id}_last_seen",
            "state_topic": state_topic,
            "value_template": "{{ value_json.timestamp }}",
            "device_class": "timestamp",
            "icon": "mdi:clock-outline",
            "device": device_ref,
            **availability,
        }

        # --- Raw reading sensor (diagnostic) ---
        raw = {
            "name": f"{meter.name} Raw Reading",
            "unique_id": f"{device_id}_raw",
            "state_topic": state_topic,
            "value_template": "{{ value_json.raw_reading }}",
            "icon": "mdi:counter",
            "entity_category": "diagnostic",
            "device": device_ref,
            **availability,
        }

        entities = [("reading", reading), ("last_seen", last_seen), ("raw", raw)]

        # --- Cost sensor (only for meters with cost_factors) ---
        if meter.cost_factors:
            cost = {
                "name": f"{meter.name} Cost",
                "unique_id": f"{device_id}_cost",
                "state_topic": f"{base}/{meter.id}/cost",
                "value_template": "{{ value_json.cost }}",
                "unit_of_measurement": "$",
                "device_class": "monetary",
                "state_class": "total",
                "icon": "mdi:currency-usd",
                "device": device_ref,
                **availability,
            }
            entities.append(("cost", cost))

        return entities

    # ------------------------------------------------------------------ #
    # Callbacks
    # ------------------------------------------------------------------ #

    def _on_connect(self, client, userdata, connect_flags, reason_code, properties):
        """Handle a connection result (paho callback API version 2 signature).

        On success, publishes the online status. When auto-discovery is
        enabled it also subscribes to the HA status topic and publishes the
        discovery configs. A non-zero reason code is only logged.
        """
        if reason_code == 0:
            logger.info("Connected to MQTT broker")
            self.publish_online()
            if self._config.ha_autodiscovery:
                # Subscribe to HA status so we re-publish discovery on HA
                # restart. Skipped when discovery is off: there would be
                # nothing to re-publish.
                client.subscribe(
                    f"{self._config.ha_autodiscovery_topic}/status", qos=1,
                )
                self._publish_discovery()
        else:
            logger.error("MQTT connection failed: %s", reason_code)

    def _on_disconnect(self, client, userdata, disconnect_flags, reason_code, properties):
        """Log whether the disconnect was clean (reason code 0) or not."""
        if reason_code == 0:
            logger.info("Disconnected from MQTT broker (clean)")
        else:
            logger.warning("Lost MQTT connection (rc=%s), will auto-reconnect", reason_code)

    def _on_message(self, client, userdata, message):
        """Re-publish discovery when HA announces ``online`` on a status topic.

        Messages whose payload cannot be decoded as text are ignored.
        """
        try:
            payload = message.payload.decode()
        except (UnicodeDecodeError, AttributeError):
            return
        if message.topic.endswith("/status") and payload == "online":
            logger.info("Home Assistant came online, re-publishing discovery")
            self._publish_discovery()