mqtt_smart_plug.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529
  1. """MQTT Smart Plug Service for subscribing to external MQTT topics and extracting power/energy data.
  2. This service enables integration with Shelly, Zigbee2MQTT, and other MQTT-based energy monitoring devices.
  3. """
  4. import asyncio
  5. import json
  6. import logging
  7. import threading
  8. from dataclasses import dataclass, field
  9. from datetime import datetime, timedelta, timezone
  10. from typing import Any
  11. import paho.mqtt.client as mqtt
  12. logger = logging.getLogger(__name__)
  13. @dataclass
  14. class SmartPlugMQTTData:
  15. """Latest data received from an MQTT smart plug."""
  16. plug_id: int
  17. power: float | None = None # Current power in watts
  18. energy: float | None = None # Energy in kWh (today)
  19. state: str | None = None # "ON" or "OFF"
  20. last_seen: datetime = field(default_factory=datetime.utcnow)
  21. @dataclass
  22. class MQTTDataSourceConfig:
  23. """Configuration for a single MQTT data source (power, energy, or state)."""
  24. topic: str
  25. path: str
  26. multiplier: float = 1.0 # For power/energy
  27. on_value: str | None = None # For state (what value means "ON")
  28. class MQTTSmartPlugService:
  29. """Subscribes to MQTT topics for smart plug energy monitoring."""
  30. # Consider plug unreachable if no message received in this time
  31. REACHABLE_TIMEOUT_MINUTES = 5
  32. def __init__(self):
  33. self.client: mqtt.Client | None = None
  34. self.connected = False
  35. self._lock = threading.Lock()
  36. # topic -> list of (plug_id, data_type) where data_type is "power", "energy", or "state"
  37. self.subscriptions: dict[str, list[tuple[int, str]]] = {}
  38. # plug_id -> {data_type: MQTTDataSourceConfig}
  39. self.plug_configs: dict[int, dict[str, MQTTDataSourceConfig]] = {}
  40. # plug_id -> latest data
  41. self.plug_data: dict[int, SmartPlugMQTTData] = {}
  42. self._disconnection_event: threading.Event | None = None
  43. self._configured = False
  44. self._broker = ""
  45. self._port = 1883
  46. self._username = ""
  47. self._password = ""
  48. self._use_tls = False
  49. def is_configured(self) -> bool:
  50. """Check if the MQTT service is configured and connected."""
  51. return self._configured and self.connected
  52. def has_broker_settings(self) -> bool:
  53. """Check if broker settings are available (even if not connected yet)."""
  54. return bool(self._broker)
  55. async def configure(self, settings: dict) -> bool:
  56. """Configure MQTT connection from settings.
  57. Uses the same broker settings as the MQTT relay service.
  58. Returns True if connection was successful or MQTT is disabled.
  59. """
  60. enabled = settings.get("mqtt_enabled", False)
  61. if not enabled:
  62. await self.disconnect()
  63. self._configured = False
  64. logger.debug("MQTT smart plug service disabled (MQTT relay not enabled)")
  65. return True
  66. broker = settings.get("mqtt_broker", "")
  67. port = settings.get("mqtt_port", 1883)
  68. username = settings.get("mqtt_username", "")
  69. password = settings.get("mqtt_password", "")
  70. use_tls = settings.get("mqtt_use_tls", False)
  71. if not broker:
  72. logger.warning("MQTT smart plug service: no broker configured")
  73. self._configured = False
  74. return False
  75. # Check if settings changed
  76. settings_changed = (
  77. self._broker != broker
  78. or self._port != port
  79. or self._username != username
  80. or self._password != password
  81. or self._use_tls != use_tls
  82. )
  83. self._broker = broker
  84. self._port = port
  85. self._username = username
  86. self._password = password
  87. self._use_tls = use_tls
  88. self._configured = True
  89. # Disconnect and reconnect if settings changed
  90. if settings_changed and self.client:
  91. await self.disconnect()
  92. # Connect if not already connected
  93. if not self.client or not self.connected:
  94. return await self._connect()
  95. return True
  96. async def _connect(self) -> bool:
  97. """Establish MQTT connection."""
  98. import asyncio
  99. import ssl
  100. try:
  101. # Create client with callback API version 2
  102. self.client = mqtt.Client(
  103. callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
  104. client_id=f"bambuddy-smartplug-{id(self)}",
  105. protocol=mqtt.MQTTv311,
  106. )
  107. # Set up callbacks
  108. self.client.on_connect = self._on_connect
  109. self.client.on_disconnect = self._on_disconnect
  110. self.client.on_message = self._on_message
  111. # Configure authentication
  112. if self._username:
  113. self.client.username_pw_set(self._username, self._password)
  114. # Configure TLS
  115. if self._use_tls:
  116. self.client.tls_set(cert_reqs=ssl.CERT_NONE)
  117. self.client.tls_insecure_set(True)
  118. # Connect with timeout
  119. try:
  120. await asyncio.wait_for(
  121. asyncio.to_thread(self.client.connect_async, self._broker, self._port, 60),
  122. timeout=3.0,
  123. )
  124. except TimeoutError:
  125. logger.warning("MQTT smart plug connection to %s:%s timed out", self._broker, self._port)
  126. return False
  127. self.client.loop_start()
  128. # Wait briefly for connection
  129. await asyncio.sleep(1.0)
  130. if self.connected:
  131. logger.info("MQTT smart plug service connected to %s:%s", self._broker, self._port)
  132. # Resubscribe to all topics
  133. self._resubscribe_all()
  134. return True
  135. else:
  136. logger.warning("MQTT smart plug connection pending to %s:%s", self._broker, self._port)
  137. return True # Connection is async
  138. except Exception as e:
  139. logger.error("MQTT smart plug connection failed: %s", e)
  140. self.connected = False
  141. return False
  142. def _on_connect(
  143. self,
  144. client: mqtt.Client,
  145. userdata: Any,
  146. flags: dict,
  147. reason_code: int | mqtt.ReasonCode,
  148. properties: mqtt.Properties | None = None,
  149. ):
  150. """Callback when connected to broker."""
  151. rc = reason_code if isinstance(reason_code, int) else reason_code.value
  152. if rc == 0:
  153. self.connected = True
  154. logger.info("MQTT smart plug service connected successfully")
  155. # Resubscribe to all topics
  156. self._resubscribe_all()
  157. else:
  158. self.connected = False
  159. logger.error("MQTT smart plug connection failed: %s", reason_code)
  160. def _on_disconnect(
  161. self,
  162. client: mqtt.Client,
  163. userdata: Any,
  164. flags_or_rc: dict | int | mqtt.ReasonCode,
  165. reason_code: int | mqtt.ReasonCode | None = None,
  166. properties: mqtt.Properties | None = None,
  167. ):
  168. """Callback when disconnected from broker."""
  169. self.connected = False
  170. rc = reason_code if reason_code is not None else flags_or_rc
  171. rc_val = rc if isinstance(rc, int) else getattr(rc, "value", 0)
  172. if rc_val != 0:
  173. logger.warning("MQTT smart plug service disconnected: %s", rc)
  174. else:
  175. logger.info("MQTT smart plug service disconnected cleanly")
  176. if self._disconnection_event:
  177. self._disconnection_event.set()
  178. def _on_message(self, client: mqtt.Client, userdata: Any, msg: mqtt.MQTTMessage):
  179. """Handle incoming MQTT message, extract data using JSON path."""
  180. topic = msg.topic
  181. with self._lock:
  182. subscriptions = self.subscriptions.get(topic, [])
  183. if not subscriptions:
  184. return
  185. # Parse JSON payload (or treat as raw value)
  186. try:
  187. payload = json.loads(msg.payload.decode("utf-8"))
  188. is_json = True
  189. except (json.JSONDecodeError, UnicodeDecodeError):
  190. # Not JSON - treat the whole payload as a raw value
  191. payload = msg.payload.decode("utf-8").strip()
  192. is_json = False
  193. # Process for each subscribed (plug_id, data_type)
  194. for plug_id, data_type in subscriptions:
  195. configs = self.plug_configs.get(plug_id, {})
  196. config = configs.get(data_type)
  197. if not config:
  198. continue
  199. # Extract value using path (or use raw payload if no path)
  200. if is_json and config.path:
  201. raw_value = self._extract_json_path(payload, config.path)
  202. elif is_json and not config.path:
  203. # JSON but no path - if it's a simple value use it, otherwise skip
  204. if isinstance(payload, (int, float, str, bool)):
  205. raw_value = payload
  206. else:
  207. # Can't use a dict/list as a value
  208. logger.debug("MQTT plug %s: JSON payload is object/array but no path configured", plug_id)
  209. continue
  210. else:
  211. # Raw value (non-JSON)
  212. raw_value = payload
  213. if raw_value is None:
  214. continue
  215. # Initialize plug data if needed
  216. if plug_id not in self.plug_data:
  217. self.plug_data[plug_id] = SmartPlugMQTTData(plug_id=plug_id)
  218. data = self.plug_data[plug_id]
  219. data.last_seen = datetime.now(timezone.utc)
  220. # Process based on data type
  221. if data_type == "power":
  222. try:
  223. data.power = float(raw_value) * config.multiplier
  224. logger.debug("MQTT smart plug %s: power=%s", plug_id, data.power)
  225. except (ValueError, TypeError):
  226. pass # Ignore unparseable power reading from MQTT
  227. elif data_type == "energy":
  228. try:
  229. data.energy = float(raw_value) * config.multiplier
  230. logger.debug("MQTT smart plug %s: energy=%s", plug_id, data.energy)
  231. except (ValueError, TypeError):
  232. pass # Ignore unparseable energy reading from MQTT
  233. elif data_type == "state":
  234. state_str = str(raw_value)
  235. # Check against configured ON value if set
  236. if config.on_value:
  237. # Case-insensitive comparison
  238. if state_str.lower() == config.on_value.lower():
  239. data.state = "ON"
  240. else:
  241. data.state = "OFF"
  242. else:
  243. # Default behavior: normalize common values
  244. upper_state = state_str.upper()
  245. if upper_state in ("ON", "1", "TRUE"):
  246. data.state = "ON"
  247. elif upper_state in ("OFF", "0", "FALSE"):
  248. data.state = "OFF"
  249. else:
  250. data.state = state_str
  251. logger.debug("MQTT smart plug %s: state=%s", plug_id, data.state)
  252. def _extract_json_path(self, data: dict, path: str) -> Any:
  253. """Extract value using dot notation (e.g., 'power_l1' or 'data.power').
  254. Supports simple dot notation for nested objects.
  255. """
  256. if not path:
  257. return None
  258. parts = path.split(".")
  259. current = data
  260. for part in parts:
  261. if isinstance(current, dict) and part in current:
  262. current = current[part]
  263. else:
  264. return None
  265. return current
  266. def _resubscribe_all(self):
  267. """Resubscribe to all registered topics after reconnection."""
  268. if not self.client or not self.connected:
  269. return
  270. with self._lock:
  271. for topic in self.subscriptions:
  272. if self.subscriptions[topic]: # Only if there are subscribers
  273. try:
  274. self.client.subscribe(topic, qos=1)
  275. logger.debug("MQTT smart plug: resubscribed to %s", topic)
  276. except Exception as e:
  277. logger.error("MQTT smart plug: failed to resubscribe to %s: %s", topic, e)
  278. def subscribe(
  279. self,
  280. plug_id: int,
  281. # Power source
  282. power_topic: str | None = None,
  283. power_path: str | None = None,
  284. power_multiplier: float = 1.0,
  285. # Energy source
  286. energy_topic: str | None = None,
  287. energy_path: str | None = None,
  288. energy_multiplier: float = 1.0,
  289. # State source
  290. state_topic: str | None = None,
  291. state_path: str | None = None,
  292. state_on_value: str | None = None,
  293. # Legacy: single topic/path/multiplier (for backward compatibility)
  294. topic: str | None = None,
  295. multiplier: float = 1.0,
  296. ):
  297. """Subscribe to MQTT topics for a plug.
  298. Each data type (power, energy, state) can have its own topic.
  299. For backward compatibility, if power_topic is not set but topic is,
  300. topic will be used for all data types that have paths configured.
  301. """
  302. with self._lock:
  303. # Initialize config for this plug
  304. self.plug_configs[plug_id] = {}
  305. # Determine topics (new fields take priority, fall back to legacy)
  306. effective_power_topic = power_topic or topic
  307. effective_energy_topic = energy_topic or topic
  308. effective_state_topic = state_topic or topic
  309. # Use new multipliers or fall back to legacy
  310. effective_power_mult = power_multiplier if power_multiplier != 1.0 else multiplier
  311. effective_energy_mult = energy_multiplier if energy_multiplier != 1.0 else multiplier
  312. # Configure power subscription (path is optional - empty means use raw payload)
  313. if effective_power_topic:
  314. config = MQTTDataSourceConfig(
  315. topic=effective_power_topic,
  316. path=power_path or "",
  317. multiplier=effective_power_mult,
  318. )
  319. self.plug_configs[plug_id]["power"] = config
  320. self._add_subscription(plug_id, effective_power_topic, "power")
  321. # Configure energy subscription (path is optional - empty means use raw payload)
  322. if effective_energy_topic:
  323. config = MQTTDataSourceConfig(
  324. topic=effective_energy_topic,
  325. path=energy_path or "",
  326. multiplier=effective_energy_mult,
  327. )
  328. self.plug_configs[plug_id]["energy"] = config
  329. self._add_subscription(plug_id, effective_energy_topic, "energy")
  330. # Configure state subscription (path is optional - empty means use raw payload)
  331. if effective_state_topic:
  332. config = MQTTDataSourceConfig(
  333. topic=effective_state_topic,
  334. path=state_path or "",
  335. on_value=state_on_value,
  336. )
  337. self.plug_configs[plug_id]["state"] = config
  338. self._add_subscription(plug_id, effective_state_topic, "state")
  339. # Initialize data entry
  340. if plug_id not in self.plug_data:
  341. self.plug_data[plug_id] = SmartPlugMQTTData(plug_id=plug_id)
  342. logger.info(
  343. f"MQTT smart plug {plug_id}: configured with "
  344. f"power={effective_power_topic if power_path else None}, "
  345. f"energy={effective_energy_topic if energy_path else None}, "
  346. f"state={effective_state_topic if state_path else None}"
  347. )
  348. def _add_subscription(self, plug_id: int, topic: str, data_type: str):
  349. """Add a subscription for a plug/data_type to a topic."""
  350. if topic not in self.subscriptions:
  351. self.subscriptions[topic] = []
  352. # Actually subscribe if connected
  353. if self.client and self.connected:
  354. try:
  355. self.client.subscribe(topic, qos=1)
  356. logger.info("MQTT smart plug: subscribed to %s", topic)
  357. except Exception as e:
  358. logger.error("MQTT smart plug: failed to subscribe to %s: %s", topic, e)
  359. entry = (plug_id, data_type)
  360. if entry not in self.subscriptions[topic]:
  361. self.subscriptions[topic].append(entry)
  362. def unsubscribe(self, plug_id: int):
  363. """Unsubscribe when plug is deleted/updated."""
  364. with self._lock:
  365. # Get all configs for this plug
  366. configs = self.plug_configs.pop(plug_id, {})
  367. if not configs:
  368. # Still clean up any stray subscriptions
  369. pass
  370. # Collect all topics this plug was subscribed to
  371. topics_to_check = set()
  372. for _data_type, config in configs.items():
  373. topics_to_check.add(config.topic)
  374. # Also scan subscriptions to remove any entries for this plug
  375. for topic in list(self.subscriptions.keys()):
  376. # Remove all entries for this plug_id
  377. self.subscriptions[topic] = [(pid, dtype) for pid, dtype in self.subscriptions[topic] if pid != plug_id]
  378. topics_to_check.add(topic)
  379. # Unsubscribe from topics with no more subscribers
  380. for topic in topics_to_check:
  381. if topic in self.subscriptions and not self.subscriptions[topic]:
  382. del self.subscriptions[topic]
  383. if self.client and self.connected:
  384. try:
  385. self.client.unsubscribe(topic)
  386. logger.info("MQTT smart plug: unsubscribed from %s", topic)
  387. except Exception as e:
  388. logger.error("MQTT smart plug: failed to unsubscribe from %s: %s", topic, e)
  389. # Remove data
  390. self.plug_data.pop(plug_id, None)
  391. def get_plug_data(self, plug_id: int) -> SmartPlugMQTTData | None:
  392. """Get latest data for a plug (called by status endpoint)."""
  393. with self._lock:
  394. return self.plug_data.get(plug_id)
  395. def is_reachable(self, plug_id: int) -> bool:
  396. """Check if a plug has received data recently."""
  397. data = self.get_plug_data(plug_id)
  398. if not data:
  399. return False
  400. timeout = timedelta(minutes=self.REACHABLE_TIMEOUT_MINUTES)
  401. return datetime.now(timezone.utc) - data.last_seen < timeout
  402. async def disconnect(self, timeout: float = 0):
  403. """Disconnect from MQTT broker."""
  404. if self.client:
  405. try:
  406. self._disconnection_event = threading.Event()
  407. self.client.disconnect()
  408. await asyncio.to_thread(self._disconnection_event.wait, timeout=timeout)
  409. self.client.loop_stop()
  410. except Exception as e:
  411. logger.debug("MQTT smart plug disconnect error (ignored): %s", e)
  412. finally:
  413. self.client = None
  414. self.connected = False
  415. def subscribe_plug_to_mqtt(service: "MQTTSmartPlugService", plug: Any) -> list[str]:
  416. """Resolve per-type topic fields on a SmartPlug and register it with the service.
  417. The SmartPlug model carries both a legacy single `mqtt_topic` and newer
  418. per-type `mqtt_{power,energy,state}_topic` fields. Three code paths used
  419. to open-code this resolution (startup restore, create, update) and they
  420. drifted — the startup path skipped plugs that only had per-type topics
  421. set, leaving them unsubscribed after every restart (#1010). Funnelling
  422. all three through this helper keeps them in sync.
  423. Returns the list of topics subscribed (empty if nothing was configured).
  424. """
  425. power_topic = plug.mqtt_power_topic or plug.mqtt_topic
  426. energy_topic = plug.mqtt_energy_topic or plug.mqtt_topic
  427. state_topic = plug.mqtt_state_topic or plug.mqtt_topic
  428. if not (power_topic or energy_topic or state_topic):
  429. return []
  430. legacy_mult = plug.mqtt_multiplier or 1.0
  431. service.subscribe(
  432. plug_id=plug.id,
  433. power_topic=power_topic,
  434. power_path=plug.mqtt_power_path,
  435. power_multiplier=plug.mqtt_power_multiplier or legacy_mult,
  436. energy_topic=energy_topic,
  437. energy_path=plug.mqtt_energy_path,
  438. energy_multiplier=plug.mqtt_energy_multiplier or legacy_mult,
  439. state_topic=state_topic,
  440. state_path=plug.mqtt_state_path,
  441. state_on_value=plug.mqtt_state_on_value,
  442. )
  443. return [t for t in {power_topic, energy_topic, state_topic} if t]
# Global instance, created when this module is imported. It starts out
# unconfigured and disconnected until configure() is called on it.
mqtt_smart_plug_service = MQTTSmartPlugService()