mqtt_relay.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687
"""MQTT Relay Service for publishing BamBuddy events to external MQTT brokers.

This service enables integration with external automation systems like
Node-RED, Home Assistant, and other MQTT-based platforms.
"""
import asyncio
import json
import logging
import ssl
import threading
import time
from datetime import datetime, timezone
from typing import Any

import paho.mqtt.client as mqtt
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
  15. class MQTTRelayService:
  16. """Publishes BamBuddy events to an external MQTT broker."""
  17. # Minimum interval between status updates per printer (seconds)
  18. STATUS_THROTTLE_SECONDS = 1.0
  19. def __init__(self):
  20. self.client: mqtt.Client | None = None
  21. self.enabled = False
  22. self.connected = False
  23. self.topic_prefix = "bambuddy"
  24. self._lock = threading.Lock()
  25. self._loop: asyncio.AbstractEventLoop | None = None
  26. self._broker = ""
  27. self._port = 1883
  28. self._last_printer_status: dict[int, float] = {} # printer_id -> last publish timestamp
  29. self._smart_plug_service = None # Lazy import to avoid circular dependency
  30. self._settings: dict = {} # Store settings for smart plug service
  31. async def configure(self, settings: dict) -> bool:
  32. """Configure MQTT connection from settings.
  33. Returns True if connection was successful or MQTT is disabled.
  34. """
  35. self.enabled = settings.get("mqtt_enabled", False)
  36. self._settings = settings # Store for smart plug service
  37. if not self.enabled:
  38. await self.disconnect()
  39. # Also configure smart plug service (will disable it)
  40. await self._configure_smart_plug_service(settings)
  41. logger.info("MQTT relay disabled")
  42. return True
  43. broker = settings.get("mqtt_broker", "")
  44. port = settings.get("mqtt_port", 1883)
  45. username = settings.get("mqtt_username", "")
  46. password = settings.get("mqtt_password", "")
  47. self.topic_prefix = settings.get("mqtt_topic_prefix", "bambuddy")
  48. use_tls = settings.get("mqtt_use_tls", False)
  49. if not broker:
  50. logger.warning("MQTT enabled but no broker configured")
  51. return False
  52. # Store for status endpoint
  53. self._broker = broker
  54. self._port = port
  55. # Disconnect existing connection if settings changed
  56. if self.client:
  57. await self.disconnect()
  58. # Create and connect client
  59. result = await self._connect(broker, port, username, password, use_tls)
  60. # Configure smart plug service with same settings
  61. await self._configure_smart_plug_service(settings)
  62. return result
  63. async def _configure_smart_plug_service(self, settings: dict):
  64. """Configure the MQTT smart plug service with the same broker settings."""
  65. try:
  66. if self._smart_plug_service is None:
  67. from backend.app.services.mqtt_smart_plug import mqtt_smart_plug_service
  68. self._smart_plug_service = mqtt_smart_plug_service
  69. await self._smart_plug_service.configure(settings)
  70. except Exception as e:
  71. logger.error("Failed to configure MQTT smart plug service: %s", e)
  72. @property
  73. def smart_plug_service(self):
  74. """Get the MQTT smart plug service instance."""
  75. if self._smart_plug_service is None:
  76. from backend.app.services.mqtt_smart_plug import mqtt_smart_plug_service
  77. self._smart_plug_service = mqtt_smart_plug_service
  78. return self._smart_plug_service
  79. async def _connect(self, broker: str, port: int, username: str, password: str, use_tls: bool) -> bool:
  80. """Establish MQTT connection."""
  81. try:
  82. # Create client with callback API version 2 (use MQTTv311 for broader compatibility)
  83. self.client = mqtt.Client(
  84. callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
  85. client_id=f"bambuddy-{id(self)}",
  86. protocol=mqtt.MQTTv311,
  87. )
  88. # Set up callbacks
  89. self.client.on_connect = self._on_connect
  90. self.client.on_disconnect = self._on_disconnect
  91. # Configure authentication
  92. if username:
  93. self.client.username_pw_set(username, password)
  94. # Configure TLS (allow self-signed certs for testing)
  95. if use_tls:
  96. self.client.tls_set(cert_reqs=ssl.CERT_NONE)
  97. self.client.tls_insecure_set(True) # Allow self-signed certs
  98. # Run connect_async in thread pool with timeout to avoid blocking
  99. # on unreachable brokers (connect_async does synchronous socket creation)
  100. try:
  101. await asyncio.wait_for(asyncio.to_thread(self.client.connect_async, broker, port, 60), timeout=3.0)
  102. except TimeoutError:
  103. logger.warning("MQTT relay connection to %s:%s timed out", broker, port)
  104. return False
  105. self.client.loop_start()
  106. # Wait briefly for connection callback
  107. await asyncio.sleep(1.0)
  108. if self.connected:
  109. logger.info("MQTT relay connected to %s:%s", broker, port)
  110. # Publish online status
  111. self._publish_status("online")
  112. return True
  113. else:
  114. logger.warning("MQTT relay connection pending to %s:%s", broker, port)
  115. return True # Connection is async, may succeed later
  116. except Exception as e:
  117. logger.error("MQTT relay connection failed: %s", e)
  118. self.connected = False
  119. return False
  120. def _on_connect(
  121. self,
  122. client: mqtt.Client,
  123. userdata: Any,
  124. flags: dict,
  125. reason_code: int | mqtt.ReasonCode,
  126. properties: mqtt.Properties | None = None,
  127. ):
  128. """Callback when connected to broker."""
  129. # Handle both MQTTv311 (int) and MQTTv5 (ReasonCode) return codes
  130. rc = reason_code if isinstance(reason_code, int) else reason_code.value
  131. if rc == 0:
  132. self.connected = True
  133. logger.info("MQTT relay connected successfully")
  134. # Publish online status
  135. self._publish_status("online")
  136. else:
  137. self.connected = False
  138. logger.error("MQTT relay connection failed: %s", reason_code)
  139. def _on_disconnect(
  140. self,
  141. client: mqtt.Client,
  142. userdata: Any,
  143. flags_or_rc: dict | int | mqtt.ReasonCode,
  144. reason_code: int | mqtt.ReasonCode | None = None,
  145. properties: mqtt.Properties | None = None,
  146. ):
  147. """Callback when disconnected from broker."""
  148. self.connected = False
  149. # Handle both MQTTv311 (rc as 3rd param) and MQTTv5 (flags, rc, props)
  150. rc = reason_code if reason_code is not None else flags_or_rc
  151. rc_val = rc if isinstance(rc, int) else getattr(rc, "value", 0)
  152. if rc_val != 0:
  153. logger.warning("MQTT relay disconnected: %s", rc)
  154. else:
  155. logger.info("MQTT relay disconnected cleanly")
  156. async def disconnect(self):
  157. """Disconnect from MQTT broker."""
  158. if self.client:
  159. try:
  160. # Publish offline status before disconnecting
  161. self._publish_status("offline")
  162. self.client.loop_stop()
  163. self.client.disconnect()
  164. except Exception as e:
  165. logger.debug("MQTT disconnect error (ignored): %s", e)
  166. finally:
  167. self.client = None
  168. self.connected = False
  169. def _publish_status(self, status: str):
  170. """Publish BamBuddy status (online/offline)."""
  171. self._publish(
  172. f"{self.topic_prefix}/status",
  173. {"status": status, "timestamp": datetime.utcnow().isoformat()},
  174. retain=True,
  175. )
  176. def _publish(self, topic: str, payload: dict, retain: bool = False):
  177. """Publish message to MQTT broker."""
  178. if not self.client or not self.connected:
  179. return
  180. try:
  181. with self._lock:
  182. self.client.publish(topic, json.dumps(payload, default=str), qos=1, retain=retain)
  183. except Exception as e:
  184. logger.debug("MQTT publish error: %s", e)
  185. def get_status(self) -> dict:
  186. """Get current MQTT relay status for API."""
  187. return {
  188. "enabled": self.enabled,
  189. "connected": self.connected,
  190. "broker": self._broker if self.enabled else "",
  191. "port": self._port if self.enabled else 0,
  192. "topic_prefix": self.topic_prefix,
  193. }
  194. # =========================================================================
  195. # Printer Events
  196. # =========================================================================
  197. async def on_printer_status(self, printer_id: int, state: Any, printer_name: str, printer_serial: str):
  198. """Publish printer status change (throttled to 1 update/sec per printer)."""
  199. if not self.enabled or not self.connected:
  200. return
  201. # Throttle status updates to avoid flooding MQTT broker
  202. now = time.time()
  203. last_publish = self._last_printer_status.get(printer_id, 0)
  204. if now - last_publish < self.STATUS_THROTTLE_SECONDS:
  205. return # Skip this update, too soon since last publish
  206. self._last_printer_status[printer_id] = now
  207. # Build status payload from PrinterState
  208. payload = {
  209. "printer_id": printer_id,
  210. "printer_name": printer_name,
  211. "printer_serial": printer_serial,
  212. "timestamp": datetime.utcnow().isoformat(),
  213. "connected": state.connected,
  214. "state": state.state,
  215. "progress": state.progress,
  216. "remaining_time": state.remaining_time,
  217. "layer_num": state.layer_num,
  218. "total_layers": state.total_layers,
  219. "current_print": state.current_print,
  220. "subtask_name": state.subtask_name,
  221. "gcode_file": state.gcode_file,
  222. "temperatures": state.temperatures,
  223. "wifi_signal": state.wifi_signal,
  224. "chamber_light": state.chamber_light,
  225. "speed_level": state.speed_level,
  226. "cooling_fan_speed": state.cooling_fan_speed,
  227. "big_fan1_speed": state.big_fan1_speed,
  228. "big_fan2_speed": state.big_fan2_speed,
  229. "heatbreak_fan_speed": state.heatbreak_fan_speed,
  230. }
  231. self._publish(
  232. f"{self.topic_prefix}/printers/{printer_serial}/status",
  233. payload,
  234. retain=True,
  235. )
  236. async def on_printer_online(self, printer_id: int, printer_name: str, printer_serial: str):
  237. """Publish printer came online event."""
  238. if not self.enabled or not self.connected:
  239. return
  240. self._publish(
  241. f"{self.topic_prefix}/printers/{printer_serial}/online",
  242. {
  243. "printer_id": printer_id,
  244. "printer_name": printer_name,
  245. "printer_serial": printer_serial,
  246. "timestamp": datetime.utcnow().isoformat(),
  247. },
  248. )
  249. async def on_printer_offline(self, printer_id: int, printer_name: str, printer_serial: str):
  250. """Publish printer went offline event."""
  251. if not self.enabled or not self.connected:
  252. return
  253. self._publish(
  254. f"{self.topic_prefix}/printers/{printer_serial}/offline",
  255. {
  256. "printer_id": printer_id,
  257. "printer_name": printer_name,
  258. "printer_serial": printer_serial,
  259. "timestamp": datetime.utcnow().isoformat(),
  260. },
  261. )
  262. async def on_print_start(
  263. self,
  264. printer_id: int,
  265. printer_name: str,
  266. printer_serial: str,
  267. filename: str,
  268. subtask_name: str,
  269. ):
  270. """Publish print started event."""
  271. if not self.enabled or not self.connected:
  272. return
  273. self._publish(
  274. f"{self.topic_prefix}/printers/{printer_serial}/print/started",
  275. {
  276. "printer_id": printer_id,
  277. "printer_name": printer_name,
  278. "printer_serial": printer_serial,
  279. "filename": filename,
  280. "subtask_name": subtask_name,
  281. "timestamp": datetime.utcnow().isoformat(),
  282. },
  283. )
  284. async def on_print_complete(
  285. self,
  286. printer_id: int,
  287. printer_name: str,
  288. printer_serial: str,
  289. filename: str,
  290. subtask_name: str,
  291. status: str,
  292. ):
  293. """Publish print completed event."""
  294. if not self.enabled or not self.connected:
  295. return
  296. # Determine topic based on status
  297. if status == "completed":
  298. topic = f"{self.topic_prefix}/printers/{printer_serial}/print/completed"
  299. else:
  300. topic = f"{self.topic_prefix}/printers/{printer_serial}/print/failed"
  301. self._publish(
  302. topic,
  303. {
  304. "printer_id": printer_id,
  305. "printer_name": printer_name,
  306. "printer_serial": printer_serial,
  307. "filename": filename,
  308. "subtask_name": subtask_name,
  309. "status": status,
  310. "timestamp": datetime.utcnow().isoformat(),
  311. },
  312. )
  313. async def on_ams_change(
  314. self,
  315. printer_id: int,
  316. printer_name: str,
  317. printer_serial: str,
  318. ams_data: list,
  319. ):
  320. """Publish AMS filament change event."""
  321. if not self.enabled or not self.connected:
  322. return
  323. self._publish(
  324. f"{self.topic_prefix}/printers/{printer_serial}/ams/changed",
  325. {
  326. "printer_id": printer_id,
  327. "printer_name": printer_name,
  328. "printer_serial": printer_serial,
  329. "ams_units": ams_data,
  330. "timestamp": datetime.utcnow().isoformat(),
  331. },
  332. )
  333. async def on_printer_error(
  334. self,
  335. printer_id: int,
  336. printer_name: str,
  337. printer_serial: str,
  338. errors: list,
  339. ):
  340. """Publish printer HMS error event."""
  341. if not self.enabled or not self.connected:
  342. return
  343. self._publish(
  344. f"{self.topic_prefix}/printers/{printer_serial}/error",
  345. {
  346. "printer_id": printer_id,
  347. "printer_name": printer_name,
  348. "printer_serial": printer_serial,
  349. "errors": errors,
  350. "timestamp": datetime.utcnow().isoformat(),
  351. },
  352. )
  353. # =========================================================================
  354. # Print Queue Events
  355. # =========================================================================
  356. async def on_queue_job_added(
  357. self,
  358. job_id: int,
  359. filename: str,
  360. printer_id: int | None,
  361. printer_name: str | None,
  362. ):
  363. """Publish job added to queue event."""
  364. if not self.enabled or not self.connected:
  365. return
  366. self._publish(
  367. f"{self.topic_prefix}/queue/job_added",
  368. {
  369. "job_id": job_id,
  370. "filename": filename,
  371. "printer_id": printer_id,
  372. "printer_name": printer_name,
  373. "timestamp": datetime.utcnow().isoformat(),
  374. },
  375. )
  376. async def on_queue_job_started(
  377. self,
  378. job_id: int,
  379. filename: str,
  380. printer_id: int,
  381. printer_name: str,
  382. printer_serial: str,
  383. ):
  384. """Publish queued job started printing event."""
  385. if not self.enabled or not self.connected:
  386. return
  387. self._publish(
  388. f"{self.topic_prefix}/queue/job_started",
  389. {
  390. "job_id": job_id,
  391. "filename": filename,
  392. "printer_id": printer_id,
  393. "printer_name": printer_name,
  394. "printer_serial": printer_serial,
  395. "timestamp": datetime.utcnow().isoformat(),
  396. },
  397. )
  398. async def on_queue_job_completed(
  399. self,
  400. job_id: int,
  401. filename: str,
  402. printer_id: int,
  403. printer_name: str,
  404. status: str,
  405. ):
  406. """Publish queued job finished event."""
  407. if not self.enabled or not self.connected:
  408. return
  409. topic = (
  410. f"{self.topic_prefix}/queue/job_completed"
  411. if status == "completed"
  412. else f"{self.topic_prefix}/queue/job_failed"
  413. )
  414. self._publish(
  415. topic,
  416. {
  417. "job_id": job_id,
  418. "filename": filename,
  419. "printer_id": printer_id,
  420. "printer_name": printer_name,
  421. "status": status,
  422. "timestamp": datetime.utcnow().isoformat(),
  423. },
  424. )
  425. # =========================================================================
  426. # Maintenance Events
  427. # =========================================================================
  428. async def on_maintenance_alert(
  429. self,
  430. printer_id: int,
  431. printer_name: str,
  432. maintenance_type: str,
  433. current_value: float,
  434. threshold: float,
  435. ):
  436. """Publish maintenance alert triggered event."""
  437. if not self.enabled or not self.connected:
  438. return
  439. self._publish(
  440. f"{self.topic_prefix}/maintenance/alert",
  441. {
  442. "printer_id": printer_id,
  443. "printer_name": printer_name,
  444. "maintenance_type": maintenance_type,
  445. "current_value": current_value,
  446. "threshold": threshold,
  447. "timestamp": datetime.utcnow().isoformat(),
  448. },
  449. )
  450. async def on_maintenance_acknowledged(
  451. self,
  452. printer_id: int,
  453. printer_name: str,
  454. maintenance_type: str,
  455. ):
  456. """Publish maintenance alert acknowledged event."""
  457. if not self.enabled or not self.connected:
  458. return
  459. self._publish(
  460. f"{self.topic_prefix}/maintenance/acknowledged",
  461. {
  462. "printer_id": printer_id,
  463. "printer_name": printer_name,
  464. "maintenance_type": maintenance_type,
  465. "timestamp": datetime.utcnow().isoformat(),
  466. },
  467. )
  468. async def on_maintenance_reset(
  469. self,
  470. printer_id: int,
  471. printer_name: str,
  472. maintenance_type: str,
  473. ):
  474. """Publish maintenance counter reset event."""
  475. if not self.enabled or not self.connected:
  476. return
  477. self._publish(
  478. f"{self.topic_prefix}/maintenance/reset",
  479. {
  480. "printer_id": printer_id,
  481. "printer_name": printer_name,
  482. "maintenance_type": maintenance_type,
  483. "timestamp": datetime.utcnow().isoformat(),
  484. },
  485. )
  486. # =========================================================================
  487. # Archive Events
  488. # =========================================================================
  489. async def on_archive_created(
  490. self,
  491. archive_id: int,
  492. print_name: str,
  493. printer_name: str,
  494. status: str,
  495. ):
  496. """Publish print archived event."""
  497. if not self.enabled or not self.connected:
  498. return
  499. self._publish(
  500. f"{self.topic_prefix}/archive/created",
  501. {
  502. "archive_id": archive_id,
  503. "print_name": print_name,
  504. "printer_name": printer_name,
  505. "status": status,
  506. "timestamp": datetime.utcnow().isoformat(),
  507. },
  508. )
  509. async def on_archive_updated(
  510. self,
  511. archive_id: int,
  512. print_name: str,
  513. status: str,
  514. ):
  515. """Publish archive record updated event."""
  516. if not self.enabled or not self.connected:
  517. return
  518. self._publish(
  519. f"{self.topic_prefix}/archive/updated",
  520. {
  521. "archive_id": archive_id,
  522. "print_name": print_name,
  523. "status": status,
  524. "timestamp": datetime.utcnow().isoformat(),
  525. },
  526. )
  527. # =========================================================================
  528. # Filament/Spoolman Events
  529. # =========================================================================
  530. async def on_filament_low(
  531. self,
  532. spool_id: int,
  533. spool_name: str,
  534. remaining_weight: float,
  535. remaining_percent: float,
  536. ):
  537. """Publish filament inventory low event."""
  538. if not self.enabled or not self.connected:
  539. return
  540. self._publish(
  541. f"{self.topic_prefix}/filament/low",
  542. {
  543. "spool_id": spool_id,
  544. "spool_name": spool_name,
  545. "remaining_weight": remaining_weight,
  546. "remaining_percent": remaining_percent,
  547. "timestamp": datetime.utcnow().isoformat(),
  548. },
  549. )
  550. # =========================================================================
  551. # Smart Plug Events
  552. # =========================================================================
  553. async def on_smart_plug_state(
  554. self,
  555. plug_id: int,
  556. plug_name: str,
  557. state: str,
  558. printer_id: int | None = None,
  559. printer_name: str | None = None,
  560. ):
  561. """Publish smart plug state change event."""
  562. if not self.enabled or not self.connected:
  563. return
  564. topic = f"{self.topic_prefix}/smart_plugs/on" if state == "on" else f"{self.topic_prefix}/smart_plugs/off"
  565. self._publish(
  566. topic,
  567. {
  568. "plug_id": plug_id,
  569. "plug_name": plug_name,
  570. "state": state,
  571. "printer_id": printer_id,
  572. "printer_name": printer_name,
  573. "timestamp": datetime.utcnow().isoformat(),
  574. },
  575. )
  576. async def on_smart_plug_energy(
  577. self,
  578. plug_id: int,
  579. plug_name: str,
  580. power: float,
  581. energy_today: float,
  582. energy_total: float,
  583. ):
  584. """Publish smart plug energy update event."""
  585. if not self.enabled or not self.connected:
  586. return
  587. self._publish(
  588. f"{self.topic_prefix}/smart_plugs/energy",
  589. {
  590. "plug_id": plug_id,
  591. "plug_name": plug_name,
  592. "power_watts": power,
  593. "energy_today_kwh": energy_today,
  594. "energy_total_kwh": energy_total,
  595. "timestamp": datetime.utcnow().isoformat(),
  596. },
  597. )
# Global instance: a single MQTTRelayService created when this module is imported.
mqtt_relay = MQTTRelayService()