"""Entry point for HAMeter: python -m hameter."""
|
|
|
|
import argparse
|
|
import logging
|
|
import os
|
|
import signal
|
|
import sys
|
|
import threading
|
|
|
|
from hameter import __version__
|
|
from hameter.config import (
|
|
CONFIG_PATH,
|
|
config_exists,
|
|
load_config_from_json,
|
|
load_config_from_yaml,
|
|
save_config,
|
|
)
|
|
from hameter.cost_state import load_cost_state
|
|
from hameter.discovery import run_discovery_for_web
|
|
from hameter.pipeline import Pipeline
|
|
from hameter.state import AppState, CostState, PipelineStatus, WebLogHandler
|
|
from hameter.web import create_app
|
|
|
|
|
|
def main():
    """Run the HAMeter service from the command line.

    In order, this:

    1. parses ``--port`` / ``--version``;
    2. configures root logging to stdout and attaches a ``WebLogHandler``
       bound to the shared ``AppState`` so log records also reach the UI;
    3. installs SIGTERM/SIGINT handlers that set a shared shutdown event;
    4. tries to load (or migrate) an existing configuration;
    5. serves the Flask app from a daemon thread on all interfaces;
    6. blocks in the pipeline loop until that loop returns.
    """
    # --- Command line ---------------------------------------------------
    cli = argparse.ArgumentParser(
        prog="hameter",
        description="HAMeter — SDR utility meter reader for Home Assistant",
    )
    cli.add_argument(
        "--port", "-p", type=int, default=9090, help="Web UI port (default: 9090)"
    )
    cli.add_argument(
        "--version", "-v", action="version", version=f"hameter {__version__}"
    )
    options = cli.parse_args()

    # --- Logging: configured before anything else can emit records -------
    logging.basicConfig(
        stream=sys.stdout,
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    log = logging.getLogger("hameter")

    # --- Shared state, plus a root handler that mirrors logs to the UI ---
    state = AppState()
    ui_log_handler = WebLogHandler(state)
    ui_log_handler.setLevel(logging.DEBUG)
    logging.getLogger().addHandler(ui_log_handler)

    log.info("HAMeter v%s starting", __version__)

    # --- Cooperative shutdown --------------------------------------------
    stop_requested = threading.Event()

    def _on_signal(signum, _frame):
        # Only flag the shutdown here; the pipeline loop observes the event.
        log.info("Received %s, initiating shutdown...", signal.Signals(signum).name)
        stop_requested.set()

    for signo in (signal.SIGTERM, signal.SIGINT):
        signal.signal(signo, _on_signal)

    # --- Existing configuration, if any ----------------------------------
    _try_load_config(state, log)

    # --- Web UI: daemon thread so it never blocks process exit -----------
    web_app = create_app(state)
    serve_kwargs = {
        "host": "0.0.0.0",
        "port": options.port,
        "threaded": True,
        "use_reloader": False,
    }
    web_thread = threading.Thread(
        target=web_app.run,
        kwargs=serve_kwargs,
        name="flask-web",
        daemon=True,
    )
    web_thread.start()
    log.info("Web UI available at http://0.0.0.0:%d", options.port)

    # --- Main pipeline loop ----------------------------------------------
    _pipeline_loop(state, stop_requested, log)

    log.info("HAMeter stopped")
|
|
|
|
|
|
def _try_load_config(app_state: AppState, logger: logging.Logger):
    """Populate ``app_state`` from stored configuration, if there is any.

    Resolution order:

    1. If ``config_exists()`` is true, load the JSON config. On success the
       config is stored, ``config_ready`` is set and the status becomes
       STOPPED. On failure the status becomes ERROR. Either way the function
       returns without attempting a YAML migration.
    2. Otherwise, the first legacy YAML file that loads successfully is
       saved as the new config and used. A file that fails to migrate is
       logged as a warning and the next candidate is tried.
    3. If nothing was loaded, the status is set to UNCONFIGURED.

    Args:
        app_state: Shared application state to populate.
        logger: Logger used for progress and error messages.
    """
    if config_exists():
        try:
            loaded = load_config_from_json()
            app_state.set_config(loaded)
            app_state.config_ready.set()
            app_state.set_status(PipelineStatus.STOPPED, "Config loaded")
            logger.info("Loaded config from %s", CONFIG_PATH)
        except Exception as exc:
            logger.error("Failed to load config: %s", exc)
            app_state.set_status(PipelineStatus.ERROR, str(exc))
        # An existing JSON config is authoritative, whether it loaded or not.
        return

    # Legacy YAML locations, tried in this order.
    for candidate in ("/config/hameter.yaml", "/app/config/hameter.yaml"):
        if not os.path.isfile(candidate):
            continue
        try:
            migrated = load_config_from_yaml(candidate)
            save_config(migrated)
            app_state.set_config(migrated)
            app_state.config_ready.set()
            app_state.set_status(PipelineStatus.STOPPED, "Migrated from YAML")
            logger.info("Migrated YAML config from %s to %s", candidate, CONFIG_PATH)
        except Exception as exc:
            logger.warning("Failed to migrate YAML config: %s", exc)
        else:
            return

    # Nothing usable was found; the setup wizard has to provide a config.
    app_state.set_status(PipelineStatus.UNCONFIGURED)
    logger.info("No config found. Use the web UI to complete setup.")
|
|
|
|
|
|
def _resolve_log_level(name) -> int:
    """Translate a configured level name into a numeric ``logging`` level.

    The lookup is case-insensitive. Anything that does not resolve to an
    integer constant on the ``logging`` module falls back to
    ``logging.INFO``.
    """
    level = getattr(logging, str(name).upper(), None)
    # A plain getattr would also match non-level attributes such as the
    # function ``logging.debug``; only integer constants are valid levels.
    return level if isinstance(level, int) else logging.INFO


def _pipeline_loop(
    app_state: AppState,
    shutdown_event: threading.Event,
    logger: logging.Logger,
):
    """Main loop: run pipeline, handle restart requests, handle discovery.

    Each iteration:

    1. waits (polling once per second) until ``app_state.config_ready`` is
       set, returning early if shutdown is requested;
    2. reloads the JSON config; on failure sets ERROR status and waits for
       a restart request before retrying;
    3. restores persisted cost state and applies the configured log level;
    4. runs a pending discovery request instead of the pipeline, if any;
    5. if no meters are configured, idles until a restart or discovery
       request arrives;
    6. otherwise runs the pipeline and, once ``run()`` returns, decides
       between shutdown, discovery, restart, or an error wait.

    Args:
        app_state: Shared application state (events, status, config).
        shutdown_event: Set when the process should exit.
        logger: Logger used for progress and error messages.
    """
    while not shutdown_event.is_set():
        # Wait for config to be ready.
        if not app_state.config_ready.is_set():
            logger.info("Waiting for configuration via web UI...")
            while not shutdown_event.is_set():
                if app_state.config_ready.wait(timeout=1.0):
                    break
            if shutdown_event.is_set():
                return

        # Load/reload config.
        try:
            config = load_config_from_json()
            app_state.set_config(config)
        except Exception as e:
            logger.error("Failed to load config: %s", e)
            app_state.set_status(PipelineStatus.ERROR, str(e))
            _wait_for_restart_or_shutdown(app_state, shutdown_event)
            continue

        # Restore persisted cost state.
        _load_persisted_cost_state(app_state, config, logger)

        # Update log level from config. The name is resolved defensively so
        # a lowercase or unknown value cannot raise out of setLevel().
        logging.getLogger().setLevel(_resolve_log_level(config.general.log_level))

        # Check for discovery request before starting pipeline.
        if app_state.discovery_requested.is_set():
            _run_web_discovery(app_state, config, shutdown_event, logger)
            continue

        # If no meters configured, wait.
        if not config.meters:
            app_state.set_status(PipelineStatus.STOPPED, "No meters configured")
            logger.info("No meters configured. Add meters via the web UI.")
            while not shutdown_event.is_set():
                if app_state.restart_requested.wait(timeout=1.0):
                    app_state.restart_requested.clear()
                    break
                if app_state.discovery_requested.is_set():
                    _run_web_discovery(app_state, config, shutdown_event, logger)
                    break
            continue

        # Run the pipeline.
        app_state.set_status(PipelineStatus.STARTING)
        pipeline = Pipeline(config, shutdown_event, app_state)

        logger.info(
            "Starting pipeline with %d meter(s): %s",
            len(config.meters),
            ", ".join(f"{m.name} ({m.id})" for m in config.meters),
        )

        pipeline.run()

        # Check why we exited.
        if shutdown_event.is_set():
            return

        if app_state.discovery_requested.is_set():
            _run_web_discovery(app_state, config, shutdown_event, logger)
            continue

        if app_state.restart_requested.is_set():
            app_state.restart_requested.clear()
            app_state.set_status(PipelineStatus.RESTARTING, "Reloading configuration...")
            logger.info("Pipeline restart requested, reloading config...")
            continue

        # Pipeline exited on its own (error or subprocess failure).
        # Re-check shutdown: it may have been requested since the check above.
        if not shutdown_event.is_set():
            app_state.set_status(PipelineStatus.ERROR, "Pipeline exited unexpectedly")
            _wait_for_restart_or_shutdown(app_state, shutdown_event)
|
|
|
|
|
|
def _load_persisted_cost_state(
    app_state: AppState,
    config,
    logger: logging.Logger,
):
    """Load persisted cost state and restore into AppState.

    Persisted entries are restored only for meters that are present in
    ``config.meters`` and have ``cost_factors``. Entries with a
    non-integer meter ID or with malformed data are skipped with a
    warning; entries for other meters are skipped silently.

    Afterwards, any cost state held by ``app_state`` for a meter ID that
    is no longer in ``config.meters`` is removed. This cleanup runs even
    when nothing was persisted, because it depends only on the in-memory
    state and the current config.

    Args:
        app_state: Shared application state receiving the cost states.
        config: Loaded configuration; only ``config.meters`` is read.
        logger: Logger used for warnings and progress messages.
    """
    saved = load_cost_state()
    if saved:
        meters_with_cost = {m.id for m in config.meters if m.cost_factors}
        restored = 0
        for mid_str, cs_data in saved.items():
            try:
                mid = int(mid_str)
            except (ValueError, TypeError):
                logger.warning("Skipping cost state with invalid meter ID: %s", mid_str)
                continue
            if mid not in meters_with_cost:
                continue
            try:
                cs = CostState(
                    cumulative_cost=float(cs_data.get("cumulative_cost", 0.0)),
                    last_calibrated_reading=(
                        float(cs_data["last_calibrated_reading"])
                        if cs_data.get("last_calibrated_reading") is not None
                        else None
                    ),
                    billing_period_start=str(cs_data.get("billing_period_start", "")),
                    last_updated=str(cs_data.get("last_updated", "")),
                    fixed_charges_applied=float(cs_data.get("fixed_charges_applied", 0.0)),
                )
            # AttributeError covers an entry that is not a mapping at all
            # (e.g. a list or null), which has no ``.get``.
            except (ValueError, TypeError, KeyError, AttributeError) as e:
                logger.warning("Skipping corrupt cost state for meter %s: %s", mid_str, e)
                continue
            app_state.update_cost_state(mid, cs)
            restored += 1

        if restored:
            logger.info("Restored cost state for %d meter(s)", restored)

    # Clean up orphaned cost states for meters no longer in config.
    all_meter_ids = {m.id for m in config.meters}
    # Snapshot the keys first: entries are removed while walking them.
    for mid in list(app_state.get_cost_states()):
        if mid not in all_meter_ids:
            app_state.remove_cost_state(mid)
            logger.info("Removed orphaned cost state for meter %d", mid)
|
|
|
|
|
|
def _wait_for_restart_or_shutdown(
    app_state: AppState,
    shutdown_event: threading.Event,
):
    """Block until a restart is requested or shutdown is signalled.

    Shutdown is checked before every wait, so a pending restart request is
    left untouched when shutdown is already set. Otherwise the restart
    request is waited on in one-second slices and cleared once observed.

    Args:
        app_state: Shared state exposing the ``restart_requested`` event.
        shutdown_event: Set when the process should exit.
    """
    restart = app_state.restart_requested
    while True:
        if shutdown_event.is_set():
            return
        # Bounded wait so a shutdown is noticed within about a second.
        if restart.wait(timeout=1.0):
            restart.clear()
            return
|
|
|
|
|
|
def _run_web_discovery(
    app_state: AppState,
    config,
    shutdown_event: threading.Event,
    logger: logging.Logger,
):
    """Run one discovery session that was requested through the web UI.

    Clears the pending request and any previous results, sets the status
    to DISCOVERY, runs ``run_discovery_for_web`` for
    ``app_state.discovery_duration`` seconds, then clears the stop flag
    and sets the status to STOPPED.

    Args:
        app_state: Shared state holding the discovery events and duration.
        config: Loaded configuration; only ``config.general`` is passed on.
        shutdown_event: Passed on to the discovery run.
        logger: Logger used for the start message.
    """
    # Acknowledge the request and reset what the UI currently shows.
    app_state.discovery_requested.clear()
    app_state.set_status(PipelineStatus.DISCOVERY)
    app_state.clear_discovery_results()

    seconds = app_state.discovery_duration
    logger.info("Starting web-triggered discovery for %d seconds", seconds)

    discovery_args = {
        "config": config.general,
        "shutdown_event": shutdown_event,
        "app_state": app_state,
        "duration": seconds,
        "stop_event": app_state.stop_discovery,
    }
    run_discovery_for_web(**discovery_args)

    # Reset the stop flag so it does not carry over to a later session.
    app_state.stop_discovery.clear()
    app_state.set_status(PipelineStatus.STOPPED, "Discovery complete")
|
|
|
|
|
|
# Run main() only when this module is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()
|