| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687 |
- """MQTT Relay Service for publishing BamBuddy events to external MQTT brokers.
- This service enables integration with external automation systems like
- Node-RED, Home Assistant, and other MQTT-based platforms.
- """
- import asyncio
- import json
- import logging
- import ssl
- import threading
- import time
- from datetime import datetime
- from typing import Any
- import paho.mqtt.client as mqtt
- logger = logging.getLogger(__name__)
- class MQTTRelayService:
- """Publishes BamBuddy events to an external MQTT broker."""
- # Minimum interval between status updates per printer (seconds)
- STATUS_THROTTLE_SECONDS = 1.0
- def __init__(self):
- self.client: mqtt.Client | None = None
- self.enabled = False
- self.connected = False
- self.topic_prefix = "bambuddy"
- self._lock = threading.Lock()
- self._loop: asyncio.AbstractEventLoop | None = None
- self._broker = ""
- self._port = 1883
- self._last_printer_status: dict[int, float] = {} # printer_id -> last publish timestamp
- self._smart_plug_service = None # Lazy import to avoid circular dependency
- self._settings: dict = {} # Store settings for smart plug service
- async def configure(self, settings: dict) -> bool:
- """Configure MQTT connection from settings.
- Returns True if connection was successful or MQTT is disabled.
- """
- self.enabled = settings.get("mqtt_enabled", False)
- self._settings = settings # Store for smart plug service
- if not self.enabled:
- await self.disconnect()
- # Also configure smart plug service (will disable it)
- await self._configure_smart_plug_service(settings)
- logger.info("MQTT relay disabled")
- return True
- broker = settings.get("mqtt_broker", "")
- port = settings.get("mqtt_port", 1883)
- username = settings.get("mqtt_username", "")
- password = settings.get("mqtt_password", "")
- self.topic_prefix = settings.get("mqtt_topic_prefix", "bambuddy")
- use_tls = settings.get("mqtt_use_tls", False)
- if not broker:
- logger.warning("MQTT enabled but no broker configured")
- return False
- # Store for status endpoint
- self._broker = broker
- self._port = port
- # Disconnect existing connection if settings changed
- if self.client:
- await self.disconnect()
- # Create and connect client
- result = await self._connect(broker, port, username, password, use_tls)
- # Configure smart plug service with same settings
- await self._configure_smart_plug_service(settings)
- return result
- async def _configure_smart_plug_service(self, settings: dict):
- """Configure the MQTT smart plug service with the same broker settings."""
- try:
- if self._smart_plug_service is None:
- from backend.app.services.mqtt_smart_plug import mqtt_smart_plug_service
- self._smart_plug_service = mqtt_smart_plug_service
- await self._smart_plug_service.configure(settings)
- except Exception as e:
- logger.error("Failed to configure MQTT smart plug service: %s", e)
- @property
- def smart_plug_service(self):
- """Get the MQTT smart plug service instance."""
- if self._smart_plug_service is None:
- from backend.app.services.mqtt_smart_plug import mqtt_smart_plug_service
- self._smart_plug_service = mqtt_smart_plug_service
- return self._smart_plug_service
- async def _connect(self, broker: str, port: int, username: str, password: str, use_tls: bool) -> bool:
- """Establish MQTT connection."""
- try:
- # Create client with callback API version 2 (use MQTTv311 for broader compatibility)
- self.client = mqtt.Client(
- callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
- client_id=f"bambuddy-{id(self)}",
- protocol=mqtt.MQTTv311,
- )
- # Set up callbacks
- self.client.on_connect = self._on_connect
- self.client.on_disconnect = self._on_disconnect
- # Configure authentication
- if username:
- self.client.username_pw_set(username, password)
- # Configure TLS (allow self-signed certs for testing)
- if use_tls:
- self.client.tls_set(cert_reqs=ssl.CERT_NONE)
- self.client.tls_insecure_set(True) # Allow self-signed certs
- # Run connect_async in thread pool with timeout to avoid blocking
- # on unreachable brokers (connect_async does synchronous socket creation)
- try:
- await asyncio.wait_for(asyncio.to_thread(self.client.connect_async, broker, port, 60), timeout=3.0)
- except TimeoutError:
- logger.warning("MQTT relay connection to %s:%s timed out", broker, port)
- return False
- self.client.loop_start()
- # Wait briefly for connection callback
- await asyncio.sleep(1.0)
- if self.connected:
- logger.info("MQTT relay connected to %s:%s", broker, port)
- # Publish online status
- self._publish_status("online")
- return True
- else:
- logger.warning("MQTT relay connection pending to %s:%s", broker, port)
- return True # Connection is async, may succeed later
- except Exception as e:
- logger.error("MQTT relay connection failed: %s", e)
- self.connected = False
- return False
- def _on_connect(
- self,
- client: mqtt.Client,
- userdata: Any,
- flags: dict,
- reason_code: int | mqtt.ReasonCode,
- properties: mqtt.Properties | None = None,
- ):
- """Callback when connected to broker."""
- # Handle both MQTTv311 (int) and MQTTv5 (ReasonCode) return codes
- rc = reason_code if isinstance(reason_code, int) else reason_code.value
- if rc == 0:
- self.connected = True
- logger.info("MQTT relay connected successfully")
- # Publish online status
- self._publish_status("online")
- else:
- self.connected = False
- logger.error("MQTT relay connection failed: %s", reason_code)
- def _on_disconnect(
- self,
- client: mqtt.Client,
- userdata: Any,
- flags_or_rc: dict | int | mqtt.ReasonCode,
- reason_code: int | mqtt.ReasonCode | None = None,
- properties: mqtt.Properties | None = None,
- ):
- """Callback when disconnected from broker."""
- self.connected = False
- # Handle both MQTTv311 (rc as 3rd param) and MQTTv5 (flags, rc, props)
- rc = reason_code if reason_code is not None else flags_or_rc
- rc_val = rc if isinstance(rc, int) else getattr(rc, "value", 0)
- if rc_val != 0:
- logger.warning("MQTT relay disconnected: %s", rc)
- else:
- logger.info("MQTT relay disconnected cleanly")
- async def disconnect(self):
- """Disconnect from MQTT broker."""
- if self.client:
- try:
- # Publish offline status before disconnecting
- self._publish_status("offline")
- self.client.loop_stop()
- self.client.disconnect()
- except Exception as e:
- logger.debug("MQTT disconnect error (ignored): %s", e)
- finally:
- self.client = None
- self.connected = False
- def _publish_status(self, status: str):
- """Publish BamBuddy status (online/offline)."""
- self._publish(
- f"{self.topic_prefix}/status",
- {"status": status, "timestamp": datetime.utcnow().isoformat()},
- retain=True,
- )
- def _publish(self, topic: str, payload: dict, retain: bool = False):
- """Publish message to MQTT broker."""
- if not self.client or not self.connected:
- return
- try:
- with self._lock:
- self.client.publish(topic, json.dumps(payload, default=str), qos=1, retain=retain)
- except Exception as e:
- logger.debug("MQTT publish error: %s", e)
- def get_status(self) -> dict:
- """Get current MQTT relay status for API."""
- return {
- "enabled": self.enabled,
- "connected": self.connected,
- "broker": self._broker if self.enabled else "",
- "port": self._port if self.enabled else 0,
- "topic_prefix": self.topic_prefix,
- }
- # =========================================================================
- # Printer Events
- # =========================================================================
- async def on_printer_status(self, printer_id: int, state: Any, printer_name: str, printer_serial: str):
- """Publish printer status change (throttled to 1 update/sec per printer)."""
- if not self.enabled or not self.connected:
- return
- # Throttle status updates to avoid flooding MQTT broker
- now = time.time()
- last_publish = self._last_printer_status.get(printer_id, 0)
- if now - last_publish < self.STATUS_THROTTLE_SECONDS:
- return # Skip this update, too soon since last publish
- self._last_printer_status[printer_id] = now
- # Build status payload from PrinterState
- payload = {
- "printer_id": printer_id,
- "printer_name": printer_name,
- "printer_serial": printer_serial,
- "timestamp": datetime.utcnow().isoformat(),
- "connected": state.connected,
- "state": state.state,
- "progress": state.progress,
- "remaining_time": state.remaining_time,
- "layer_num": state.layer_num,
- "total_layers": state.total_layers,
- "current_print": state.current_print,
- "subtask_name": state.subtask_name,
- "gcode_file": state.gcode_file,
- "temperatures": state.temperatures,
- "wifi_signal": state.wifi_signal,
- "chamber_light": state.chamber_light,
- "speed_level": state.speed_level,
- "cooling_fan_speed": state.cooling_fan_speed,
- "big_fan1_speed": state.big_fan1_speed,
- "big_fan2_speed": state.big_fan2_speed,
- "heatbreak_fan_speed": state.heatbreak_fan_speed,
- }
- self._publish(
- f"{self.topic_prefix}/printers/{printer_serial}/status",
- payload,
- retain=True,
- )
- async def on_printer_online(self, printer_id: int, printer_name: str, printer_serial: str):
- """Publish printer came online event."""
- if not self.enabled or not self.connected:
- return
- self._publish(
- f"{self.topic_prefix}/printers/{printer_serial}/online",
- {
- "printer_id": printer_id,
- "printer_name": printer_name,
- "printer_serial": printer_serial,
- "timestamp": datetime.utcnow().isoformat(),
- },
- )
- async def on_printer_offline(self, printer_id: int, printer_name: str, printer_serial: str):
- """Publish printer went offline event."""
- if not self.enabled or not self.connected:
- return
- self._publish(
- f"{self.topic_prefix}/printers/{printer_serial}/offline",
- {
- "printer_id": printer_id,
- "printer_name": printer_name,
- "printer_serial": printer_serial,
- "timestamp": datetime.utcnow().isoformat(),
- },
- )
- async def on_print_start(
- self,
- printer_id: int,
- printer_name: str,
- printer_serial: str,
- filename: str,
- subtask_name: str,
- ):
- """Publish print started event."""
- if not self.enabled or not self.connected:
- return
- self._publish(
- f"{self.topic_prefix}/printers/{printer_serial}/print/started",
- {
- "printer_id": printer_id,
- "printer_name": printer_name,
- "printer_serial": printer_serial,
- "filename": filename,
- "subtask_name": subtask_name,
- "timestamp": datetime.utcnow().isoformat(),
- },
- )
- async def on_print_complete(
- self,
- printer_id: int,
- printer_name: str,
- printer_serial: str,
- filename: str,
- subtask_name: str,
- status: str,
- ):
- """Publish print completed event."""
- if not self.enabled or not self.connected:
- return
- # Determine topic based on status
- if status == "completed":
- topic = f"{self.topic_prefix}/printers/{printer_serial}/print/completed"
- else:
- topic = f"{self.topic_prefix}/printers/{printer_serial}/print/failed"
- self._publish(
- topic,
- {
- "printer_id": printer_id,
- "printer_name": printer_name,
- "printer_serial": printer_serial,
- "filename": filename,
- "subtask_name": subtask_name,
- "status": status,
- "timestamp": datetime.utcnow().isoformat(),
- },
- )
- async def on_ams_change(
- self,
- printer_id: int,
- printer_name: str,
- printer_serial: str,
- ams_data: list,
- ):
- """Publish AMS filament change event."""
- if not self.enabled or not self.connected:
- return
- self._publish(
- f"{self.topic_prefix}/printers/{printer_serial}/ams/changed",
- {
- "printer_id": printer_id,
- "printer_name": printer_name,
- "printer_serial": printer_serial,
- "ams_units": ams_data,
- "timestamp": datetime.utcnow().isoformat(),
- },
- )
- async def on_printer_error(
- self,
- printer_id: int,
- printer_name: str,
- printer_serial: str,
- errors: list,
- ):
- """Publish printer HMS error event."""
- if not self.enabled or not self.connected:
- return
- self._publish(
- f"{self.topic_prefix}/printers/{printer_serial}/error",
- {
- "printer_id": printer_id,
- "printer_name": printer_name,
- "printer_serial": printer_serial,
- "errors": errors,
- "timestamp": datetime.utcnow().isoformat(),
- },
- )
- # =========================================================================
- # Print Queue Events
- # =========================================================================
- async def on_queue_job_added(
- self,
- job_id: int,
- filename: str,
- printer_id: int | None,
- printer_name: str | None,
- ):
- """Publish job added to queue event."""
- if not self.enabled or not self.connected:
- return
- self._publish(
- f"{self.topic_prefix}/queue/job_added",
- {
- "job_id": job_id,
- "filename": filename,
- "printer_id": printer_id,
- "printer_name": printer_name,
- "timestamp": datetime.utcnow().isoformat(),
- },
- )
- async def on_queue_job_started(
- self,
- job_id: int,
- filename: str,
- printer_id: int,
- printer_name: str,
- printer_serial: str,
- ):
- """Publish queued job started printing event."""
- if not self.enabled or not self.connected:
- return
- self._publish(
- f"{self.topic_prefix}/queue/job_started",
- {
- "job_id": job_id,
- "filename": filename,
- "printer_id": printer_id,
- "printer_name": printer_name,
- "printer_serial": printer_serial,
- "timestamp": datetime.utcnow().isoformat(),
- },
- )
- async def on_queue_job_completed(
- self,
- job_id: int,
- filename: str,
- printer_id: int,
- printer_name: str,
- status: str,
- ):
- """Publish queued job finished event."""
- if not self.enabled or not self.connected:
- return
- topic = (
- f"{self.topic_prefix}/queue/job_completed"
- if status == "completed"
- else f"{self.topic_prefix}/queue/job_failed"
- )
- self._publish(
- topic,
- {
- "job_id": job_id,
- "filename": filename,
- "printer_id": printer_id,
- "printer_name": printer_name,
- "status": status,
- "timestamp": datetime.utcnow().isoformat(),
- },
- )
- # =========================================================================
- # Maintenance Events
- # =========================================================================
- async def on_maintenance_alert(
- self,
- printer_id: int,
- printer_name: str,
- maintenance_type: str,
- current_value: float,
- threshold: float,
- ):
- """Publish maintenance alert triggered event."""
- if not self.enabled or not self.connected:
- return
- self._publish(
- f"{self.topic_prefix}/maintenance/alert",
- {
- "printer_id": printer_id,
- "printer_name": printer_name,
- "maintenance_type": maintenance_type,
- "current_value": current_value,
- "threshold": threshold,
- "timestamp": datetime.utcnow().isoformat(),
- },
- )
- async def on_maintenance_acknowledged(
- self,
- printer_id: int,
- printer_name: str,
- maintenance_type: str,
- ):
- """Publish maintenance alert acknowledged event."""
- if not self.enabled or not self.connected:
- return
- self._publish(
- f"{self.topic_prefix}/maintenance/acknowledged",
- {
- "printer_id": printer_id,
- "printer_name": printer_name,
- "maintenance_type": maintenance_type,
- "timestamp": datetime.utcnow().isoformat(),
- },
- )
- async def on_maintenance_reset(
- self,
- printer_id: int,
- printer_name: str,
- maintenance_type: str,
- ):
- """Publish maintenance counter reset event."""
- if not self.enabled or not self.connected:
- return
- self._publish(
- f"{self.topic_prefix}/maintenance/reset",
- {
- "printer_id": printer_id,
- "printer_name": printer_name,
- "maintenance_type": maintenance_type,
- "timestamp": datetime.utcnow().isoformat(),
- },
- )
- # =========================================================================
- # Archive Events
- # =========================================================================
- async def on_archive_created(
- self,
- archive_id: int,
- print_name: str,
- printer_name: str,
- status: str,
- ):
- """Publish print archived event."""
- if not self.enabled or not self.connected:
- return
- self._publish(
- f"{self.topic_prefix}/archive/created",
- {
- "archive_id": archive_id,
- "print_name": print_name,
- "printer_name": printer_name,
- "status": status,
- "timestamp": datetime.utcnow().isoformat(),
- },
- )
- async def on_archive_updated(
- self,
- archive_id: int,
- print_name: str,
- status: str,
- ):
- """Publish archive record updated event."""
- if not self.enabled or not self.connected:
- return
- self._publish(
- f"{self.topic_prefix}/archive/updated",
- {
- "archive_id": archive_id,
- "print_name": print_name,
- "status": status,
- "timestamp": datetime.utcnow().isoformat(),
- },
- )
- # =========================================================================
- # Filament/Spoolman Events
- # =========================================================================
- async def on_filament_low(
- self,
- spool_id: int,
- spool_name: str,
- remaining_weight: float,
- remaining_percent: float,
- ):
- """Publish filament inventory low event."""
- if not self.enabled or not self.connected:
- return
- self._publish(
- f"{self.topic_prefix}/filament/low",
- {
- "spool_id": spool_id,
- "spool_name": spool_name,
- "remaining_weight": remaining_weight,
- "remaining_percent": remaining_percent,
- "timestamp": datetime.utcnow().isoformat(),
- },
- )
- # =========================================================================
- # Smart Plug Events
- # =========================================================================
- async def on_smart_plug_state(
- self,
- plug_id: int,
- plug_name: str,
- state: str,
- printer_id: int | None = None,
- printer_name: str | None = None,
- ):
- """Publish smart plug state change event."""
- if not self.enabled or not self.connected:
- return
- topic = f"{self.topic_prefix}/smart_plugs/on" if state == "on" else f"{self.topic_prefix}/smart_plugs/off"
- self._publish(
- topic,
- {
- "plug_id": plug_id,
- "plug_name": plug_name,
- "state": state,
- "printer_id": printer_id,
- "printer_name": printer_name,
- "timestamp": datetime.utcnow().isoformat(),
- },
- )
- async def on_smart_plug_energy(
- self,
- plug_id: int,
- plug_name: str,
- power: float,
- energy_today: float,
- energy_total: float,
- ):
- """Publish smart plug energy update event."""
- if not self.enabled or not self.connected:
- return
- self._publish(
- f"{self.topic_prefix}/smart_plugs/energy",
- {
- "plug_id": plug_id,
- "plug_name": plug_name,
- "power_watts": power,
- "energy_today_kwh": energy_today,
- "energy_total_kwh": energy_total,
- "timestamp": datetime.utcnow().isoformat(),
- },
- )
- # Global instance
- mqtt_relay = MQTTRelayService()
|