mqtt_relay.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650
  1. """MQTT Relay Service for publishing BamBuddy events to external MQTT brokers.
  2. This service enables integration with external automation systems like
  3. Node-RED, Home Assistant, and other MQTT-based platforms.
  4. """
  5. import asyncio
  6. import json
  7. import logging
  8. import ssl
  9. import threading
  10. import time
  11. from datetime import datetime
  12. from typing import Any
  13. import paho.mqtt.client as mqtt
  14. logger = logging.getLogger(__name__)
  15. class MQTTRelayService:
  16. """Publishes BamBuddy events to an external MQTT broker."""
  17. # Minimum interval between status updates per printer (seconds)
  18. STATUS_THROTTLE_SECONDS = 1.0
  19. def __init__(self):
  20. self.client: mqtt.Client | None = None
  21. self.enabled = False
  22. self.connected = False
  23. self.topic_prefix = "bambuddy"
  24. self._lock = threading.Lock()
  25. self._loop: asyncio.AbstractEventLoop | None = None
  26. self._broker = ""
  27. self._port = 1883
  28. self._last_printer_status: dict[int, float] = {} # printer_id -> last publish timestamp
  29. async def configure(self, settings: dict) -> bool:
  30. """Configure MQTT connection from settings.
  31. Returns True if connection was successful or MQTT is disabled.
  32. """
  33. self.enabled = settings.get("mqtt_enabled", False)
  34. if not self.enabled:
  35. await self.disconnect()
  36. logger.info("MQTT relay disabled")
  37. return True
  38. broker = settings.get("mqtt_broker", "")
  39. port = settings.get("mqtt_port", 1883)
  40. username = settings.get("mqtt_username", "")
  41. password = settings.get("mqtt_password", "")
  42. self.topic_prefix = settings.get("mqtt_topic_prefix", "bambuddy")
  43. use_tls = settings.get("mqtt_use_tls", False)
  44. if not broker:
  45. logger.warning("MQTT enabled but no broker configured")
  46. return False
  47. # Store for status endpoint
  48. self._broker = broker
  49. self._port = port
  50. # Disconnect existing connection if settings changed
  51. if self.client:
  52. await self.disconnect()
  53. # Create and connect client
  54. return await self._connect(broker, port, username, password, use_tls)
  55. async def _connect(self, broker: str, port: int, username: str, password: str, use_tls: bool) -> bool:
  56. """Establish MQTT connection."""
  57. try:
  58. # Create client with callback API version 2 (use MQTTv311 for broader compatibility)
  59. self.client = mqtt.Client(
  60. callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
  61. client_id=f"bambuddy-{id(self)}",
  62. protocol=mqtt.MQTTv311,
  63. )
  64. # Set up callbacks
  65. self.client.on_connect = self._on_connect
  66. self.client.on_disconnect = self._on_disconnect
  67. # Configure authentication
  68. if username:
  69. self.client.username_pw_set(username, password)
  70. # Configure TLS (allow self-signed certs for testing)
  71. if use_tls:
  72. self.client.tls_set(cert_reqs=ssl.CERT_NONE)
  73. self.client.tls_insecure_set(True) # Allow self-signed certs
  74. # Connect (non-blocking with loop_start)
  75. self.client.connect_async(broker, port, keepalive=60)
  76. self.client.loop_start()
  77. # Wait briefly for connection
  78. await asyncio.sleep(1.0)
  79. if self.connected:
  80. logger.info(f"MQTT relay connected to {broker}:{port}")
  81. # Publish online status
  82. self._publish_status("online")
  83. return True
  84. else:
  85. logger.warning(f"MQTT relay connection pending to {broker}:{port}")
  86. return True # Connection is async, may succeed later
  87. except Exception as e:
  88. logger.error(f"MQTT relay connection failed: {e}")
  89. self.connected = False
  90. return False
  91. def _on_connect(
  92. self,
  93. client: mqtt.Client,
  94. userdata: Any,
  95. flags: dict,
  96. reason_code: int | mqtt.ReasonCode,
  97. properties: mqtt.Properties | None = None,
  98. ):
  99. """Callback when connected to broker."""
  100. # Handle both MQTTv311 (int) and MQTTv5 (ReasonCode) return codes
  101. rc = reason_code if isinstance(reason_code, int) else reason_code.value
  102. if rc == 0:
  103. self.connected = True
  104. logger.info("MQTT relay connected successfully")
  105. # Publish online status
  106. self._publish_status("online")
  107. else:
  108. self.connected = False
  109. logger.error(f"MQTT relay connection failed: {reason_code}")
  110. def _on_disconnect(
  111. self,
  112. client: mqtt.Client,
  113. userdata: Any,
  114. flags_or_rc: dict | int | mqtt.ReasonCode,
  115. reason_code: int | mqtt.ReasonCode | None = None,
  116. properties: mqtt.Properties | None = None,
  117. ):
  118. """Callback when disconnected from broker."""
  119. self.connected = False
  120. # Handle both MQTTv311 (rc as 3rd param) and MQTTv5 (flags, rc, props)
  121. rc = reason_code if reason_code is not None else flags_or_rc
  122. rc_val = rc if isinstance(rc, int) else getattr(rc, "value", 0)
  123. if rc_val != 0:
  124. logger.warning(f"MQTT relay disconnected: {rc}")
  125. else:
  126. logger.info("MQTT relay disconnected cleanly")
  127. async def disconnect(self):
  128. """Disconnect from MQTT broker."""
  129. if self.client:
  130. try:
  131. # Publish offline status before disconnecting
  132. self._publish_status("offline")
  133. self.client.loop_stop()
  134. self.client.disconnect()
  135. except Exception as e:
  136. logger.debug(f"MQTT disconnect error (ignored): {e}")
  137. finally:
  138. self.client = None
  139. self.connected = False
  140. def _publish_status(self, status: str):
  141. """Publish BamBuddy status (online/offline)."""
  142. self._publish(
  143. f"{self.topic_prefix}/status",
  144. {"status": status, "timestamp": datetime.utcnow().isoformat()},
  145. retain=True,
  146. )
  147. def _publish(self, topic: str, payload: dict, retain: bool = False):
  148. """Publish message to MQTT broker."""
  149. if not self.client or not self.connected:
  150. return
  151. try:
  152. with self._lock:
  153. self.client.publish(topic, json.dumps(payload, default=str), qos=1, retain=retain)
  154. except Exception as e:
  155. logger.debug(f"MQTT publish error: {e}")
  156. def get_status(self) -> dict:
  157. """Get current MQTT relay status for API."""
  158. return {
  159. "enabled": self.enabled,
  160. "connected": self.connected,
  161. "broker": self._broker if self.enabled else "",
  162. "port": self._port if self.enabled else 0,
  163. "topic_prefix": self.topic_prefix,
  164. }
  165. # =========================================================================
  166. # Printer Events
  167. # =========================================================================
  168. async def on_printer_status(self, printer_id: int, state: Any, printer_name: str, printer_serial: str):
  169. """Publish printer status change (throttled to 1 update/sec per printer)."""
  170. if not self.enabled or not self.connected:
  171. return
  172. # Throttle status updates to avoid flooding MQTT broker
  173. now = time.time()
  174. last_publish = self._last_printer_status.get(printer_id, 0)
  175. if now - last_publish < self.STATUS_THROTTLE_SECONDS:
  176. return # Skip this update, too soon since last publish
  177. self._last_printer_status[printer_id] = now
  178. # Build status payload from PrinterState
  179. payload = {
  180. "printer_id": printer_id,
  181. "printer_name": printer_name,
  182. "printer_serial": printer_serial,
  183. "timestamp": datetime.utcnow().isoformat(),
  184. "connected": state.connected,
  185. "state": state.state,
  186. "progress": state.progress,
  187. "remaining_time": state.remaining_time,
  188. "layer_num": state.layer_num,
  189. "total_layers": state.total_layers,
  190. "current_print": state.current_print,
  191. "subtask_name": state.subtask_name,
  192. "gcode_file": state.gcode_file,
  193. "temperatures": state.temperatures,
  194. "wifi_signal": state.wifi_signal,
  195. "chamber_light": state.chamber_light,
  196. "speed_level": state.speed_level,
  197. "cooling_fan_speed": state.cooling_fan_speed,
  198. "big_fan1_speed": state.big_fan1_speed,
  199. "big_fan2_speed": state.big_fan2_speed,
  200. "heatbreak_fan_speed": state.heatbreak_fan_speed,
  201. }
  202. self._publish(
  203. f"{self.topic_prefix}/printers/{printer_serial}/status",
  204. payload,
  205. retain=True,
  206. )
  207. async def on_printer_online(self, printer_id: int, printer_name: str, printer_serial: str):
  208. """Publish printer came online event."""
  209. if not self.enabled or not self.connected:
  210. return
  211. self._publish(
  212. f"{self.topic_prefix}/printers/{printer_serial}/online",
  213. {
  214. "printer_id": printer_id,
  215. "printer_name": printer_name,
  216. "printer_serial": printer_serial,
  217. "timestamp": datetime.utcnow().isoformat(),
  218. },
  219. )
  220. async def on_printer_offline(self, printer_id: int, printer_name: str, printer_serial: str):
  221. """Publish printer went offline event."""
  222. if not self.enabled or not self.connected:
  223. return
  224. self._publish(
  225. f"{self.topic_prefix}/printers/{printer_serial}/offline",
  226. {
  227. "printer_id": printer_id,
  228. "printer_name": printer_name,
  229. "printer_serial": printer_serial,
  230. "timestamp": datetime.utcnow().isoformat(),
  231. },
  232. )
  233. async def on_print_start(
  234. self,
  235. printer_id: int,
  236. printer_name: str,
  237. printer_serial: str,
  238. filename: str,
  239. subtask_name: str,
  240. ):
  241. """Publish print started event."""
  242. if not self.enabled or not self.connected:
  243. return
  244. self._publish(
  245. f"{self.topic_prefix}/printers/{printer_serial}/print/started",
  246. {
  247. "printer_id": printer_id,
  248. "printer_name": printer_name,
  249. "printer_serial": printer_serial,
  250. "filename": filename,
  251. "subtask_name": subtask_name,
  252. "timestamp": datetime.utcnow().isoformat(),
  253. },
  254. )
  255. async def on_print_complete(
  256. self,
  257. printer_id: int,
  258. printer_name: str,
  259. printer_serial: str,
  260. filename: str,
  261. subtask_name: str,
  262. status: str,
  263. ):
  264. """Publish print completed event."""
  265. if not self.enabled or not self.connected:
  266. return
  267. # Determine topic based on status
  268. if status == "completed":
  269. topic = f"{self.topic_prefix}/printers/{printer_serial}/print/completed"
  270. else:
  271. topic = f"{self.topic_prefix}/printers/{printer_serial}/print/failed"
  272. self._publish(
  273. topic,
  274. {
  275. "printer_id": printer_id,
  276. "printer_name": printer_name,
  277. "printer_serial": printer_serial,
  278. "filename": filename,
  279. "subtask_name": subtask_name,
  280. "status": status,
  281. "timestamp": datetime.utcnow().isoformat(),
  282. },
  283. )
  284. async def on_ams_change(
  285. self,
  286. printer_id: int,
  287. printer_name: str,
  288. printer_serial: str,
  289. ams_data: list,
  290. ):
  291. """Publish AMS filament change event."""
  292. if not self.enabled or not self.connected:
  293. return
  294. self._publish(
  295. f"{self.topic_prefix}/printers/{printer_serial}/ams/changed",
  296. {
  297. "printer_id": printer_id,
  298. "printer_name": printer_name,
  299. "printer_serial": printer_serial,
  300. "ams_units": ams_data,
  301. "timestamp": datetime.utcnow().isoformat(),
  302. },
  303. )
  304. async def on_printer_error(
  305. self,
  306. printer_id: int,
  307. printer_name: str,
  308. printer_serial: str,
  309. errors: list,
  310. ):
  311. """Publish printer HMS error event."""
  312. if not self.enabled or not self.connected:
  313. return
  314. self._publish(
  315. f"{self.topic_prefix}/printers/{printer_serial}/error",
  316. {
  317. "printer_id": printer_id,
  318. "printer_name": printer_name,
  319. "printer_serial": printer_serial,
  320. "errors": errors,
  321. "timestamp": datetime.utcnow().isoformat(),
  322. },
  323. )
  324. # =========================================================================
  325. # Print Queue Events
  326. # =========================================================================
  327. async def on_queue_job_added(
  328. self,
  329. job_id: int,
  330. filename: str,
  331. printer_id: int | None,
  332. printer_name: str | None,
  333. ):
  334. """Publish job added to queue event."""
  335. if not self.enabled or not self.connected:
  336. return
  337. self._publish(
  338. f"{self.topic_prefix}/queue/job_added",
  339. {
  340. "job_id": job_id,
  341. "filename": filename,
  342. "printer_id": printer_id,
  343. "printer_name": printer_name,
  344. "timestamp": datetime.utcnow().isoformat(),
  345. },
  346. )
  347. async def on_queue_job_started(
  348. self,
  349. job_id: int,
  350. filename: str,
  351. printer_id: int,
  352. printer_name: str,
  353. printer_serial: str,
  354. ):
  355. """Publish queued job started printing event."""
  356. if not self.enabled or not self.connected:
  357. return
  358. self._publish(
  359. f"{self.topic_prefix}/queue/job_started",
  360. {
  361. "job_id": job_id,
  362. "filename": filename,
  363. "printer_id": printer_id,
  364. "printer_name": printer_name,
  365. "printer_serial": printer_serial,
  366. "timestamp": datetime.utcnow().isoformat(),
  367. },
  368. )
  369. async def on_queue_job_completed(
  370. self,
  371. job_id: int,
  372. filename: str,
  373. printer_id: int,
  374. printer_name: str,
  375. status: str,
  376. ):
  377. """Publish queued job finished event."""
  378. if not self.enabled or not self.connected:
  379. return
  380. topic = (
  381. f"{self.topic_prefix}/queue/job_completed"
  382. if status == "completed"
  383. else f"{self.topic_prefix}/queue/job_failed"
  384. )
  385. self._publish(
  386. topic,
  387. {
  388. "job_id": job_id,
  389. "filename": filename,
  390. "printer_id": printer_id,
  391. "printer_name": printer_name,
  392. "status": status,
  393. "timestamp": datetime.utcnow().isoformat(),
  394. },
  395. )
  396. # =========================================================================
  397. # Maintenance Events
  398. # =========================================================================
  399. async def on_maintenance_alert(
  400. self,
  401. printer_id: int,
  402. printer_name: str,
  403. maintenance_type: str,
  404. current_value: float,
  405. threshold: float,
  406. ):
  407. """Publish maintenance alert triggered event."""
  408. if not self.enabled or not self.connected:
  409. return
  410. self._publish(
  411. f"{self.topic_prefix}/maintenance/alert",
  412. {
  413. "printer_id": printer_id,
  414. "printer_name": printer_name,
  415. "maintenance_type": maintenance_type,
  416. "current_value": current_value,
  417. "threshold": threshold,
  418. "timestamp": datetime.utcnow().isoformat(),
  419. },
  420. )
  421. async def on_maintenance_acknowledged(
  422. self,
  423. printer_id: int,
  424. printer_name: str,
  425. maintenance_type: str,
  426. ):
  427. """Publish maintenance alert acknowledged event."""
  428. if not self.enabled or not self.connected:
  429. return
  430. self._publish(
  431. f"{self.topic_prefix}/maintenance/acknowledged",
  432. {
  433. "printer_id": printer_id,
  434. "printer_name": printer_name,
  435. "maintenance_type": maintenance_type,
  436. "timestamp": datetime.utcnow().isoformat(),
  437. },
  438. )
  439. async def on_maintenance_reset(
  440. self,
  441. printer_id: int,
  442. printer_name: str,
  443. maintenance_type: str,
  444. ):
  445. """Publish maintenance counter reset event."""
  446. if not self.enabled or not self.connected:
  447. return
  448. self._publish(
  449. f"{self.topic_prefix}/maintenance/reset",
  450. {
  451. "printer_id": printer_id,
  452. "printer_name": printer_name,
  453. "maintenance_type": maintenance_type,
  454. "timestamp": datetime.utcnow().isoformat(),
  455. },
  456. )
  457. # =========================================================================
  458. # Archive Events
  459. # =========================================================================
  460. async def on_archive_created(
  461. self,
  462. archive_id: int,
  463. print_name: str,
  464. printer_name: str,
  465. status: str,
  466. ):
  467. """Publish print archived event."""
  468. if not self.enabled or not self.connected:
  469. return
  470. self._publish(
  471. f"{self.topic_prefix}/archive/created",
  472. {
  473. "archive_id": archive_id,
  474. "print_name": print_name,
  475. "printer_name": printer_name,
  476. "status": status,
  477. "timestamp": datetime.utcnow().isoformat(),
  478. },
  479. )
  480. async def on_archive_updated(
  481. self,
  482. archive_id: int,
  483. print_name: str,
  484. status: str,
  485. ):
  486. """Publish archive record updated event."""
  487. if not self.enabled or not self.connected:
  488. return
  489. self._publish(
  490. f"{self.topic_prefix}/archive/updated",
  491. {
  492. "archive_id": archive_id,
  493. "print_name": print_name,
  494. "status": status,
  495. "timestamp": datetime.utcnow().isoformat(),
  496. },
  497. )
  498. # =========================================================================
  499. # Filament/Spoolman Events
  500. # =========================================================================
  501. async def on_filament_low(
  502. self,
  503. spool_id: int,
  504. spool_name: str,
  505. remaining_weight: float,
  506. remaining_percent: float,
  507. ):
  508. """Publish filament inventory low event."""
  509. if not self.enabled or not self.connected:
  510. return
  511. self._publish(
  512. f"{self.topic_prefix}/filament/low",
  513. {
  514. "spool_id": spool_id,
  515. "spool_name": spool_name,
  516. "remaining_weight": remaining_weight,
  517. "remaining_percent": remaining_percent,
  518. "timestamp": datetime.utcnow().isoformat(),
  519. },
  520. )
  521. # =========================================================================
  522. # Smart Plug Events
  523. # =========================================================================
  524. async def on_smart_plug_state(
  525. self,
  526. plug_id: int,
  527. plug_name: str,
  528. state: str,
  529. printer_id: int | None = None,
  530. printer_name: str | None = None,
  531. ):
  532. """Publish smart plug state change event."""
  533. if not self.enabled or not self.connected:
  534. return
  535. topic = f"{self.topic_prefix}/smart_plugs/on" if state == "on" else f"{self.topic_prefix}/smart_plugs/off"
  536. self._publish(
  537. topic,
  538. {
  539. "plug_id": plug_id,
  540. "plug_name": plug_name,
  541. "state": state,
  542. "printer_id": printer_id,
  543. "printer_name": printer_name,
  544. "timestamp": datetime.utcnow().isoformat(),
  545. },
  546. )
  547. async def on_smart_plug_energy(
  548. self,
  549. plug_id: int,
  550. plug_name: str,
  551. power: float,
  552. energy_today: float,
  553. energy_total: float,
  554. ):
  555. """Publish smart plug energy update event."""
  556. if not self.enabled or not self.connected:
  557. return
  558. self._publish(
  559. f"{self.topic_prefix}/smart_plugs/energy",
  560. {
  561. "plug_id": plug_id,
  562. "plug_name": plug_name,
  563. "power_watts": power,
  564. "energy_today_kwh": energy_today,
  565. "energy_total_kwh": energy_total,
  566. "timestamp": datetime.utcnow().isoformat(),
  567. },
  568. )
  569. # Global instance
  570. mqtt_relay = MQTTRelayService()