| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494 |
- """MQTT Smart Plug Service for subscribing to external MQTT topics and extracting power/energy data.
- This service enables integration with Shelly, Zigbee2MQTT, and other MQTT-based energy monitoring devices.
- """
- import asyncio
- import json
- import logging
- import threading
- from dataclasses import dataclass, field
- from datetime import datetime, timedelta, timezone
- from typing import Any
- import paho.mqtt.client as mqtt
- logger = logging.getLogger(__name__)
- @dataclass
- class SmartPlugMQTTData:
- """Latest data received from an MQTT smart plug."""
- plug_id: int
- power: float | None = None # Current power in watts
- energy: float | None = None # Energy in kWh (today)
- state: str | None = None # "ON" or "OFF"
- last_seen: datetime = field(default_factory=datetime.utcnow)
- @dataclass
- class MQTTDataSourceConfig:
- """Configuration for a single MQTT data source (power, energy, or state)."""
- topic: str
- path: str
- multiplier: float = 1.0 # For power/energy
- on_value: str | None = None # For state (what value means "ON")
- class MQTTSmartPlugService:
- """Subscribes to MQTT topics for smart plug energy monitoring."""
- # Consider plug unreachable if no message received in this time
- REACHABLE_TIMEOUT_MINUTES = 5
- def __init__(self):
- self.client: mqtt.Client | None = None
- self.connected = False
- self._lock = threading.Lock()
- # topic -> list of (plug_id, data_type) where data_type is "power", "energy", or "state"
- self.subscriptions: dict[str, list[tuple[int, str]]] = {}
- # plug_id -> {data_type: MQTTDataSourceConfig}
- self.plug_configs: dict[int, dict[str, MQTTDataSourceConfig]] = {}
- # plug_id -> latest data
- self.plug_data: dict[int, SmartPlugMQTTData] = {}
- self._disconnection_event: threading.Event | None = None
- self._configured = False
- self._broker = ""
- self._port = 1883
- self._username = ""
- self._password = ""
- self._use_tls = False
- def is_configured(self) -> bool:
- """Check if the MQTT service is configured and connected."""
- return self._configured and self.connected
- def has_broker_settings(self) -> bool:
- """Check if broker settings are available (even if not connected yet)."""
- return bool(self._broker)
- async def configure(self, settings: dict) -> bool:
- """Configure MQTT connection from settings.
- Uses the same broker settings as the MQTT relay service.
- Returns True if connection was successful or MQTT is disabled.
- """
- enabled = settings.get("mqtt_enabled", False)
- if not enabled:
- await self.disconnect()
- self._configured = False
- logger.debug("MQTT smart plug service disabled (MQTT relay not enabled)")
- return True
- broker = settings.get("mqtt_broker", "")
- port = settings.get("mqtt_port", 1883)
- username = settings.get("mqtt_username", "")
- password = settings.get("mqtt_password", "")
- use_tls = settings.get("mqtt_use_tls", False)
- if not broker:
- logger.warning("MQTT smart plug service: no broker configured")
- self._configured = False
- return False
- # Check if settings changed
- settings_changed = (
- self._broker != broker
- or self._port != port
- or self._username != username
- or self._password != password
- or self._use_tls != use_tls
- )
- self._broker = broker
- self._port = port
- self._username = username
- self._password = password
- self._use_tls = use_tls
- self._configured = True
- # Disconnect and reconnect if settings changed
- if settings_changed and self.client:
- await self.disconnect()
- # Connect if not already connected
- if not self.client or not self.connected:
- return await self._connect()
- return True
- async def _connect(self) -> bool:
- """Establish MQTT connection."""
- import asyncio
- import ssl
- try:
- # Create client with callback API version 2
- self.client = mqtt.Client(
- callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
- client_id=f"bambuddy-smartplug-{id(self)}",
- protocol=mqtt.MQTTv311,
- )
- # Set up callbacks
- self.client.on_connect = self._on_connect
- self.client.on_disconnect = self._on_disconnect
- self.client.on_message = self._on_message
- # Configure authentication
- if self._username:
- self.client.username_pw_set(self._username, self._password)
- # Configure TLS
- if self._use_tls:
- self.client.tls_set(cert_reqs=ssl.CERT_NONE)
- self.client.tls_insecure_set(True)
- # Connect with timeout
- try:
- await asyncio.wait_for(
- asyncio.to_thread(self.client.connect_async, self._broker, self._port, 60),
- timeout=3.0,
- )
- except TimeoutError:
- logger.warning("MQTT smart plug connection to %s:%s timed out", self._broker, self._port)
- return False
- self.client.loop_start()
- # Wait briefly for connection
- await asyncio.sleep(1.0)
- if self.connected:
- logger.info("MQTT smart plug service connected to %s:%s", self._broker, self._port)
- # Resubscribe to all topics
- self._resubscribe_all()
- return True
- else:
- logger.warning("MQTT smart plug connection pending to %s:%s", self._broker, self._port)
- return True # Connection is async
- except Exception as e:
- logger.error("MQTT smart plug connection failed: %s", e)
- self.connected = False
- return False
- def _on_connect(
- self,
- client: mqtt.Client,
- userdata: Any,
- flags: dict,
- reason_code: int | mqtt.ReasonCode,
- properties: mqtt.Properties | None = None,
- ):
- """Callback when connected to broker."""
- rc = reason_code if isinstance(reason_code, int) else reason_code.value
- if rc == 0:
- self.connected = True
- logger.info("MQTT smart plug service connected successfully")
- # Resubscribe to all topics
- self._resubscribe_all()
- else:
- self.connected = False
- logger.error("MQTT smart plug connection failed: %s", reason_code)
- def _on_disconnect(
- self,
- client: mqtt.Client,
- userdata: Any,
- flags_or_rc: dict | int | mqtt.ReasonCode,
- reason_code: int | mqtt.ReasonCode | None = None,
- properties: mqtt.Properties | None = None,
- ):
- """Callback when disconnected from broker."""
- self.connected = False
- rc = reason_code if reason_code is not None else flags_or_rc
- rc_val = rc if isinstance(rc, int) else getattr(rc, "value", 0)
- if rc_val != 0:
- logger.warning("MQTT smart plug service disconnected: %s", rc)
- else:
- logger.info("MQTT smart plug service disconnected cleanly")
- if self._disconnection_event:
- self._disconnection_event.set()
- def _on_message(self, client: mqtt.Client, userdata: Any, msg: mqtt.MQTTMessage):
- """Handle incoming MQTT message, extract data using JSON path."""
- topic = msg.topic
- with self._lock:
- subscriptions = self.subscriptions.get(topic, [])
- if not subscriptions:
- return
- # Parse JSON payload (or treat as raw value)
- try:
- payload = json.loads(msg.payload.decode("utf-8"))
- is_json = True
- except (json.JSONDecodeError, UnicodeDecodeError):
- # Not JSON - treat the whole payload as a raw value
- payload = msg.payload.decode("utf-8").strip()
- is_json = False
- # Process for each subscribed (plug_id, data_type)
- for plug_id, data_type in subscriptions:
- configs = self.plug_configs.get(plug_id, {})
- config = configs.get(data_type)
- if not config:
- continue
- # Extract value using path (or use raw payload if no path)
- if is_json and config.path:
- raw_value = self._extract_json_path(payload, config.path)
- elif is_json and not config.path:
- # JSON but no path - if it's a simple value use it, otherwise skip
- if isinstance(payload, (int, float, str, bool)):
- raw_value = payload
- else:
- # Can't use a dict/list as a value
- logger.debug("MQTT plug %s: JSON payload is object/array but no path configured", plug_id)
- continue
- else:
- # Raw value (non-JSON)
- raw_value = payload
- if raw_value is None:
- continue
- # Initialize plug data if needed
- if plug_id not in self.plug_data:
- self.plug_data[plug_id] = SmartPlugMQTTData(plug_id=plug_id)
- data = self.plug_data[plug_id]
- data.last_seen = datetime.now(timezone.utc)
- # Process based on data type
- if data_type == "power":
- try:
- data.power = float(raw_value) * config.multiplier
- logger.debug("MQTT smart plug %s: power=%s", plug_id, data.power)
- except (ValueError, TypeError):
- pass # Ignore unparseable power reading from MQTT
- elif data_type == "energy":
- try:
- data.energy = float(raw_value) * config.multiplier
- logger.debug("MQTT smart plug %s: energy=%s", plug_id, data.energy)
- except (ValueError, TypeError):
- pass # Ignore unparseable energy reading from MQTT
- elif data_type == "state":
- state_str = str(raw_value)
- # Check against configured ON value if set
- if config.on_value:
- # Case-insensitive comparison
- if state_str.lower() == config.on_value.lower():
- data.state = "ON"
- else:
- data.state = "OFF"
- else:
- # Default behavior: normalize common values
- upper_state = state_str.upper()
- if upper_state in ("ON", "1", "TRUE"):
- data.state = "ON"
- elif upper_state in ("OFF", "0", "FALSE"):
- data.state = "OFF"
- else:
- data.state = state_str
- logger.debug("MQTT smart plug %s: state=%s", plug_id, data.state)
- def _extract_json_path(self, data: dict, path: str) -> Any:
- """Extract value using dot notation (e.g., 'power_l1' or 'data.power').
- Supports simple dot notation for nested objects.
- """
- if not path:
- return None
- parts = path.split(".")
- current = data
- for part in parts:
- if isinstance(current, dict) and part in current:
- current = current[part]
- else:
- return None
- return current
- def _resubscribe_all(self):
- """Resubscribe to all registered topics after reconnection."""
- if not self.client or not self.connected:
- return
- with self._lock:
- for topic in self.subscriptions:
- if self.subscriptions[topic]: # Only if there are subscribers
- try:
- self.client.subscribe(topic, qos=1)
- logger.debug("MQTT smart plug: resubscribed to %s", topic)
- except Exception as e:
- logger.error("MQTT smart plug: failed to resubscribe to %s: %s", topic, e)
- def subscribe(
- self,
- plug_id: int,
- # Power source
- power_topic: str | None = None,
- power_path: str | None = None,
- power_multiplier: float = 1.0,
- # Energy source
- energy_topic: str | None = None,
- energy_path: str | None = None,
- energy_multiplier: float = 1.0,
- # State source
- state_topic: str | None = None,
- state_path: str | None = None,
- state_on_value: str | None = None,
- # Legacy: single topic/path/multiplier (for backward compatibility)
- topic: str | None = None,
- multiplier: float = 1.0,
- ):
- """Subscribe to MQTT topics for a plug.
- Each data type (power, energy, state) can have its own topic.
- For backward compatibility, if power_topic is not set but topic is,
- topic will be used for all data types that have paths configured.
- """
- with self._lock:
- # Initialize config for this plug
- self.plug_configs[plug_id] = {}
- # Determine topics (new fields take priority, fall back to legacy)
- effective_power_topic = power_topic or topic
- effective_energy_topic = energy_topic or topic
- effective_state_topic = state_topic or topic
- # Use new multipliers or fall back to legacy
- effective_power_mult = power_multiplier if power_multiplier != 1.0 else multiplier
- effective_energy_mult = energy_multiplier if energy_multiplier != 1.0 else multiplier
- # Configure power subscription (path is optional - empty means use raw payload)
- if effective_power_topic:
- config = MQTTDataSourceConfig(
- topic=effective_power_topic,
- path=power_path or "",
- multiplier=effective_power_mult,
- )
- self.plug_configs[plug_id]["power"] = config
- self._add_subscription(plug_id, effective_power_topic, "power")
- # Configure energy subscription (path is optional - empty means use raw payload)
- if effective_energy_topic:
- config = MQTTDataSourceConfig(
- topic=effective_energy_topic,
- path=energy_path or "",
- multiplier=effective_energy_mult,
- )
- self.plug_configs[plug_id]["energy"] = config
- self._add_subscription(plug_id, effective_energy_topic, "energy")
- # Configure state subscription (path is optional - empty means use raw payload)
- if effective_state_topic:
- config = MQTTDataSourceConfig(
- topic=effective_state_topic,
- path=state_path or "",
- on_value=state_on_value,
- )
- self.plug_configs[plug_id]["state"] = config
- self._add_subscription(plug_id, effective_state_topic, "state")
- # Initialize data entry
- if plug_id not in self.plug_data:
- self.plug_data[plug_id] = SmartPlugMQTTData(plug_id=plug_id)
- logger.info(
- f"MQTT smart plug {plug_id}: configured with "
- f"power={effective_power_topic if power_path else None}, "
- f"energy={effective_energy_topic if energy_path else None}, "
- f"state={effective_state_topic if state_path else None}"
- )
- def _add_subscription(self, plug_id: int, topic: str, data_type: str):
- """Add a subscription for a plug/data_type to a topic."""
- if topic not in self.subscriptions:
- self.subscriptions[topic] = []
- # Actually subscribe if connected
- if self.client and self.connected:
- try:
- self.client.subscribe(topic, qos=1)
- logger.info("MQTT smart plug: subscribed to %s", topic)
- except Exception as e:
- logger.error("MQTT smart plug: failed to subscribe to %s: %s", topic, e)
- entry = (plug_id, data_type)
- if entry not in self.subscriptions[topic]:
- self.subscriptions[topic].append(entry)
- def unsubscribe(self, plug_id: int):
- """Unsubscribe when plug is deleted/updated."""
- with self._lock:
- # Get all configs for this plug
- configs = self.plug_configs.pop(plug_id, {})
- if not configs:
- # Still clean up any stray subscriptions
- pass
- # Collect all topics this plug was subscribed to
- topics_to_check = set()
- for _data_type, config in configs.items():
- topics_to_check.add(config.topic)
- # Also scan subscriptions to remove any entries for this plug
- for topic in list(self.subscriptions.keys()):
- # Remove all entries for this plug_id
- self.subscriptions[topic] = [(pid, dtype) for pid, dtype in self.subscriptions[topic] if pid != plug_id]
- topics_to_check.add(topic)
- # Unsubscribe from topics with no more subscribers
- for topic in topics_to_check:
- if topic in self.subscriptions and not self.subscriptions[topic]:
- del self.subscriptions[topic]
- if self.client and self.connected:
- try:
- self.client.unsubscribe(topic)
- logger.info("MQTT smart plug: unsubscribed from %s", topic)
- except Exception as e:
- logger.error("MQTT smart plug: failed to unsubscribe from %s: %s", topic, e)
- # Remove data
- self.plug_data.pop(plug_id, None)
- def get_plug_data(self, plug_id: int) -> SmartPlugMQTTData | None:
- """Get latest data for a plug (called by status endpoint)."""
- with self._lock:
- return self.plug_data.get(plug_id)
- def is_reachable(self, plug_id: int) -> bool:
- """Check if a plug has received data recently."""
- data = self.get_plug_data(plug_id)
- if not data:
- return False
- timeout = timedelta(minutes=self.REACHABLE_TIMEOUT_MINUTES)
- return datetime.now(timezone.utc) - data.last_seen < timeout
- async def disconnect(self, timeout: float = 0):
- """Disconnect from MQTT broker."""
- if self.client:
- try:
- self._disconnection_event = threading.Event()
- self.client.disconnect()
- await asyncio.to_thread(self._disconnection_event.wait, timeout=timeout)
- self.client.loop_stop()
- except Exception as e:
- logger.debug("MQTT smart plug disconnect error (ignored): %s", e)
- finally:
- self.client = None
- self.connected = False
- # Global instance
- mqtt_smart_plug_service = MQTTSmartPlugService()
|