Files
HAMeter/hameter/discovery.py
2026-03-06 12:25:27 -05:00

190 lines
5.7 KiB
Python

"""Discovery mode: listen for all nearby meter transmissions."""
import json
import logging
import threading
import time
from typing import Optional
from hameter.config import GeneralConfig
from hameter.meter import parse_rtlamr_line
from hameter.state import PipelineStatus
from hameter.subprocess_manager import SubprocessManager
logger = logging.getLogger(__name__)
def run_discovery(
config: GeneralConfig,
shutdown_event: threading.Event,
duration: int = 120,
):
"""Run in discovery mode: capture all meter transmissions and summarize.
Args:
config: General configuration (device_id, rtl_tcp settings).
shutdown_event: Threading event to signal early shutdown.
duration: How many seconds to listen before stopping.
"""
proc_mgr = SubprocessManager(config, shutdown_event)
if not proc_mgr.start_discovery_mode():
logger.error("Failed to start SDR in discovery mode")
return
seen: dict[int, dict] = {}
logger.info("=" * 60)
logger.info("DISCOVERY MODE")
logger.info("Listening for %d seconds. Press Ctrl+C to stop early.", duration)
logger.info("All nearby meter transmissions will be logged.")
logger.info("=" * 60)
start = time.monotonic()
try:
while not shutdown_event.is_set() and (time.monotonic() - start) < duration:
line = proc_mgr.get_line(timeout=1.0)
if not line:
continue
try:
reading = parse_rtlamr_line(line, meters={})
except json.JSONDecodeError:
continue
except Exception:
continue
if not reading:
continue
mid = reading.meter_id
if mid not in seen:
seen[mid] = {
"protocol": reading.protocol,
"count": 0,
"first_seen": reading.timestamp,
"last_consumption": 0,
}
logger.info(
" NEW METER: ID=%-12d Protocol=%-6s Consumption=%d",
mid,
reading.protocol,
reading.raw_consumption,
)
seen[mid]["count"] += 1
seen[mid]["last_consumption"] = reading.raw_consumption
seen[mid]["last_seen"] = reading.timestamp
finally:
proc_mgr.stop()
# Print summary.
logger.info("")
logger.info("=" * 60)
logger.info("DISCOVERY SUMMARY — %d unique meters found", len(seen))
logger.info("=" * 60)
logger.info(
"%-12s %-8s %-6s %-15s",
"Meter ID", "Protocol", "Count", "Last Reading",
)
logger.info("-" * 50)
for mid, info in sorted(seen.items(), key=lambda x: -x[1]["count"]):
logger.info(
"%-12d %-8s %-6d %-15d",
mid,
info["protocol"],
info["count"],
info["last_consumption"],
)
logger.info("")
logger.info(
"To add a meter, use the web UI at http://localhost:9090/config/meters"
)
def run_discovery_for_web(
config: GeneralConfig,
shutdown_event: threading.Event,
app_state,
duration: int = 120,
stop_event: Optional[threading.Event] = None,
):
"""Run discovery mode, reporting results to AppState for the web UI.
Args:
config: General configuration.
shutdown_event: Global shutdown signal.
app_state: Shared AppState to report discoveries to.
duration: How many seconds to listen.
stop_event: Optional event to stop discovery early (from web UI).
"""
proc_mgr = SubprocessManager(config, shutdown_event)
if not proc_mgr.start_discovery_mode():
logger.error("Failed to start SDR in discovery mode")
app_state.set_status(PipelineStatus.ERROR, "Failed to start SDR for discovery")
return
logger.info("Discovery mode started, listening for %d seconds", duration)
start = time.monotonic()
try:
while (
not shutdown_event.is_set()
and (time.monotonic() - start) < duration
and not (stop_event and stop_event.is_set())
):
line = proc_mgr.get_line(timeout=1.0)
if not line:
continue
try:
reading = parse_rtlamr_line(line, meters={})
except json.JSONDecodeError:
continue
except Exception:
continue
if not reading:
continue
mid = reading.meter_id
existing = app_state.get_discovery_results()
if mid in existing:
prev = existing[mid]
info = {
"protocol": prev["protocol"],
"count": prev["count"] + 1,
"first_seen": prev["first_seen"],
"last_consumption": reading.raw_consumption,
"last_seen": reading.timestamp,
}
else:
info = {
"protocol": reading.protocol,
"count": 1,
"first_seen": reading.timestamp,
"last_consumption": reading.raw_consumption,
"last_seen": reading.timestamp,
}
logger.info(
"Discovery: new meter ID=%d Protocol=%s Consumption=%d",
mid,
reading.protocol,
reading.raw_consumption,
)
app_state.record_discovery(mid, info)
finally:
proc_mgr.stop()
results = app_state.get_discovery_results()
logger.info("Discovery complete: %d unique meters found", len(results))