mqtt_smart_plug.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488
  1. """MQTT Smart Plug Service for subscribing to external MQTT topics and extracting power/energy data.
  2. This service enables integration with Shelly, Zigbee2MQTT, and other MQTT-based energy monitoring devices.
  3. """
  4. import json
  5. import logging
  6. import threading
  7. from dataclasses import dataclass, field
  8. from datetime import datetime, timedelta
  9. from typing import Any
  10. import paho.mqtt.client as mqtt
  11. logger = logging.getLogger(__name__)
  12. @dataclass
  13. class SmartPlugMQTTData:
  14. """Latest data received from an MQTT smart plug."""
  15. plug_id: int
  16. power: float | None = None # Current power in watts
  17. energy: float | None = None # Energy in kWh (today)
  18. state: str | None = None # "ON" or "OFF"
  19. last_seen: datetime = field(default_factory=datetime.utcnow)
  20. @dataclass
  21. class MQTTDataSourceConfig:
  22. """Configuration for a single MQTT data source (power, energy, or state)."""
  23. topic: str
  24. path: str
  25. multiplier: float = 1.0 # For power/energy
  26. on_value: str | None = None # For state (what value means "ON")
  27. class MQTTSmartPlugService:
  28. """Subscribes to MQTT topics for smart plug energy monitoring."""
  29. # Consider plug unreachable if no message received in this time
  30. REACHABLE_TIMEOUT_MINUTES = 5
  31. def __init__(self):
  32. self.client: mqtt.Client | None = None
  33. self.connected = False
  34. self._lock = threading.Lock()
  35. # topic -> list of (plug_id, data_type) where data_type is "power", "energy", or "state"
  36. self.subscriptions: dict[str, list[tuple[int, str]]] = {}
  37. # plug_id -> {data_type: MQTTDataSourceConfig}
  38. self.plug_configs: dict[int, dict[str, MQTTDataSourceConfig]] = {}
  39. # plug_id -> latest data
  40. self.plug_data: dict[int, SmartPlugMQTTData] = {}
  41. self._configured = False
  42. self._broker = ""
  43. self._port = 1883
  44. self._username = ""
  45. self._password = ""
  46. self._use_tls = False
  47. def is_configured(self) -> bool:
  48. """Check if the MQTT service is configured and connected."""
  49. return self._configured and self.connected
  50. def has_broker_settings(self) -> bool:
  51. """Check if broker settings are available (even if not connected yet)."""
  52. return bool(self._broker)
  53. async def configure(self, settings: dict) -> bool:
  54. """Configure MQTT connection from settings.
  55. Uses the same broker settings as the MQTT relay service.
  56. Returns True if connection was successful or MQTT is disabled.
  57. """
  58. enabled = settings.get("mqtt_enabled", False)
  59. if not enabled:
  60. await self.disconnect()
  61. self._configured = False
  62. logger.debug("MQTT smart plug service disabled (MQTT relay not enabled)")
  63. return True
  64. broker = settings.get("mqtt_broker", "")
  65. port = settings.get("mqtt_port", 1883)
  66. username = settings.get("mqtt_username", "")
  67. password = settings.get("mqtt_password", "")
  68. use_tls = settings.get("mqtt_use_tls", False)
  69. if not broker:
  70. logger.warning("MQTT smart plug service: no broker configured")
  71. self._configured = False
  72. return False
  73. # Check if settings changed
  74. settings_changed = (
  75. self._broker != broker
  76. or self._port != port
  77. or self._username != username
  78. or self._password != password
  79. or self._use_tls != use_tls
  80. )
  81. self._broker = broker
  82. self._port = port
  83. self._username = username
  84. self._password = password
  85. self._use_tls = use_tls
  86. self._configured = True
  87. # Disconnect and reconnect if settings changed
  88. if settings_changed and self.client:
  89. await self.disconnect()
  90. # Connect if not already connected
  91. if not self.client or not self.connected:
  92. return await self._connect()
  93. return True
  94. async def _connect(self) -> bool:
  95. """Establish MQTT connection."""
  96. import asyncio
  97. import ssl
  98. try:
  99. # Create client with callback API version 2
  100. self.client = mqtt.Client(
  101. callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
  102. client_id=f"bambuddy-smartplug-{id(self)}",
  103. protocol=mqtt.MQTTv311,
  104. )
  105. # Set up callbacks
  106. self.client.on_connect = self._on_connect
  107. self.client.on_disconnect = self._on_disconnect
  108. self.client.on_message = self._on_message
  109. # Configure authentication
  110. if self._username:
  111. self.client.username_pw_set(self._username, self._password)
  112. # Configure TLS
  113. if self._use_tls:
  114. self.client.tls_set(cert_reqs=ssl.CERT_NONE)
  115. self.client.tls_insecure_set(True)
  116. # Connect with timeout
  117. try:
  118. await asyncio.wait_for(
  119. asyncio.to_thread(self.client.connect_async, self._broker, self._port, 60),
  120. timeout=3.0,
  121. )
  122. except TimeoutError:
  123. logger.warning("MQTT smart plug connection to %s:%s timed out", self._broker, self._port)
  124. return False
  125. self.client.loop_start()
  126. # Wait briefly for connection
  127. await asyncio.sleep(1.0)
  128. if self.connected:
  129. logger.info("MQTT smart plug service connected to %s:%s", self._broker, self._port)
  130. # Resubscribe to all topics
  131. self._resubscribe_all()
  132. return True
  133. else:
  134. logger.warning("MQTT smart plug connection pending to %s:%s", self._broker, self._port)
  135. return True # Connection is async
  136. except Exception as e:
  137. logger.error("MQTT smart plug connection failed: %s", e)
  138. self.connected = False
  139. return False
  140. def _on_connect(
  141. self,
  142. client: mqtt.Client,
  143. userdata: Any,
  144. flags: dict,
  145. reason_code: int | mqtt.ReasonCode,
  146. properties: mqtt.Properties | None = None,
  147. ):
  148. """Callback when connected to broker."""
  149. rc = reason_code if isinstance(reason_code, int) else reason_code.value
  150. if rc == 0:
  151. self.connected = True
  152. logger.info("MQTT smart plug service connected successfully")
  153. # Resubscribe to all topics
  154. self._resubscribe_all()
  155. else:
  156. self.connected = False
  157. logger.error("MQTT smart plug connection failed: %s", reason_code)
  158. def _on_disconnect(
  159. self,
  160. client: mqtt.Client,
  161. userdata: Any,
  162. flags_or_rc: dict | int | mqtt.ReasonCode,
  163. reason_code: int | mqtt.ReasonCode | None = None,
  164. properties: mqtt.Properties | None = None,
  165. ):
  166. """Callback when disconnected from broker."""
  167. self.connected = False
  168. rc = reason_code if reason_code is not None else flags_or_rc
  169. rc_val = rc if isinstance(rc, int) else getattr(rc, "value", 0)
  170. if rc_val != 0:
  171. logger.warning("MQTT smart plug service disconnected: %s", rc)
  172. else:
  173. logger.info("MQTT smart plug service disconnected cleanly")
  174. def _on_message(self, client: mqtt.Client, userdata: Any, msg: mqtt.MQTTMessage):
  175. """Handle incoming MQTT message, extract data using JSON path."""
  176. topic = msg.topic
  177. with self._lock:
  178. subscriptions = self.subscriptions.get(topic, [])
  179. if not subscriptions:
  180. return
  181. # Parse JSON payload (or treat as raw value)
  182. try:
  183. payload = json.loads(msg.payload.decode("utf-8"))
  184. is_json = True
  185. except (json.JSONDecodeError, UnicodeDecodeError):
  186. # Not JSON - treat the whole payload as a raw value
  187. payload = msg.payload.decode("utf-8").strip()
  188. is_json = False
  189. # Process for each subscribed (plug_id, data_type)
  190. for plug_id, data_type in subscriptions:
  191. configs = self.plug_configs.get(plug_id, {})
  192. config = configs.get(data_type)
  193. if not config:
  194. continue
  195. # Extract value using path (or use raw payload if no path)
  196. if is_json and config.path:
  197. raw_value = self._extract_json_path(payload, config.path)
  198. elif is_json and not config.path:
  199. # JSON but no path - if it's a simple value use it, otherwise skip
  200. if isinstance(payload, (int, float, str, bool)):
  201. raw_value = payload
  202. else:
  203. # Can't use a dict/list as a value
  204. logger.debug("MQTT plug %s: JSON payload is object/array but no path configured", plug_id)
  205. continue
  206. else:
  207. # Raw value (non-JSON)
  208. raw_value = payload
  209. if raw_value is None:
  210. continue
  211. # Initialize plug data if needed
  212. if plug_id not in self.plug_data:
  213. self.plug_data[plug_id] = SmartPlugMQTTData(plug_id=plug_id)
  214. data = self.plug_data[plug_id]
  215. data.last_seen = datetime.utcnow()
  216. # Process based on data type
  217. if data_type == "power":
  218. try:
  219. data.power = float(raw_value) * config.multiplier
  220. logger.debug("MQTT smart plug %s: power=%s", plug_id, data.power)
  221. except (ValueError, TypeError):
  222. pass # Ignore unparseable power reading from MQTT
  223. elif data_type == "energy":
  224. try:
  225. data.energy = float(raw_value) * config.multiplier
  226. logger.debug("MQTT smart plug %s: energy=%s", plug_id, data.energy)
  227. except (ValueError, TypeError):
  228. pass # Ignore unparseable energy reading from MQTT
  229. elif data_type == "state":
  230. state_str = str(raw_value)
  231. # Check against configured ON value if set
  232. if config.on_value:
  233. # Case-insensitive comparison
  234. if state_str.lower() == config.on_value.lower():
  235. data.state = "ON"
  236. else:
  237. data.state = "OFF"
  238. else:
  239. # Default behavior: normalize common values
  240. upper_state = state_str.upper()
  241. if upper_state in ("ON", "1", "TRUE"):
  242. data.state = "ON"
  243. elif upper_state in ("OFF", "0", "FALSE"):
  244. data.state = "OFF"
  245. else:
  246. data.state = state_str
  247. logger.debug("MQTT smart plug %s: state=%s", plug_id, data.state)
  248. def _extract_json_path(self, data: dict, path: str) -> Any:
  249. """Extract value using dot notation (e.g., 'power_l1' or 'data.power').
  250. Supports simple dot notation for nested objects.
  251. """
  252. if not path:
  253. return None
  254. parts = path.split(".")
  255. current = data
  256. for part in parts:
  257. if isinstance(current, dict) and part in current:
  258. current = current[part]
  259. else:
  260. return None
  261. return current
  262. def _resubscribe_all(self):
  263. """Resubscribe to all registered topics after reconnection."""
  264. if not self.client or not self.connected:
  265. return
  266. with self._lock:
  267. for topic in self.subscriptions:
  268. if self.subscriptions[topic]: # Only if there are subscribers
  269. try:
  270. self.client.subscribe(topic, qos=1)
  271. logger.debug("MQTT smart plug: resubscribed to %s", topic)
  272. except Exception as e:
  273. logger.error("MQTT smart plug: failed to resubscribe to %s: %s", topic, e)
  274. def subscribe(
  275. self,
  276. plug_id: int,
  277. # Power source
  278. power_topic: str | None = None,
  279. power_path: str | None = None,
  280. power_multiplier: float = 1.0,
  281. # Energy source
  282. energy_topic: str | None = None,
  283. energy_path: str | None = None,
  284. energy_multiplier: float = 1.0,
  285. # State source
  286. state_topic: str | None = None,
  287. state_path: str | None = None,
  288. state_on_value: str | None = None,
  289. # Legacy: single topic/path/multiplier (for backward compatibility)
  290. topic: str | None = None,
  291. multiplier: float = 1.0,
  292. ):
  293. """Subscribe to MQTT topics for a plug.
  294. Each data type (power, energy, state) can have its own topic.
  295. For backward compatibility, if power_topic is not set but topic is,
  296. topic will be used for all data types that have paths configured.
  297. """
  298. with self._lock:
  299. # Initialize config for this plug
  300. self.plug_configs[plug_id] = {}
  301. # Determine topics (new fields take priority, fall back to legacy)
  302. effective_power_topic = power_topic or topic
  303. effective_energy_topic = energy_topic or topic
  304. effective_state_topic = state_topic or topic
  305. # Use new multipliers or fall back to legacy
  306. effective_power_mult = power_multiplier if power_multiplier != 1.0 else multiplier
  307. effective_energy_mult = energy_multiplier if energy_multiplier != 1.0 else multiplier
  308. # Configure power subscription (path is optional - empty means use raw payload)
  309. if effective_power_topic:
  310. config = MQTTDataSourceConfig(
  311. topic=effective_power_topic,
  312. path=power_path or "",
  313. multiplier=effective_power_mult,
  314. )
  315. self.plug_configs[plug_id]["power"] = config
  316. self._add_subscription(plug_id, effective_power_topic, "power")
  317. # Configure energy subscription (path is optional - empty means use raw payload)
  318. if effective_energy_topic:
  319. config = MQTTDataSourceConfig(
  320. topic=effective_energy_topic,
  321. path=energy_path or "",
  322. multiplier=effective_energy_mult,
  323. )
  324. self.plug_configs[plug_id]["energy"] = config
  325. self._add_subscription(plug_id, effective_energy_topic, "energy")
  326. # Configure state subscription (path is optional - empty means use raw payload)
  327. if effective_state_topic:
  328. config = MQTTDataSourceConfig(
  329. topic=effective_state_topic,
  330. path=state_path or "",
  331. on_value=state_on_value,
  332. )
  333. self.plug_configs[plug_id]["state"] = config
  334. self._add_subscription(plug_id, effective_state_topic, "state")
  335. # Initialize data entry
  336. if plug_id not in self.plug_data:
  337. self.plug_data[plug_id] = SmartPlugMQTTData(plug_id=plug_id)
  338. logger.info(
  339. f"MQTT smart plug {plug_id}: configured with "
  340. f"power={effective_power_topic if power_path else None}, "
  341. f"energy={effective_energy_topic if energy_path else None}, "
  342. f"state={effective_state_topic if state_path else None}"
  343. )
  344. def _add_subscription(self, plug_id: int, topic: str, data_type: str):
  345. """Add a subscription for a plug/data_type to a topic."""
  346. if topic not in self.subscriptions:
  347. self.subscriptions[topic] = []
  348. # Actually subscribe if connected
  349. if self.client and self.connected:
  350. try:
  351. self.client.subscribe(topic, qos=1)
  352. logger.info("MQTT smart plug: subscribed to %s", topic)
  353. except Exception as e:
  354. logger.error("MQTT smart plug: failed to subscribe to %s: %s", topic, e)
  355. entry = (plug_id, data_type)
  356. if entry not in self.subscriptions[topic]:
  357. self.subscriptions[topic].append(entry)
  358. def unsubscribe(self, plug_id: int):
  359. """Unsubscribe when plug is deleted/updated."""
  360. with self._lock:
  361. # Get all configs for this plug
  362. configs = self.plug_configs.pop(plug_id, {})
  363. if not configs:
  364. # Still clean up any stray subscriptions
  365. pass
  366. # Collect all topics this plug was subscribed to
  367. topics_to_check = set()
  368. for _data_type, config in configs.items():
  369. topics_to_check.add(config.topic)
  370. # Also scan subscriptions to remove any entries for this plug
  371. for topic in list(self.subscriptions.keys()):
  372. # Remove all entries for this plug_id
  373. self.subscriptions[topic] = [(pid, dtype) for pid, dtype in self.subscriptions[topic] if pid != plug_id]
  374. topics_to_check.add(topic)
  375. # Unsubscribe from topics with no more subscribers
  376. for topic in topics_to_check:
  377. if topic in self.subscriptions and not self.subscriptions[topic]:
  378. del self.subscriptions[topic]
  379. if self.client and self.connected:
  380. try:
  381. self.client.unsubscribe(topic)
  382. logger.info("MQTT smart plug: unsubscribed from %s", topic)
  383. except Exception as e:
  384. logger.error("MQTT smart plug: failed to unsubscribe from %s: %s", topic, e)
  385. # Remove data
  386. self.plug_data.pop(plug_id, None)
  387. def get_plug_data(self, plug_id: int) -> SmartPlugMQTTData | None:
  388. """Get latest data for a plug (called by status endpoint)."""
  389. with self._lock:
  390. return self.plug_data.get(plug_id)
  391. def is_reachable(self, plug_id: int) -> bool:
  392. """Check if a plug has received data recently."""
  393. data = self.get_plug_data(plug_id)
  394. if not data:
  395. return False
  396. timeout = timedelta(minutes=self.REACHABLE_TIMEOUT_MINUTES)
  397. return datetime.utcnow() - data.last_seen < timeout
  398. async def disconnect(self):
  399. """Disconnect from MQTT broker."""
  400. if self.client:
  401. try:
  402. self.client.loop_stop()
  403. self.client.disconnect()
  404. except Exception as e:
  405. logger.debug("MQTT smart plug disconnect error (ignored): %s", e)
  406. finally:
  407. self.client = None
  408. self.connected = False
  409. # Global instance
  410. mqtt_smart_plug_service = MQTTSmartPlugService()