253 lines
9.3 KiB
Python
253 lines
9.3 KiB
Python
"""MQTT client with Home Assistant auto-discovery support."""
|
|
|
|
import json
|
|
import logging
|
|
from typing import Optional
|
|
|
|
import paho.mqtt.client as mqtt
|
|
from paho.mqtt.enums import CallbackAPIVersion
|
|
|
|
from hameter import __version__
|
|
from hameter.config import MeterConfig, MqttConfig
|
|
from hameter.meter import MeterReading
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class HaMeterMQTT:
    """Manages MQTT connection, HA discovery, and meter state publishing.

    Every message this class publishes uses QoS 1 with the retain flag set.
    Availability is signalled on ``<base_topic>/status`` with the payloads
    ``online`` / ``offline``; the same topic carries the Last Will.
    """

    def __init__(self, config: "MqttConfig", meters: "list[MeterConfig]"):
        """Create the paho client and register the Last Will and callbacks.

        No network traffic happens here; call :meth:`connect` to go online.

        Args:
            config: MQTT settings (broker address, credentials, topics).
            meters: Meters to announce through HA discovery.
        """
        self._config = config
        self._meters = meters
        self._meters_by_id: dict[int, MeterConfig] = {m.id: m for m in meters}

        self._client = mqtt.Client(
            callback_api_version=CallbackAPIVersion.VERSION2,
            client_id=config.client_id,
        )

        # Last Will: broker publishes "offline" if we disconnect unexpectedly.
        self._client.will_set(
            topic=f"{config.base_topic}/status",
            payload="offline",
            qos=1,
            retain=True,
        )

        if config.user:
            self._client.username_pw_set(config.user, config.password)

        self._client.on_connect = self._on_connect
        self._client.on_disconnect = self._on_disconnect
        self._client.on_message = self._on_message

    def connect(self):
        """Connect to the MQTT broker and start the network loop.

        Raises:
            OSError: If the broker is unreachable.
        """
        logger.info(
            "Connecting to MQTT broker at %s:%d",
            self._config.host,
            self._config.port,
        )
        try:
            self._client.connect(self._config.host, self._config.port, keepalive=60)
        except OSError as e:
            logger.error("Failed to connect to MQTT broker: %s", e)
            raise
        self._client.loop_start()

    def disconnect(self):
        """Publish offline status and cleanly disconnect.

        Shutdown is best effort: a failing step is logged as a warning and the
        remaining steps are still attempted, so a failed status publish can no
        longer leave the network loop running or the connection open.
        """
        status_topic = f"{self._config.base_topic}/status"
        steps = (
            lambda: self._publish_retained(status_topic, "offline"),
            self._client.loop_stop,
            self._client.disconnect,
        )
        for step in steps:
            try:
                step()
            except Exception as e:  # never let shutdown raise
                logger.warning("Error during MQTT disconnect: %s", e)

    def publish_online(self):
        """Publish online availability status."""
        self._publish_retained(f"{self._config.base_topic}/status", "online")

    def publish_reading(self, reading: "MeterReading"):
        """Publish a meter reading to the state topic.

        The JSON payload carries the calibrated and raw consumption, the
        timestamp and the protocol of ``reading``.
        """
        state_payload = {
            "reading": reading.calibrated_consumption,
            "raw_reading": reading.raw_consumption,
            "timestamp": reading.timestamp,
            "protocol": reading.protocol,
        }
        self._publish_retained(
            f"{self._config.base_topic}/{reading.meter_id}/state",
            json.dumps(state_payload),
        )

        # The unit is only used for the log line; meters that were not
        # configured are logged without one.
        meter_cfg = self._meters_by_id.get(reading.meter_id)
        unit = meter_cfg.unit_of_measurement if meter_cfg else ""
        logger.info(
            "Published: meter=%d raw=%d calibrated=%.4f %s",
            reading.meter_id,
            reading.raw_consumption,
            reading.calibrated_consumption,
            unit,
        )

    def publish_cost(self, meter_id: int, cumulative_cost: float):
        """Publish cumulative cost for a meter, rounded to two decimals."""
        payload = {"cost": round(cumulative_cost, 2)}
        self._publish_retained(
            f"{self._config.base_topic}/{meter_id}/cost",
            json.dumps(payload),
        )
        logger.debug("Published cost: meter=%d cost=$%.2f", meter_id, cumulative_cost)

    def _publish_retained(self, topic, payload):
        """Publish ``payload`` on ``topic`` with QoS 1 and the retain flag."""
        return self._client.publish(topic, payload, qos=1, retain=True)

    # ------------------------------------------------------------------ #
    # HA Auto-Discovery
    # ------------------------------------------------------------------ #

    def _publish_discovery(self):
        """Publish HA MQTT auto-discovery config for each meter.

        Does nothing when ``ha_autodiscovery`` is disabled in the config.
        """
        if not self._config.ha_autodiscovery:
            return

        # Loop-invariant topic prefixes.
        base = self._config.base_topic
        disco = self._config.ha_autodiscovery_topic

        for meter in self._meters:
            device_id = f"hameter_{meter.id}"
            for object_id, sensor_config in self._build_discovery_configs(meter, base):
                self._publish_retained(
                    f"{disco}/sensor/{device_id}/{object_id}/config",
                    json.dumps(sensor_config),
                )
            logger.info("Published HA discovery for meter %d (%s)", meter.id, meter.name)

    @staticmethod
    def _build_discovery_configs(meter, base):
        """Build the discovery payloads for one meter.

        Args:
            meter: Meter configuration to describe.
            base: Base topic under which state, cost and status are published.

        Returns:
            ``(object_id, config)`` pairs in publish order: ``reading``,
            ``last_seen``, ``raw`` and, only for meters with cost factors,
            ``cost``.
        """
        device_id = f"hameter_{meter.id}"
        state_topic = f"{base}/{meter.id}/state"

        # Full device description; sent with the reading sensor only.
        device_block = {
            "identifiers": [device_id],
            "name": meter.name,
            "manufacturer": "HAMeter",
            "model": f"ERT {meter.protocol.upper()}",
            "sw_version": __version__,
            "serial_number": str(meter.id),
        }
        # Short reference used by the other sensors of the same device.
        device_ref = {"identifiers": [device_id], "name": meter.name}

        availability_block = {
            "availability_topic": f"{base}/status",
            "payload_available": "online",
            "payload_not_available": "offline",
        }

        # --- Calibrated reading sensor ---
        reading_config = {
            "name": f"{meter.name} Reading",
            "unique_id": f"{device_id}_reading",
            "state_topic": state_topic,
            "value_template": "{{ value_json.reading }}",
            "unit_of_measurement": meter.unit_of_measurement,
            "state_class": meter.state_class,
            "icon": meter.icon,
            "device": device_block,
            **availability_block,
        }
        if meter.device_class:
            reading_config["device_class"] = meter.device_class

        # --- Last Seen sensor ---
        lastseen_config = {
            "name": f"{meter.name} Last Seen",
            "unique_id": f"{device_id}_last_seen",
            "state_topic": state_topic,
            "value_template": "{{ value_json.timestamp }}",
            "device_class": "timestamp",
            "icon": "mdi:clock-outline",
            "device": device_ref,
            **availability_block,
        }

        # --- Raw reading sensor (diagnostic) ---
        raw_config = {
            "name": f"{meter.name} Raw Reading",
            "unique_id": f"{device_id}_raw",
            "state_topic": state_topic,
            "value_template": "{{ value_json.raw_reading }}",
            "icon": "mdi:counter",
            "entity_category": "diagnostic",
            "device": device_ref,
            **availability_block,
        }

        configs = [
            ("reading", reading_config),
            ("last_seen", lastseen_config),
            ("raw", raw_config),
        ]

        # --- Cost sensor (only for meters with cost_factors) ---
        if meter.cost_factors:
            cost_config = {
                "name": f"{meter.name} Cost",
                "unique_id": f"{device_id}_cost",
                "state_topic": f"{base}/{meter.id}/cost",
                "value_template": "{{ value_json.cost }}",
                "unit_of_measurement": "$",
                "device_class": "monetary",
                "state_class": "total",
                "icon": "mdi:currency-usd",
                "device": device_ref,
                **availability_block,
            }
            configs.append(("cost", cost_config))

        return configs

    # ------------------------------------------------------------------ #
    # Callbacks
    # ------------------------------------------------------------------ #

    def _on_connect(self, client, userdata, connect_flags, reason_code, properties):
        """Announce availability and (re-)publish discovery after connecting."""
        if reason_code == 0:
            logger.info("Connected to MQTT broker")
            self.publish_online()
            if self._config.ha_autodiscovery:
                # Subscribe to HA status so we re-publish discovery on HA restart.
                client.subscribe(
                    f"{self._config.ha_autodiscovery_topic}/status", qos=1,
                )
                self._publish_discovery()
        else:
            logger.error("MQTT connection failed: %s", reason_code)

    def _on_disconnect(self, client, userdata, disconnect_flags, reason_code, properties):
        """Log whether the connection ended cleanly or was lost."""
        if reason_code == 0:
            logger.info("Disconnected from MQTT broker (clean)")
        else:
            logger.warning("Lost MQTT connection (rc=%s), will auto-reconnect", reason_code)

    def _on_message(self, client, userdata, message):
        """Re-publish discovery when HA comes online.

        Payloads that cannot be decoded are ignored, as are all messages
        while auto-discovery is disabled.
        """
        try:
            payload = message.payload.decode()
        except (UnicodeDecodeError, AttributeError):
            return
        if not (message.topic.endswith("/status") and payload == "online"):
            return
        if not self._config.ha_autodiscovery:
            # Nothing to re-publish; avoid a misleading log line.
            return
        logger.info("Home Assistant came online, re-publishing discovery")
        self._publish_discovery()
|