"""MQTT client with Home Assistant auto-discovery support.""" import json import logging from typing import Optional import paho.mqtt.client as mqtt from paho.mqtt.enums import CallbackAPIVersion from hameter import __version__ from hameter.config import MeterConfig, MqttConfig from hameter.meter import MeterReading logger = logging.getLogger(__name__) class HaMeterMQTT: """Manages MQTT connection, HA discovery, and meter state publishing.""" def __init__(self, config: MqttConfig, meters: list[MeterConfig]): self._config = config self._meters = meters self._meters_by_id: dict[int, MeterConfig] = {m.id: m for m in meters} self._client = mqtt.Client( callback_api_version=CallbackAPIVersion.VERSION2, client_id=config.client_id, ) # Last Will: broker publishes "offline" if we disconnect unexpectedly. self._client.will_set( topic=f"{config.base_topic}/status", payload="offline", qos=1, retain=True, ) if config.user: self._client.username_pw_set(config.user, config.password) self._client.on_connect = self._on_connect self._client.on_disconnect = self._on_disconnect self._client.on_message = self._on_message def connect(self): """Connect to the MQTT broker and start the network loop. Raises: OSError: If the broker is unreachable. """ logger.info("Connecting to MQTT broker at %s:%d", self._config.host, self._config.port) try: self._client.connect(self._config.host, self._config.port, keepalive=60) except OSError as e: logger.error("Failed to connect to MQTT broker: %s", e) raise self._client.loop_start() def disconnect(self): """Publish offline status and cleanly disconnect.""" try: self._client.publish( f"{self._config.base_topic}/status", "offline", qos=1, retain=True, ) self._client.loop_stop() self._client.disconnect() except Exception as e: logger.warning("Error during MQTT disconnect: %s", e) def publish_online(self): """Publish online availability status.""" self._client.publish( f"{self._config.base_topic}/status", "online", qos=1, retain=True, ) def publish_reading(self, reading: MeterReading): """Publish a meter reading to the state topic.""" state_payload = { "reading": reading.calibrated_consumption, "raw_reading": reading.raw_consumption, "timestamp": reading.timestamp, "protocol": reading.protocol, } self._client.publish( f"{self._config.base_topic}/{reading.meter_id}/state", json.dumps(state_payload), qos=1, retain=True, ) meter_cfg = self._meters_by_id.get(reading.meter_id) unit = meter_cfg.unit_of_measurement if meter_cfg else "" logger.info( "Published: meter=%d raw=%d calibrated=%.4f %s", reading.meter_id, reading.raw_consumption, reading.calibrated_consumption, unit, ) def publish_cost(self, meter_id: int, cumulative_cost: float): """Publish cumulative cost for a meter.""" payload = {"cost": round(cumulative_cost, 2)} self._client.publish( f"{self._config.base_topic}/{meter_id}/cost", json.dumps(payload), qos=1, retain=True, ) logger.debug("Published cost: meter=%d cost=$%.2f", meter_id, cumulative_cost) # ------------------------------------------------------------------ # # HA Auto-Discovery # ------------------------------------------------------------------ # def _publish_discovery(self): """Publish HA MQTT auto-discovery config for each meter.""" if not self._config.ha_autodiscovery: return for meter in self._meters: device_id = f"hameter_{meter.id}" base = self._config.base_topic disco = self._config.ha_autodiscovery_topic device_block = { "identifiers": [device_id], "name": meter.name, "manufacturer": "HAMeter", "model": f"ERT {meter.protocol.upper()}", "sw_version": __version__, "serial_number": str(meter.id), } availability_block = { "availability_topic": f"{base}/status", "payload_available": "online", "payload_not_available": "offline", } # --- Calibrated reading sensor --- reading_config = { "name": f"{meter.name} Reading", "unique_id": f"{device_id}_reading", "state_topic": f"{base}/{meter.id}/state", "value_template": "{{ value_json.reading }}", "unit_of_measurement": meter.unit_of_measurement, "state_class": meter.state_class, "icon": meter.icon, "device": device_block, **availability_block, } if meter.device_class: reading_config["device_class"] = meter.device_class self._client.publish( f"{disco}/sensor/{device_id}/reading/config", json.dumps(reading_config), qos=1, retain=True, ) # --- Last Seen sensor --- lastseen_config = { "name": f"{meter.name} Last Seen", "unique_id": f"{device_id}_last_seen", "state_topic": f"{base}/{meter.id}/state", "value_template": "{{ value_json.timestamp }}", "device_class": "timestamp", "icon": "mdi:clock-outline", "device": {"identifiers": [device_id], "name": meter.name}, **availability_block, } self._client.publish( f"{disco}/sensor/{device_id}/last_seen/config", json.dumps(lastseen_config), qos=1, retain=True, ) # --- Raw reading sensor (diagnostic) --- raw_config = { "name": f"{meter.name} Raw Reading", "unique_id": f"{device_id}_raw", "state_topic": f"{base}/{meter.id}/state", "value_template": "{{ value_json.raw_reading }}", "icon": "mdi:counter", "entity_category": "diagnostic", "device": {"identifiers": [device_id], "name": meter.name}, **availability_block, } self._client.publish( f"{disco}/sensor/{device_id}/raw/config", json.dumps(raw_config), qos=1, retain=True, ) # --- Cost sensor (only for meters with cost_factors) --- if meter.cost_factors: cost_config = { "name": f"{meter.name} Cost", "unique_id": f"{device_id}_cost", "state_topic": f"{base}/{meter.id}/cost", "value_template": "{{ value_json.cost }}", "unit_of_measurement": "$", "device_class": "monetary", "state_class": "total", "icon": "mdi:currency-usd", "device": {"identifiers": [device_id], "name": meter.name}, **availability_block, } self._client.publish( f"{disco}/sensor/{device_id}/cost/config", json.dumps(cost_config), qos=1, retain=True, ) logger.info("Published HA discovery for meter %d (%s)", meter.id, meter.name) # ------------------------------------------------------------------ # # Callbacks # ------------------------------------------------------------------ # def _on_connect(self, client, userdata, connect_flags, reason_code, properties): if reason_code == 0: logger.info("Connected to MQTT broker") self.publish_online() # Subscribe to HA status so we re-publish discovery on HA restart. client.subscribe( f"{self._config.ha_autodiscovery_topic}/status", qos=1, ) self._publish_discovery() else: logger.error("MQTT connection failed: %s", reason_code) def _on_disconnect(self, client, userdata, disconnect_flags, reason_code, properties): if reason_code == 0: logger.info("Disconnected from MQTT broker (clean)") else: logger.warning("Lost MQTT connection (rc=%s), will auto-reconnect", reason_code) def _on_message(self, client, userdata, message): # Re-publish discovery when HA comes online. try: payload = message.payload.decode() except (UnicodeDecodeError, AttributeError): return if message.topic.endswith("/status") and payload == "online": logger.info("Home Assistant came online, re-publishing discovery") self._publish_discovery()