mqtt_relay.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692
  1. """MQTT Relay Service for publishing BamBuddy events to external MQTT brokers.
  2. This service enables integration with external automation systems like
  3. Node-RED, Home Assistant, and other MQTT-based platforms.
  4. """
  5. import asyncio
  6. import json
  7. import logging
  8. import ssl
  9. import threading
  10. import time
  11. from datetime import datetime, timezone
  12. from typing import Any
  13. import paho.mqtt.client as mqtt
  14. logger = logging.getLogger(__name__)
  15. class MQTTRelayService:
  16. """Publishes BamBuddy events to an external MQTT broker."""
  17. # Minimum interval between status updates per printer (seconds)
  18. STATUS_THROTTLE_SECONDS = 1.0
  19. def __init__(self):
  20. self.client: mqtt.Client | None = None
  21. self.enabled = False
  22. self.connected = False
  23. self.topic_prefix = "bambuddy"
  24. self._lock = threading.Lock()
  25. self._loop: asyncio.AbstractEventLoop | None = None
  26. self._broker = ""
  27. self._port = 1883
  28. self._last_printer_status: dict[int, float] = {} # printer_id -> last publish timestamp
  29. self._smart_plug_service = None # Lazy import to avoid circular dependency
  30. self._settings: dict = {} # Store settings for smart plug service
  31. self._disconnection_event: threading.Event | None = None
  32. async def configure(self, settings: dict) -> bool:
  33. """Configure MQTT connection from settings.
  34. Returns True if connection was successful or MQTT is disabled.
  35. """
  36. self.enabled = settings.get("mqtt_enabled", False)
  37. self._settings = settings # Store for smart plug service
  38. if not self.enabled:
  39. await self.disconnect()
  40. # Also configure smart plug service (will disable it)
  41. await self._configure_smart_plug_service(settings)
  42. logger.info("MQTT relay disabled")
  43. return True
  44. broker = settings.get("mqtt_broker", "")
  45. port = settings.get("mqtt_port", 1883)
  46. username = settings.get("mqtt_username", "")
  47. password = settings.get("mqtt_password", "")
  48. self.topic_prefix = settings.get("mqtt_topic_prefix", "bambuddy")
  49. use_tls = settings.get("mqtt_use_tls", False)
  50. if not broker:
  51. logger.warning("MQTT enabled but no broker configured")
  52. return False
  53. # Store for status endpoint
  54. self._broker = broker
  55. self._port = port
  56. # Disconnect existing connection if settings changed
  57. if self.client:
  58. await self.disconnect()
  59. # Create and connect client
  60. result = await self._connect(broker, port, username, password, use_tls)
  61. # Configure smart plug service with same settings
  62. await self._configure_smart_plug_service(settings)
  63. return result
  64. async def _configure_smart_plug_service(self, settings: dict):
  65. """Configure the MQTT smart plug service with the same broker settings."""
  66. try:
  67. if self._smart_plug_service is None:
  68. from backend.app.services.mqtt_smart_plug import mqtt_smart_plug_service
  69. self._smart_plug_service = mqtt_smart_plug_service
  70. await self._smart_plug_service.configure(settings)
  71. except Exception as e:
  72. logger.error("Failed to configure MQTT smart plug service: %s", e)
  73. @property
  74. def smart_plug_service(self):
  75. """Get the MQTT smart plug service instance."""
  76. if self._smart_plug_service is None:
  77. from backend.app.services.mqtt_smart_plug import mqtt_smart_plug_service
  78. self._smart_plug_service = mqtt_smart_plug_service
  79. return self._smart_plug_service
  80. async def _connect(self, broker: str, port: int, username: str, password: str, use_tls: bool) -> bool:
  81. """Establish MQTT connection."""
  82. try:
  83. # Create client with callback API version 2 (use MQTTv311 for broader compatibility)
  84. self.client = mqtt.Client(
  85. callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
  86. client_id=f"bambuddy-{id(self)}",
  87. protocol=mqtt.MQTTv311,
  88. )
  89. # Set up callbacks
  90. self.client.on_connect = self._on_connect
  91. self.client.on_disconnect = self._on_disconnect
  92. # Configure authentication
  93. if username:
  94. self.client.username_pw_set(username, password)
  95. # Configure TLS (allow self-signed certs for testing)
  96. if use_tls:
  97. self.client.tls_set(cert_reqs=ssl.CERT_NONE)
  98. self.client.tls_insecure_set(True) # Allow self-signed certs
  99. # Run connect_async in thread pool with timeout to avoid blocking
  100. # on unreachable brokers (connect_async does synchronous socket creation)
  101. try:
  102. await asyncio.wait_for(asyncio.to_thread(self.client.connect_async, broker, port, 60), timeout=3.0)
  103. except TimeoutError:
  104. logger.warning("MQTT relay connection to %s:%s timed out", broker, port)
  105. return False
  106. self.client.loop_start()
  107. # Wait briefly for connection callback
  108. await asyncio.sleep(1.0)
  109. if self.connected:
  110. logger.info("MQTT relay connected to %s:%s", broker, port)
  111. # Publish online status
  112. self._publish_status("online")
  113. return True
  114. else:
  115. logger.warning("MQTT relay connection pending to %s:%s", broker, port)
  116. return True # Connection is async, may succeed later
  117. except Exception as e:
  118. logger.error("MQTT relay connection failed: %s", e)
  119. self.connected = False
  120. return False
  121. def _on_connect(
  122. self,
  123. client: mqtt.Client,
  124. userdata: Any,
  125. flags: dict,
  126. reason_code: int | mqtt.ReasonCode,
  127. properties: mqtt.Properties | None = None,
  128. ):
  129. """Callback when connected to broker."""
  130. # Handle both MQTTv311 (int) and MQTTv5 (ReasonCode) return codes
  131. rc = reason_code if isinstance(reason_code, int) else reason_code.value
  132. if rc == 0:
  133. self.connected = True
  134. logger.info("MQTT relay connected successfully")
  135. # Publish online status
  136. self._publish_status("online")
  137. else:
  138. self.connected = False
  139. logger.error("MQTT relay connection failed: %s", reason_code)
  140. def _on_disconnect(
  141. self,
  142. client: mqtt.Client,
  143. userdata: Any,
  144. flags_or_rc: dict | int | mqtt.ReasonCode,
  145. reason_code: int | mqtt.ReasonCode | None = None,
  146. properties: mqtt.Properties | None = None,
  147. ):
  148. """Callback when disconnected from broker."""
  149. self.connected = False
  150. # Handle both MQTTv311 (rc as 3rd param) and MQTTv5 (flags, rc, props)
  151. rc = reason_code if reason_code is not None else flags_or_rc
  152. rc_val = rc if isinstance(rc, int) else getattr(rc, "value", 0)
  153. if rc_val != 0:
  154. logger.warning("MQTT relay disconnected: %s", rc)
  155. else:
  156. logger.info("MQTT relay disconnected cleanly")
  157. if self._disconnection_event:
  158. self._disconnection_event.set()
  159. async def disconnect(self, timeout: float = 0):
  160. """Disconnect from MQTT broker."""
  161. if self.client:
  162. try:
  163. # Publish offline status before disconnecting
  164. self._publish_status("offline")
  165. self._disconnection_event = threading.Event()
  166. self.client.disconnect()
  167. await asyncio.to_thread(self._disconnection_event.wait, timeout=timeout)
  168. self.client.loop_stop()
  169. except Exception as e:
  170. logger.debug("MQTT disconnect error (ignored): %s", e)
  171. finally:
  172. self.client = None
  173. self.connected = False
  174. def _publish_status(self, status: str):
  175. """Publish BamBuddy status (online/offline)."""
  176. self._publish(
  177. f"{self.topic_prefix}/status",
  178. {"status": status, "timestamp": datetime.now(timezone.utc).isoformat()},
  179. retain=True,
  180. )
  181. def _publish(self, topic: str, payload: dict, retain: bool = False):
  182. """Publish message to MQTT broker."""
  183. if not self.client or not self.connected:
  184. return
  185. try:
  186. with self._lock:
  187. self.client.publish(topic, json.dumps(payload, default=str), qos=1, retain=retain)
  188. except Exception as e:
  189. logger.debug("MQTT publish error: %s", e)
  190. def get_status(self) -> dict:
  191. """Get current MQTT relay status for API."""
  192. return {
  193. "enabled": self.enabled,
  194. "connected": self.connected,
  195. "broker": self._broker if self.enabled else "",
  196. "port": self._port if self.enabled else 0,
  197. "topic_prefix": self.topic_prefix,
  198. }
  199. # =========================================================================
  200. # Printer Events
  201. # =========================================================================
  202. async def on_printer_status(self, printer_id: int, state: Any, printer_name: str, printer_serial: str):
  203. """Publish printer status change (throttled to 1 update/sec per printer)."""
  204. if not self.enabled or not self.connected:
  205. return
  206. # Throttle status updates to avoid flooding MQTT broker
  207. now = time.time()
  208. last_publish = self._last_printer_status.get(printer_id, 0)
  209. if now - last_publish < self.STATUS_THROTTLE_SECONDS:
  210. return # Skip this update, too soon since last publish
  211. self._last_printer_status[printer_id] = now
  212. # Build status payload from PrinterState
  213. payload = {
  214. "printer_id": printer_id,
  215. "printer_name": printer_name,
  216. "printer_serial": printer_serial,
  217. "timestamp": datetime.now(timezone.utc).isoformat(),
  218. "connected": state.connected,
  219. "state": state.state,
  220. "progress": state.progress,
  221. "remaining_time": state.remaining_time,
  222. "layer_num": state.layer_num,
  223. "total_layers": state.total_layers,
  224. "current_print": state.current_print,
  225. "subtask_name": state.subtask_name,
  226. "gcode_file": state.gcode_file,
  227. "temperatures": state.temperatures,
  228. "wifi_signal": state.wifi_signal,
  229. "chamber_light": state.chamber_light,
  230. "speed_level": state.speed_level,
  231. "cooling_fan_speed": state.cooling_fan_speed,
  232. "big_fan1_speed": state.big_fan1_speed,
  233. "big_fan2_speed": state.big_fan2_speed,
  234. "heatbreak_fan_speed": state.heatbreak_fan_speed,
  235. }
  236. self._publish(
  237. f"{self.topic_prefix}/printers/{printer_serial}/status",
  238. payload,
  239. retain=True,
  240. )
  241. async def on_printer_online(self, printer_id: int, printer_name: str, printer_serial: str):
  242. """Publish printer came online event."""
  243. if not self.enabled or not self.connected:
  244. return
  245. self._publish(
  246. f"{self.topic_prefix}/printers/{printer_serial}/online",
  247. {
  248. "printer_id": printer_id,
  249. "printer_name": printer_name,
  250. "printer_serial": printer_serial,
  251. "timestamp": datetime.now(timezone.utc).isoformat(),
  252. },
  253. )
  254. async def on_printer_offline(self, printer_id: int, printer_name: str, printer_serial: str):
  255. """Publish printer went offline event."""
  256. if not self.enabled or not self.connected:
  257. return
  258. self._publish(
  259. f"{self.topic_prefix}/printers/{printer_serial}/offline",
  260. {
  261. "printer_id": printer_id,
  262. "printer_name": printer_name,
  263. "printer_serial": printer_serial,
  264. "timestamp": datetime.now(timezone.utc).isoformat(),
  265. },
  266. )
  267. async def on_print_start(
  268. self,
  269. printer_id: int,
  270. printer_name: str,
  271. printer_serial: str,
  272. filename: str,
  273. subtask_name: str,
  274. ):
  275. """Publish print started event."""
  276. if not self.enabled or not self.connected:
  277. return
  278. self._publish(
  279. f"{self.topic_prefix}/printers/{printer_serial}/print/started",
  280. {
  281. "printer_id": printer_id,
  282. "printer_name": printer_name,
  283. "printer_serial": printer_serial,
  284. "filename": filename,
  285. "subtask_name": subtask_name,
  286. "timestamp": datetime.now(timezone.utc).isoformat(),
  287. },
  288. )
  289. async def on_print_complete(
  290. self,
  291. printer_id: int,
  292. printer_name: str,
  293. printer_serial: str,
  294. filename: str,
  295. subtask_name: str,
  296. status: str,
  297. ):
  298. """Publish print completed event."""
  299. if not self.enabled or not self.connected:
  300. return
  301. # Determine topic based on status
  302. if status == "completed":
  303. topic = f"{self.topic_prefix}/printers/{printer_serial}/print/completed"
  304. else:
  305. topic = f"{self.topic_prefix}/printers/{printer_serial}/print/failed"
  306. self._publish(
  307. topic,
  308. {
  309. "printer_id": printer_id,
  310. "printer_name": printer_name,
  311. "printer_serial": printer_serial,
  312. "filename": filename,
  313. "subtask_name": subtask_name,
  314. "status": status,
  315. "timestamp": datetime.now(timezone.utc).isoformat(),
  316. },
  317. )
  318. async def on_ams_change(
  319. self,
  320. printer_id: int,
  321. printer_name: str,
  322. printer_serial: str,
  323. ams_data: list,
  324. ):
  325. """Publish AMS filament change event."""
  326. if not self.enabled or not self.connected:
  327. return
  328. self._publish(
  329. f"{self.topic_prefix}/printers/{printer_serial}/ams/changed",
  330. {
  331. "printer_id": printer_id,
  332. "printer_name": printer_name,
  333. "printer_serial": printer_serial,
  334. "ams_units": ams_data,
  335. "timestamp": datetime.now(timezone.utc).isoformat(),
  336. },
  337. )
  338. async def on_printer_error(
  339. self,
  340. printer_id: int,
  341. printer_name: str,
  342. printer_serial: str,
  343. errors: list,
  344. ):
  345. """Publish printer HMS error event."""
  346. if not self.enabled or not self.connected:
  347. return
  348. self._publish(
  349. f"{self.topic_prefix}/printers/{printer_serial}/error",
  350. {
  351. "printer_id": printer_id,
  352. "printer_name": printer_name,
  353. "printer_serial": printer_serial,
  354. "errors": errors,
  355. "timestamp": datetime.now(timezone.utc).isoformat(),
  356. },
  357. )
  358. # =========================================================================
  359. # Print Queue Events
  360. # =========================================================================
  361. async def on_queue_job_added(
  362. self,
  363. job_id: int,
  364. filename: str,
  365. printer_id: int | None,
  366. printer_name: str | None,
  367. ):
  368. """Publish job added to queue event."""
  369. if not self.enabled or not self.connected:
  370. return
  371. self._publish(
  372. f"{self.topic_prefix}/queue/job_added",
  373. {
  374. "job_id": job_id,
  375. "filename": filename,
  376. "printer_id": printer_id,
  377. "printer_name": printer_name,
  378. "timestamp": datetime.now(timezone.utc).isoformat(),
  379. },
  380. )
  381. async def on_queue_job_started(
  382. self,
  383. job_id: int,
  384. filename: str,
  385. printer_id: int,
  386. printer_name: str,
  387. printer_serial: str,
  388. ):
  389. """Publish queued job started printing event."""
  390. if not self.enabled or not self.connected:
  391. return
  392. self._publish(
  393. f"{self.topic_prefix}/queue/job_started",
  394. {
  395. "job_id": job_id,
  396. "filename": filename,
  397. "printer_id": printer_id,
  398. "printer_name": printer_name,
  399. "printer_serial": printer_serial,
  400. "timestamp": datetime.now(timezone.utc).isoformat(),
  401. },
  402. )
  403. async def on_queue_job_completed(
  404. self,
  405. job_id: int,
  406. filename: str,
  407. printer_id: int,
  408. printer_name: str,
  409. status: str,
  410. ):
  411. """Publish queued job finished event."""
  412. if not self.enabled or not self.connected:
  413. return
  414. topic = (
  415. f"{self.topic_prefix}/queue/job_completed"
  416. if status == "completed"
  417. else f"{self.topic_prefix}/queue/job_failed"
  418. )
  419. self._publish(
  420. topic,
  421. {
  422. "job_id": job_id,
  423. "filename": filename,
  424. "printer_id": printer_id,
  425. "printer_name": printer_name,
  426. "status": status,
  427. "timestamp": datetime.now(timezone.utc).isoformat(),
  428. },
  429. )
  430. # =========================================================================
  431. # Maintenance Events
  432. # =========================================================================
  433. async def on_maintenance_alert(
  434. self,
  435. printer_id: int,
  436. printer_name: str,
  437. maintenance_type: str,
  438. current_value: float,
  439. threshold: float,
  440. ):
  441. """Publish maintenance alert triggered event."""
  442. if not self.enabled or not self.connected:
  443. return
  444. self._publish(
  445. f"{self.topic_prefix}/maintenance/alert",
  446. {
  447. "printer_id": printer_id,
  448. "printer_name": printer_name,
  449. "maintenance_type": maintenance_type,
  450. "current_value": current_value,
  451. "threshold": threshold,
  452. "timestamp": datetime.now(timezone.utc).isoformat(),
  453. },
  454. )
  455. async def on_maintenance_acknowledged(
  456. self,
  457. printer_id: int,
  458. printer_name: str,
  459. maintenance_type: str,
  460. ):
  461. """Publish maintenance alert acknowledged event."""
  462. if not self.enabled or not self.connected:
  463. return
  464. self._publish(
  465. f"{self.topic_prefix}/maintenance/acknowledged",
  466. {
  467. "printer_id": printer_id,
  468. "printer_name": printer_name,
  469. "maintenance_type": maintenance_type,
  470. "timestamp": datetime.now(timezone.utc).isoformat(),
  471. },
  472. )
  473. async def on_maintenance_reset(
  474. self,
  475. printer_id: int,
  476. printer_name: str,
  477. maintenance_type: str,
  478. ):
  479. """Publish maintenance counter reset event."""
  480. if not self.enabled or not self.connected:
  481. return
  482. self._publish(
  483. f"{self.topic_prefix}/maintenance/reset",
  484. {
  485. "printer_id": printer_id,
  486. "printer_name": printer_name,
  487. "maintenance_type": maintenance_type,
  488. "timestamp": datetime.now(timezone.utc).isoformat(),
  489. },
  490. )
  491. # =========================================================================
  492. # Archive Events
  493. # =========================================================================
  494. async def on_archive_created(
  495. self,
  496. archive_id: int,
  497. print_name: str,
  498. printer_name: str,
  499. status: str,
  500. ):
  501. """Publish print archived event."""
  502. if not self.enabled or not self.connected:
  503. return
  504. self._publish(
  505. f"{self.topic_prefix}/archive/created",
  506. {
  507. "archive_id": archive_id,
  508. "print_name": print_name,
  509. "printer_name": printer_name,
  510. "status": status,
  511. "timestamp": datetime.now(timezone.utc).isoformat(),
  512. },
  513. )
  514. async def on_archive_updated(
  515. self,
  516. archive_id: int,
  517. print_name: str,
  518. status: str,
  519. ):
  520. """Publish archive record updated event."""
  521. if not self.enabled or not self.connected:
  522. return
  523. self._publish(
  524. f"{self.topic_prefix}/archive/updated",
  525. {
  526. "archive_id": archive_id,
  527. "print_name": print_name,
  528. "status": status,
  529. "timestamp": datetime.now(timezone.utc).isoformat(),
  530. },
  531. )
  532. # =========================================================================
  533. # Filament/Spoolman Events
  534. # =========================================================================
  535. async def on_filament_low(
  536. self,
  537. spool_id: int,
  538. spool_name: str,
  539. remaining_weight: float,
  540. remaining_percent: float,
  541. ):
  542. """Publish filament inventory low event."""
  543. if not self.enabled or not self.connected:
  544. return
  545. self._publish(
  546. f"{self.topic_prefix}/filament/low",
  547. {
  548. "spool_id": spool_id,
  549. "spool_name": spool_name,
  550. "remaining_weight": remaining_weight,
  551. "remaining_percent": remaining_percent,
  552. "timestamp": datetime.now(timezone.utc).isoformat(),
  553. },
  554. )
  555. # =========================================================================
  556. # Smart Plug Events
  557. # =========================================================================
  558. async def on_smart_plug_state(
  559. self,
  560. plug_id: int,
  561. plug_name: str,
  562. state: str,
  563. printer_id: int | None = None,
  564. printer_name: str | None = None,
  565. ):
  566. """Publish smart plug state change event."""
  567. if not self.enabled or not self.connected:
  568. return
  569. topic = f"{self.topic_prefix}/smart_plugs/on" if state == "on" else f"{self.topic_prefix}/smart_plugs/off"
  570. self._publish(
  571. topic,
  572. {
  573. "plug_id": plug_id,
  574. "plug_name": plug_name,
  575. "state": state,
  576. "printer_id": printer_id,
  577. "printer_name": printer_name,
  578. "timestamp": datetime.now(timezone.utc).isoformat(),
  579. },
  580. )
  581. async def on_smart_plug_energy(
  582. self,
  583. plug_id: int,
  584. plug_name: str,
  585. power: float,
  586. energy_today: float,
  587. energy_total: float,
  588. ):
  589. """Publish smart plug energy update event."""
  590. if not self.enabled or not self.connected:
  591. return
  592. self._publish(
  593. f"{self.topic_prefix}/smart_plugs/energy",
  594. {
  595. "plug_id": plug_id,
  596. "plug_name": plug_name,
  597. "power_watts": power,
  598. "energy_today_kwh": energy_today,
  599. "energy_total_kwh": energy_total,
  600. "timestamp": datetime.now(timezone.utc).isoformat(),
  601. },
  602. )
  603. # Global instance
  604. mqtt_relay = MQTTRelayService()