# mqtt_relay.py
"""MQTT Relay Service for publishing BamBuddy events to external MQTT brokers.

This service enables integration with external automation systems like
Node-RED, Home Assistant, and other MQTT-based platforms.
"""
import asyncio
import json
import logging
import ssl
import threading
import time
from datetime import datetime, timezone
from typing import Any

import paho.mqtt.client as mqtt
  14. logger = logging.getLogger(__name__)
  15. class MQTTRelayService:
  16. """Publishes BamBuddy events to an external MQTT broker."""
  17. # Minimum interval between status updates per printer (seconds)
  18. STATUS_THROTTLE_SECONDS = 1.0
  19. def __init__(self):
  20. self.client: mqtt.Client | None = None
  21. self.enabled = False
  22. self.connected = False
  23. self.topic_prefix = "bambuddy"
  24. self._lock = threading.Lock()
  25. self._loop: asyncio.AbstractEventLoop | None = None
  26. self._broker = ""
  27. self._port = 1883
  28. self._last_printer_status: dict[int, float] = {} # printer_id -> last publish timestamp
  29. async def configure(self, settings: dict) -> bool:
  30. """Configure MQTT connection from settings.
  31. Returns True if connection was successful or MQTT is disabled.
  32. """
  33. self.enabled = settings.get("mqtt_enabled", False)
  34. if not self.enabled:
  35. await self.disconnect()
  36. logger.info("MQTT relay disabled")
  37. return True
  38. broker = settings.get("mqtt_broker", "")
  39. port = settings.get("mqtt_port", 1883)
  40. username = settings.get("mqtt_username", "")
  41. password = settings.get("mqtt_password", "")
  42. self.topic_prefix = settings.get("mqtt_topic_prefix", "bambuddy")
  43. use_tls = settings.get("mqtt_use_tls", False)
  44. if not broker:
  45. logger.warning("MQTT enabled but no broker configured")
  46. return False
  47. # Store for status endpoint
  48. self._broker = broker
  49. self._port = port
  50. # Disconnect existing connection if settings changed
  51. if self.client:
  52. await self.disconnect()
  53. # Create and connect client
  54. return await self._connect(broker, port, username, password, use_tls)
  55. async def _connect(self, broker: str, port: int, username: str, password: str, use_tls: bool) -> bool:
  56. """Establish MQTT connection."""
  57. try:
  58. # Create client with callback API version 2 (use MQTTv311 for broader compatibility)
  59. self.client = mqtt.Client(
  60. callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
  61. client_id=f"bambuddy-{id(self)}",
  62. protocol=mqtt.MQTTv311,
  63. )
  64. # Set up callbacks
  65. self.client.on_connect = self._on_connect
  66. self.client.on_disconnect = self._on_disconnect
  67. # Configure authentication
  68. if username:
  69. self.client.username_pw_set(username, password)
  70. # Configure TLS (allow self-signed certs for testing)
  71. if use_tls:
  72. self.client.tls_set(cert_reqs=ssl.CERT_NONE)
  73. self.client.tls_insecure_set(True) # Allow self-signed certs
  74. # Run connect_async in thread pool with timeout to avoid blocking
  75. # on unreachable brokers (connect_async does synchronous socket creation)
  76. try:
  77. await asyncio.wait_for(asyncio.to_thread(self.client.connect_async, broker, port, 60), timeout=3.0)
  78. except TimeoutError:
  79. logger.warning(f"MQTT relay connection to {broker}:{port} timed out")
  80. return False
  81. self.client.loop_start()
  82. # Wait briefly for connection callback
  83. await asyncio.sleep(1.0)
  84. if self.connected:
  85. logger.info(f"MQTT relay connected to {broker}:{port}")
  86. # Publish online status
  87. self._publish_status("online")
  88. return True
  89. else:
  90. logger.warning(f"MQTT relay connection pending to {broker}:{port}")
  91. return True # Connection is async, may succeed later
  92. except Exception as e:
  93. logger.error(f"MQTT relay connection failed: {e}")
  94. self.connected = False
  95. return False
  96. def _on_connect(
  97. self,
  98. client: mqtt.Client,
  99. userdata: Any,
  100. flags: dict,
  101. reason_code: int | mqtt.ReasonCode,
  102. properties: mqtt.Properties | None = None,
  103. ):
  104. """Callback when connected to broker."""
  105. # Handle both MQTTv311 (int) and MQTTv5 (ReasonCode) return codes
  106. rc = reason_code if isinstance(reason_code, int) else reason_code.value
  107. if rc == 0:
  108. self.connected = True
  109. logger.info("MQTT relay connected successfully")
  110. # Publish online status
  111. self._publish_status("online")
  112. else:
  113. self.connected = False
  114. logger.error(f"MQTT relay connection failed: {reason_code}")
  115. def _on_disconnect(
  116. self,
  117. client: mqtt.Client,
  118. userdata: Any,
  119. flags_or_rc: dict | int | mqtt.ReasonCode,
  120. reason_code: int | mqtt.ReasonCode | None = None,
  121. properties: mqtt.Properties | None = None,
  122. ):
  123. """Callback when disconnected from broker."""
  124. self.connected = False
  125. # Handle both MQTTv311 (rc as 3rd param) and MQTTv5 (flags, rc, props)
  126. rc = reason_code if reason_code is not None else flags_or_rc
  127. rc_val = rc if isinstance(rc, int) else getattr(rc, "value", 0)
  128. if rc_val != 0:
  129. logger.warning(f"MQTT relay disconnected: {rc}")
  130. else:
  131. logger.info("MQTT relay disconnected cleanly")
  132. async def disconnect(self):
  133. """Disconnect from MQTT broker."""
  134. if self.client:
  135. try:
  136. # Publish offline status before disconnecting
  137. self._publish_status("offline")
  138. self.client.loop_stop()
  139. self.client.disconnect()
  140. except Exception as e:
  141. logger.debug(f"MQTT disconnect error (ignored): {e}")
  142. finally:
  143. self.client = None
  144. self.connected = False
  145. def _publish_status(self, status: str):
  146. """Publish BamBuddy status (online/offline)."""
  147. self._publish(
  148. f"{self.topic_prefix}/status",
  149. {"status": status, "timestamp": datetime.utcnow().isoformat()},
  150. retain=True,
  151. )
  152. def _publish(self, topic: str, payload: dict, retain: bool = False):
  153. """Publish message to MQTT broker."""
  154. if not self.client or not self.connected:
  155. return
  156. try:
  157. with self._lock:
  158. self.client.publish(topic, json.dumps(payload, default=str), qos=1, retain=retain)
  159. except Exception as e:
  160. logger.debug(f"MQTT publish error: {e}")
  161. def get_status(self) -> dict:
  162. """Get current MQTT relay status for API."""
  163. return {
  164. "enabled": self.enabled,
  165. "connected": self.connected,
  166. "broker": self._broker if self.enabled else "",
  167. "port": self._port if self.enabled else 0,
  168. "topic_prefix": self.topic_prefix,
  169. }
  170. # =========================================================================
  171. # Printer Events
  172. # =========================================================================
  173. async def on_printer_status(self, printer_id: int, state: Any, printer_name: str, printer_serial: str):
  174. """Publish printer status change (throttled to 1 update/sec per printer)."""
  175. if not self.enabled or not self.connected:
  176. return
  177. # Throttle status updates to avoid flooding MQTT broker
  178. now = time.time()
  179. last_publish = self._last_printer_status.get(printer_id, 0)
  180. if now - last_publish < self.STATUS_THROTTLE_SECONDS:
  181. return # Skip this update, too soon since last publish
  182. self._last_printer_status[printer_id] = now
  183. # Build status payload from PrinterState
  184. payload = {
  185. "printer_id": printer_id,
  186. "printer_name": printer_name,
  187. "printer_serial": printer_serial,
  188. "timestamp": datetime.utcnow().isoformat(),
  189. "connected": state.connected,
  190. "state": state.state,
  191. "progress": state.progress,
  192. "remaining_time": state.remaining_time,
  193. "layer_num": state.layer_num,
  194. "total_layers": state.total_layers,
  195. "current_print": state.current_print,
  196. "subtask_name": state.subtask_name,
  197. "gcode_file": state.gcode_file,
  198. "temperatures": state.temperatures,
  199. "wifi_signal": state.wifi_signal,
  200. "chamber_light": state.chamber_light,
  201. "speed_level": state.speed_level,
  202. "cooling_fan_speed": state.cooling_fan_speed,
  203. "big_fan1_speed": state.big_fan1_speed,
  204. "big_fan2_speed": state.big_fan2_speed,
  205. "heatbreak_fan_speed": state.heatbreak_fan_speed,
  206. }
  207. self._publish(
  208. f"{self.topic_prefix}/printers/{printer_serial}/status",
  209. payload,
  210. retain=True,
  211. )
  212. async def on_printer_online(self, printer_id: int, printer_name: str, printer_serial: str):
  213. """Publish printer came online event."""
  214. if not self.enabled or not self.connected:
  215. return
  216. self._publish(
  217. f"{self.topic_prefix}/printers/{printer_serial}/online",
  218. {
  219. "printer_id": printer_id,
  220. "printer_name": printer_name,
  221. "printer_serial": printer_serial,
  222. "timestamp": datetime.utcnow().isoformat(),
  223. },
  224. )
  225. async def on_printer_offline(self, printer_id: int, printer_name: str, printer_serial: str):
  226. """Publish printer went offline event."""
  227. if not self.enabled or not self.connected:
  228. return
  229. self._publish(
  230. f"{self.topic_prefix}/printers/{printer_serial}/offline",
  231. {
  232. "printer_id": printer_id,
  233. "printer_name": printer_name,
  234. "printer_serial": printer_serial,
  235. "timestamp": datetime.utcnow().isoformat(),
  236. },
  237. )
  238. async def on_print_start(
  239. self,
  240. printer_id: int,
  241. printer_name: str,
  242. printer_serial: str,
  243. filename: str,
  244. subtask_name: str,
  245. ):
  246. """Publish print started event."""
  247. if not self.enabled or not self.connected:
  248. return
  249. self._publish(
  250. f"{self.topic_prefix}/printers/{printer_serial}/print/started",
  251. {
  252. "printer_id": printer_id,
  253. "printer_name": printer_name,
  254. "printer_serial": printer_serial,
  255. "filename": filename,
  256. "subtask_name": subtask_name,
  257. "timestamp": datetime.utcnow().isoformat(),
  258. },
  259. )
  260. async def on_print_complete(
  261. self,
  262. printer_id: int,
  263. printer_name: str,
  264. printer_serial: str,
  265. filename: str,
  266. subtask_name: str,
  267. status: str,
  268. ):
  269. """Publish print completed event."""
  270. if not self.enabled or not self.connected:
  271. return
  272. # Determine topic based on status
  273. if status == "completed":
  274. topic = f"{self.topic_prefix}/printers/{printer_serial}/print/completed"
  275. else:
  276. topic = f"{self.topic_prefix}/printers/{printer_serial}/print/failed"
  277. self._publish(
  278. topic,
  279. {
  280. "printer_id": printer_id,
  281. "printer_name": printer_name,
  282. "printer_serial": printer_serial,
  283. "filename": filename,
  284. "subtask_name": subtask_name,
  285. "status": status,
  286. "timestamp": datetime.utcnow().isoformat(),
  287. },
  288. )
  289. async def on_ams_change(
  290. self,
  291. printer_id: int,
  292. printer_name: str,
  293. printer_serial: str,
  294. ams_data: list,
  295. ):
  296. """Publish AMS filament change event."""
  297. if not self.enabled or not self.connected:
  298. return
  299. self._publish(
  300. f"{self.topic_prefix}/printers/{printer_serial}/ams/changed",
  301. {
  302. "printer_id": printer_id,
  303. "printer_name": printer_name,
  304. "printer_serial": printer_serial,
  305. "ams_units": ams_data,
  306. "timestamp": datetime.utcnow().isoformat(),
  307. },
  308. )
  309. async def on_printer_error(
  310. self,
  311. printer_id: int,
  312. printer_name: str,
  313. printer_serial: str,
  314. errors: list,
  315. ):
  316. """Publish printer HMS error event."""
  317. if not self.enabled or not self.connected:
  318. return
  319. self._publish(
  320. f"{self.topic_prefix}/printers/{printer_serial}/error",
  321. {
  322. "printer_id": printer_id,
  323. "printer_name": printer_name,
  324. "printer_serial": printer_serial,
  325. "errors": errors,
  326. "timestamp": datetime.utcnow().isoformat(),
  327. },
  328. )
  329. # =========================================================================
  330. # Print Queue Events
  331. # =========================================================================
  332. async def on_queue_job_added(
  333. self,
  334. job_id: int,
  335. filename: str,
  336. printer_id: int | None,
  337. printer_name: str | None,
  338. ):
  339. """Publish job added to queue event."""
  340. if not self.enabled or not self.connected:
  341. return
  342. self._publish(
  343. f"{self.topic_prefix}/queue/job_added",
  344. {
  345. "job_id": job_id,
  346. "filename": filename,
  347. "printer_id": printer_id,
  348. "printer_name": printer_name,
  349. "timestamp": datetime.utcnow().isoformat(),
  350. },
  351. )
  352. async def on_queue_job_started(
  353. self,
  354. job_id: int,
  355. filename: str,
  356. printer_id: int,
  357. printer_name: str,
  358. printer_serial: str,
  359. ):
  360. """Publish queued job started printing event."""
  361. if not self.enabled or not self.connected:
  362. return
  363. self._publish(
  364. f"{self.topic_prefix}/queue/job_started",
  365. {
  366. "job_id": job_id,
  367. "filename": filename,
  368. "printer_id": printer_id,
  369. "printer_name": printer_name,
  370. "printer_serial": printer_serial,
  371. "timestamp": datetime.utcnow().isoformat(),
  372. },
  373. )
  374. async def on_queue_job_completed(
  375. self,
  376. job_id: int,
  377. filename: str,
  378. printer_id: int,
  379. printer_name: str,
  380. status: str,
  381. ):
  382. """Publish queued job finished event."""
  383. if not self.enabled or not self.connected:
  384. return
  385. topic = (
  386. f"{self.topic_prefix}/queue/job_completed"
  387. if status == "completed"
  388. else f"{self.topic_prefix}/queue/job_failed"
  389. )
  390. self._publish(
  391. topic,
  392. {
  393. "job_id": job_id,
  394. "filename": filename,
  395. "printer_id": printer_id,
  396. "printer_name": printer_name,
  397. "status": status,
  398. "timestamp": datetime.utcnow().isoformat(),
  399. },
  400. )
  401. # =========================================================================
  402. # Maintenance Events
  403. # =========================================================================
  404. async def on_maintenance_alert(
  405. self,
  406. printer_id: int,
  407. printer_name: str,
  408. maintenance_type: str,
  409. current_value: float,
  410. threshold: float,
  411. ):
  412. """Publish maintenance alert triggered event."""
  413. if not self.enabled or not self.connected:
  414. return
  415. self._publish(
  416. f"{self.topic_prefix}/maintenance/alert",
  417. {
  418. "printer_id": printer_id,
  419. "printer_name": printer_name,
  420. "maintenance_type": maintenance_type,
  421. "current_value": current_value,
  422. "threshold": threshold,
  423. "timestamp": datetime.utcnow().isoformat(),
  424. },
  425. )
  426. async def on_maintenance_acknowledged(
  427. self,
  428. printer_id: int,
  429. printer_name: str,
  430. maintenance_type: str,
  431. ):
  432. """Publish maintenance alert acknowledged event."""
  433. if not self.enabled or not self.connected:
  434. return
  435. self._publish(
  436. f"{self.topic_prefix}/maintenance/acknowledged",
  437. {
  438. "printer_id": printer_id,
  439. "printer_name": printer_name,
  440. "maintenance_type": maintenance_type,
  441. "timestamp": datetime.utcnow().isoformat(),
  442. },
  443. )
  444. async def on_maintenance_reset(
  445. self,
  446. printer_id: int,
  447. printer_name: str,
  448. maintenance_type: str,
  449. ):
  450. """Publish maintenance counter reset event."""
  451. if not self.enabled or not self.connected:
  452. return
  453. self._publish(
  454. f"{self.topic_prefix}/maintenance/reset",
  455. {
  456. "printer_id": printer_id,
  457. "printer_name": printer_name,
  458. "maintenance_type": maintenance_type,
  459. "timestamp": datetime.utcnow().isoformat(),
  460. },
  461. )
  462. # =========================================================================
  463. # Archive Events
  464. # =========================================================================
  465. async def on_archive_created(
  466. self,
  467. archive_id: int,
  468. print_name: str,
  469. printer_name: str,
  470. status: str,
  471. ):
  472. """Publish print archived event."""
  473. if not self.enabled or not self.connected:
  474. return
  475. self._publish(
  476. f"{self.topic_prefix}/archive/created",
  477. {
  478. "archive_id": archive_id,
  479. "print_name": print_name,
  480. "printer_name": printer_name,
  481. "status": status,
  482. "timestamp": datetime.utcnow().isoformat(),
  483. },
  484. )
  485. async def on_archive_updated(
  486. self,
  487. archive_id: int,
  488. print_name: str,
  489. status: str,
  490. ):
  491. """Publish archive record updated event."""
  492. if not self.enabled or not self.connected:
  493. return
  494. self._publish(
  495. f"{self.topic_prefix}/archive/updated",
  496. {
  497. "archive_id": archive_id,
  498. "print_name": print_name,
  499. "status": status,
  500. "timestamp": datetime.utcnow().isoformat(),
  501. },
  502. )
  503. # =========================================================================
  504. # Filament/Spoolman Events
  505. # =========================================================================
  506. async def on_filament_low(
  507. self,
  508. spool_id: int,
  509. spool_name: str,
  510. remaining_weight: float,
  511. remaining_percent: float,
  512. ):
  513. """Publish filament inventory low event."""
  514. if not self.enabled or not self.connected:
  515. return
  516. self._publish(
  517. f"{self.topic_prefix}/filament/low",
  518. {
  519. "spool_id": spool_id,
  520. "spool_name": spool_name,
  521. "remaining_weight": remaining_weight,
  522. "remaining_percent": remaining_percent,
  523. "timestamp": datetime.utcnow().isoformat(),
  524. },
  525. )
  526. # =========================================================================
  527. # Smart Plug Events
  528. # =========================================================================
  529. async def on_smart_plug_state(
  530. self,
  531. plug_id: int,
  532. plug_name: str,
  533. state: str,
  534. printer_id: int | None = None,
  535. printer_name: str | None = None,
  536. ):
  537. """Publish smart plug state change event."""
  538. if not self.enabled or not self.connected:
  539. return
  540. topic = f"{self.topic_prefix}/smart_plugs/on" if state == "on" else f"{self.topic_prefix}/smart_plugs/off"
  541. self._publish(
  542. topic,
  543. {
  544. "plug_id": plug_id,
  545. "plug_name": plug_name,
  546. "state": state,
  547. "printer_id": printer_id,
  548. "printer_name": printer_name,
  549. "timestamp": datetime.utcnow().isoformat(),
  550. },
  551. )
  552. async def on_smart_plug_energy(
  553. self,
  554. plug_id: int,
  555. plug_name: str,
  556. power: float,
  557. energy_today: float,
  558. energy_total: float,
  559. ):
  560. """Publish smart plug energy update event."""
  561. if not self.enabled or not self.connected:
  562. return
  563. self._publish(
  564. f"{self.topic_prefix}/smart_plugs/energy",
  565. {
  566. "plug_id": plug_id,
  567. "plug_name": plug_name,
  568. "power_watts": power,
  569. "energy_today_kwh": energy_today,
  570. "energy_total_kwh": energy_total,
  571. "timestamp": datetime.utcnow().isoformat(),
  572. },
  573. )
  574. # Global instance
  575. mqtt_relay = MQTTRelayService()