"""MQTT broker for virtual printer. Implements an MQTT broker that accepts connections from slicers, authenticates with the configured access code, and logs print commands. """ import asyncio import json import logging import ssl from collections.abc import Callable from pathlib import Path logger = logging.getLogger(__name__) # Default MQTT port for Bambu printers (MQTT over TLS) MQTT_PORT = 8883 class VirtualPrinterMQTTServer: """MQTT broker that accepts connections from slicers. This is a minimal MQTT broker implementation that: - Accepts TLS connections on port 8883 - Authenticates with username 'bblp' and the configured access code - Receives print commands on device/{serial}/request - Can publish status on device/{serial}/report """ def __init__( self, serial: str, access_code: str, cert_path: Path, key_path: Path, port: int = MQTT_PORT, on_print_command: Callable[[str, dict], None] | None = None, ): """Initialize the MQTT server. Args: serial: Virtual printer serial number access_code: Password for authentication cert_path: Path to TLS certificate key_path: Path to TLS private key port: Port to listen on (default 8883) on_print_command: Callback when print command received (filename, data) """ self.serial = serial self.access_code = access_code self.cert_path = cert_path self.key_path = key_path self.port = port self.on_print_command = on_print_command self._running = False self._broker = None self._broker_task = None async def start(self) -> None: """Start the MQTT broker.""" if self._running: return # Try to import amqtt try: from amqtt.broker import Broker except ImportError: logger.error("amqtt not installed. Run: pip install amqtt") return logger.info("Starting virtual printer MQTT broker on port %s", self.port) # Build broker configuration config = { "listeners": { "default": { "type": "tcp", "bind": f"0.0.0.0:{self.port}", "ssl": "on", "certfile": str(self.cert_path), "keyfile": str(self.key_path), }, }, "auth": { "allow-anonymous": False, "plugins": ["auth_custom"], }, "topic-check": { "enabled": False, # Allow any topic }, } try: self._running = True # Create and start broker self._broker = Broker(config) # Register custom auth plugin self._broker.plugins_manager.plugins_handlers["auth_custom"] = self._authenticate # Start the broker await self._broker.start() logger.info("MQTT broker started on port %s", self.port) # Keep running while self._running: await asyncio.sleep(1) except OSError as e: if e.errno == 98: # Address already in use logger.error("MQTT port %s is already in use", self.port) else: logger.error("MQTT broker error: %s", e) except asyncio.CancelledError: logger.debug("MQTT broker task cancelled") except Exception as e: logger.error("MQTT broker error: %s", e) finally: await self.stop() async def _authenticate(self, session) -> bool: """Authenticate MQTT connection. Args: session: MQTT session with username/password Returns: True if authentication successful """ username = getattr(session, "username", None) password = getattr(session, "password", None) # Bambu slicers use 'bblp' as username and access code as password if username == "bblp" and password == self.access_code: logger.debug("MQTT client authenticated from %s", session.remote_address) return True logger.warning("MQTT auth failed for user '%s' from %s", username, session.remote_address) return False async def stop(self) -> None: """Stop the MQTT broker.""" logger.info("Stopping MQTT broker") self._running = False if self._broker: try: await self._broker.shutdown() except OSError as e: logger.debug("Error shutting down MQTT broker: %s", e) self._broker = None class SimpleMQTTServer: """Simplified MQTT server using raw sockets. This is a fallback implementation that handles basic MQTT protocol without requiring the amqtt library. It's less feature-complete but more lightweight. """ def __init__( self, serial: str, access_code: str, cert_path: Path, key_path: Path, port: int = MQTT_PORT, on_print_command: Callable[[str, dict], None] | None = None, ): self.serial = serial self.access_code = access_code self.cert_path = cert_path self.key_path = key_path self.port = port self.on_print_command = on_print_command self._running = False self._server = None self._clients: dict[str, asyncio.StreamWriter] = {} self._status_push_task: asyncio.Task | None = None self._sequence_id = 0 # Dynamic state for status reports self._gcode_state = "IDLE" self._current_file = "" self._prepare_percent = "0" async def start(self) -> None: """Start the MQTT server.""" if self._running: return logger.info("Starting simple MQTT server on port %s", self.port) # Create SSL context with Bambu-compatible settings ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) ssl_context.load_cert_chain(str(self.cert_path), str(self.key_path)) # Match Bambu printer behavior - accept any client ssl_context.verify_mode = ssl.CERT_NONE # Allow TLS 1.2 for broader compatibility (some slicers may not support 1.3) ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2 # Disable hostname checking ssl_context.check_hostname = False # Log certificate info import subprocess try: result = subprocess.run( ["openssl", "x509", "-in", str(self.cert_path), "-noout", "-subject", "-issuer"], capture_output=True, text=True, timeout=5, ) logger.info("MQTT SSL cert info: %s", result.stdout.strip()) except (OSError, subprocess.SubprocessError): pass # Certificate info is for debug logging only; not critical logger.info("MQTT SSL context: TLS 1.2+, cert=%s", self.cert_path) try: self._running = True # Wrapper to log ALL connection attempts including SSL errors async def connection_handler(reader, writer): try: addr = writer.get_extra_info("peername") ssl_obj = writer.get_extra_info("ssl_object") if ssl_obj: logger.info( f"MQTT TLS connection from {addr} - cipher={ssl_obj.cipher()}, version={ssl_obj.version()}" ) else: logger.info("MQTT connection from %s (no TLS?)", addr) await self._handle_client(reader, writer) except ssl.SSLError as e: logger.error("MQTT SSL error: %s", e) except Exception as e: logger.error("MQTT connection handler error: %s", e) # Custom protocol factory to log raw connection attempts logger.info("Setting up MQTT server with SSL error handling...") # Add SSL handshake error callback def handle_ssl_error(loop, context): exception = context.get("exception") message = context.get("message", "") if "ssl" in str(exception).lower() or "ssl" in message.lower(): logger.error("SSL error: %s - %s", message, exception) else: logger.debug("Asyncio error: %s", message) asyncio.get_event_loop().set_exception_handler(handle_ssl_error) self._server = await asyncio.start_server( connection_handler, "0.0.0.0", # nosec B104 self.port, ssl=ssl_context, ) logger.info("Simple MQTT server listening on port %s", self.port) # Start periodic status push task self._status_push_task = asyncio.create_task(self._periodic_status_push()) async with self._server: await self._server.serve_forever() except OSError as e: if e.errno == 98: # Address already in use logger.error("MQTT port %s is already in use", self.port) else: logger.error("MQTT server error: %s", e) except asyncio.CancelledError: logger.debug("MQTT server task cancelled") except Exception as e: logger.error("MQTT server error: %s", e) finally: await self.stop() async def stop(self) -> None: """Stop the MQTT server.""" logger.info("Stopping simple MQTT server") self._running = False # Stop periodic status push if self._status_push_task: self._status_push_task.cancel() try: await self._status_push_task except asyncio.CancelledError: pass # Expected when stopping the periodic status push task self._status_push_task = None # Close all client connections (iterate over copy to avoid modification during iteration) for _client_id, writer in list(self._clients.items()): try: writer.close() await writer.wait_closed() except OSError: pass # Best-effort client connection cleanup; client may have disconnected self._clients.clear() if self._server: try: self._server.close() await self._server.wait_closed() except OSError: pass # Best-effort server shutdown; port may already be released self._server = None async def _periodic_status_push(self) -> None: """Send periodic status updates to all connected clients.""" logger.info("Starting periodic status push task") while self._running: try: await asyncio.sleep(1) # Push every 1 second like real printers # Send status to all connected clients disconnected = [] for client_id, writer in list(self._clients.items()): try: if writer.is_closing(): disconnected.append(client_id) continue await self._send_status_report(writer) except OSError as e: logger.debug("Failed to push status to %s: %s", client_id, e) disconnected.append(client_id) # Remove disconnected clients for client_id in disconnected: self._clients.pop(client_id, None) except asyncio.CancelledError: break except Exception as e: logger.error("Periodic status push error: %s", e) logger.info("Periodic status push task stopped") async def _handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: """Handle an MQTT client connection.""" addr = writer.get_extra_info("peername") client_id = f"{addr[0]}:{addr[1]}" if addr else "unknown" logger.info("MQTT client connected: %s", client_id) authenticated = False try: while self._running: # Read MQTT fixed header try: header = await asyncio.wait_for(reader.read(1), timeout=60) except TimeoutError: break if not header: break packet_type = (header[0] & 0xF0) >> 4 # Read remaining length remaining_length = await self._read_remaining_length(reader) if remaining_length is None: break # Read payload payload = await reader.read(remaining_length) if remaining_length > 0 else b"" # Handle packet types if packet_type == 1: # CONNECT authenticated = await self._handle_connect(payload, writer) if not authenticated: break # Register client for periodic status pushes self._clients[client_id] = writer elif packet_type == 3: # PUBLISH if authenticated: await self._handle_publish(header[0], payload, writer) elif packet_type == 8: # SUBSCRIBE if authenticated: await self._handle_subscribe(payload, writer) elif packet_type == 12: # PINGREQ # Send PINGRESP writer.write(bytes([0xD0, 0x00])) await writer.drain() elif packet_type == 14: # DISCONNECT break except asyncio.CancelledError: pass # Expected when server is shutting down and cancels client tasks except Exception as e: logger.debug("MQTT client error: %s", e) finally: logger.debug("MQTT client disconnected: %s", client_id) if client_id in self._clients: del self._clients[client_id] try: writer.close() await writer.wait_closed() except OSError: pass # Best-effort socket cleanup on client disconnect async def _read_remaining_length(self, reader: asyncio.StreamReader) -> int | None: """Read MQTT remaining length (variable byte integer).""" multiplier = 1 value = 0 for _ in range(4): try: byte = await reader.read(1) if not byte: return None encoded = byte[0] value += (encoded & 127) * multiplier if (encoded & 128) == 0: return value multiplier *= 128 except OSError: return None return None async def _handle_connect(self, payload: bytes, writer: asyncio.StreamWriter) -> bool: """Handle MQTT CONNECT packet. Returns True if authentication successful. """ try: # Parse CONNECT packet # Skip protocol name length and name idx = 0 proto_len = (payload[idx] << 8) | payload[idx + 1] idx += 2 + proto_len # Skip protocol level and connect flags # connect_flags = payload[idx + 1] idx += 2 # Skip keepalive idx += 2 # Read client ID client_id_len = (payload[idx] << 8) | payload[idx + 1] idx += 2 # client_id = payload[idx : idx + client_id_len].decode("utf-8") idx += client_id_len # Read username username_len = (payload[idx] << 8) | payload[idx + 1] idx += 2 username = payload[idx : idx + username_len].decode("utf-8") idx += username_len # Read password password_len = (payload[idx] << 8) | payload[idx + 1] idx += 2 password = payload[idx : idx + password_len].decode("utf-8") # Authenticate if username == "bblp" and password == self.access_code: # Send CONNACK with success writer.write(bytes([0x20, 0x02, 0x00, 0x00])) await writer.drain() logger.info("MQTT client authenticated successfully") # Send immediate status report after auth - slicer expects this await self._send_status_report(writer) return True else: # Send CONNACK with auth failure writer.write(bytes([0x20, 0x02, 0x00, 0x05])) # Not authorized await writer.drain() logger.warning("MQTT auth failed for user '%s'", username) return False except (IndexError, ValueError) as e: logger.debug("MQTT CONNECT parse error: %s", e) # Send CONNACK with error writer.write(bytes([0x20, 0x02, 0x00, 0x02])) # Protocol error await writer.drain() return False async def _handle_subscribe(self, payload: bytes, writer: asyncio.StreamWriter) -> None: """Handle MQTT SUBSCRIBE packet.""" try: # Parse packet ID packet_id = (payload[0] << 8) | payload[1] # Parse topic filters (just acknowledge them) idx = 2 granted_qos = [] while idx < len(payload): topic_len = (payload[idx] << 8) | payload[idx + 1] idx += 2 topic = payload[idx : idx + topic_len].decode("utf-8") idx += topic_len requested_qos = payload[idx] idx += 1 logger.info("MQTT subscribe: %s QoS=%s", topic, requested_qos) granted_qos.append(min(requested_qos, 1)) # Grant up to QoS 1 # Send SUBACK suback = bytes([0x90, 2 + len(granted_qos), packet_id >> 8, packet_id & 0xFF]) suback += bytes(granted_qos) writer.write(suback) await writer.drain() # Send initial status report after subscribe await self._send_status_report(writer) except (IndexError, ValueError, OSError) as e: logger.debug("MQTT SUBSCRIBE error: %s", e) async def _send_status_report(self, writer: asyncio.StreamWriter) -> None: """Send a status report to the slicer after connection.""" try: # Build status message matching Bambu printer format self._sequence_id += 1 status = { "print": { "sequence_id": str(self._sequence_id), "command": "push_status", "msg": 0, "gcode_state": self._gcode_state, "gcode_file": self._current_file, "gcode_file_prepare_percent": self._prepare_percent, "subtask_name": self._current_file.replace(".3mf", "") if self._current_file else "", "mc_print_stage": "", "mc_percent": 0, "mc_remaining_time": 0, "wifi_signal": "-44dBm", "print_error": 0, "print_type": "", "bed_temper": 25.0, "bed_target_temper": 0.0, "nozzle_temper": 25.0, "nozzle_target_temper": 0.0, "chamber_temper": 25.0, "cooling_fan_speed": "0", "big_fan1_speed": "0", "big_fan2_speed": "0", "heatbreak_fan_speed": "0", "spd_lvl": 1, "spd_mag": 100, "stg": [], "stg_cur": 0, "layer_num": 0, "total_layer_num": 0, "home_flag": 256, # Bit 8 = SD card present (HAS_SDCARD_NORMAL) "hw_switch_state": 0, "online": {"ahb": False, "rfid": False, "version": 7}, "ams_status": 0, "sdcard": True, "storage": {"free": 1000000000, "total": 32000000000}, "upgrade_state": { "sequence_id": 0, "progress": "", "status": "", "consistency_request": False, "dis_state": 0, "err_code": 0, "force_upgrade": False, "message": "", "module": "", "new_version_state": 2, "new_ver_list": [], "ota_new_version_number": "", "ahb_new_version_number": "", }, "ipcam": { "ipcam_dev": "1", "ipcam_record": "enable", "timelapse": "disable", "resolution": "1080p", "mode_bits": 0, }, "xcam": { "allow_skip_parts": False, "buildplate_marker_detector": True, "first_layer_inspector": True, "halt_print_sensitivity": "medium", "print_halt": True, "printing_monitor": True, "spaghetti_detector": True, }, "lights_report": [{"node": "chamber_light", "mode": "on"}], "nozzle_diameter": "0.4", "nozzle_type": "hardened_steel", } } await self._publish_to_report(writer, status) except OSError as e: logger.error("Failed to send status report: %s", e) async def _send_version_response(self, writer: asyncio.StreamWriter, sequence_id: str) -> None: """Send version info response to the slicer.""" try: # Build version response matching OrcaSlicer expectations # Required fields per module: name, product_name, sw_ver, sw_new_ver, sn, hw_ver, flag version_info = { "info": { "command": "get_version", "sequence_id": sequence_id, "module": [ { "name": "ota", "product_name": "X1 Carbon", "sw_ver": "01.07.00.00", "sw_new_ver": "", "hw_ver": "OTA", "sn": self.serial, "flag": 0, }, { "name": "esp32", "product_name": "X1 Carbon", "sw_ver": "01.07.22.25", "sw_new_ver": "", "hw_ver": "AP05", "sn": self.serial, "flag": 0, }, { "name": "rv1126", "product_name": "X1 Carbon", "sw_ver": "00.00.27.38", "sw_new_ver": "", "hw_ver": "AP05", "sn": self.serial, "flag": 0, }, { "name": "th", "product_name": "X1 Carbon", "sw_ver": "00.00.04.00", "sw_new_ver": "", "hw_ver": "TH07", "sn": self.serial, "flag": 0, }, { "name": "mc", "product_name": "X1 Carbon", "sw_ver": "00.00.10.00", "sw_new_ver": "", "hw_ver": "MC07", "sn": self.serial, "flag": 0, }, ], } } await self._publish_to_report(writer, version_info) logger.info("Sent version response") except OSError as e: logger.error("Failed to send version response: %s", e) def set_gcode_state(self, state: str, filename: str = "", prepare_percent: str = "0") -> None: """Update the gcode state reported to connected slicers. Called by the manager to reflect FTP upload progress/completion. """ self._gcode_state = state self._current_file = filename self._prepare_percent = prepare_percent async def _publish_to_report(self, writer: asyncio.StreamWriter, payload: dict) -> None: """Publish a message on the device report topic.""" topic = f"device/{self.serial}/report" message = json.dumps(payload) topic_bytes = topic.encode("utf-8") message_bytes = message.encode("utf-8") remaining = 2 + len(topic_bytes) + len(message_bytes) packet = bytes([0x30]) # PUBLISH, QoS 0 while remaining > 0: byte = remaining % 128 remaining //= 128 if remaining > 0: byte |= 0x80 packet += bytes([byte]) packet += bytes([len(topic_bytes) >> 8, len(topic_bytes) & 0xFF]) packet += topic_bytes packet += message_bytes writer.write(packet) # Timeout the drain to prevent blocking the event loop if the # MQTT client stops reading (e.g. slicer busy with FTP upload). try: await asyncio.wait_for(writer.drain(), timeout=5) except TimeoutError: logger.debug("MQTT drain timeout for %s — client may be busy", topic) async def _send_print_response(self, writer: asyncio.StreamWriter, sequence_id: str, filename: str) -> None: """Send project_file acknowledgment matching real Bambu printer behavior.""" # Update state so periodic status pushes reflect preparation self._gcode_state = "PREPARE" self._current_file = filename self._prepare_percent = "0" try: # Send command acknowledgment — slicer expects to see # command: "project_file" echoed back before starting FTP upload subtask_name = filename.replace(".3mf", "") if filename else "" response = { "print": { "command": "project_file", "sequence_id": sequence_id, "param": "Metadata/plate_1.gcode", "subtask_name": subtask_name, "gcode_state": "PREPARE", "gcode_file": filename, "gcode_file_prepare_percent": "0", "result": "SUCCESS", "msg": 0, } } await self._publish_to_report(writer, response) logger.info("Sent project_file acknowledgment for %s", filename) except OSError as e: logger.error("Failed to send print response: %s", e) async def _handle_publish(self, header: int, payload: bytes, writer: asyncio.StreamWriter) -> None: """Handle MQTT PUBLISH packet.""" try: # Parse topic idx = 0 topic_len = (payload[idx] << 8) | payload[idx + 1] idx += 2 topic = payload[idx : idx + topic_len].decode("utf-8") idx += topic_len # Check for packet ID (QoS > 0) qos = (header & 0x06) >> 1 if qos > 0: # packet_id = (payload[idx] << 8) | payload[idx + 1] idx += 2 # Parse message message = payload[idx:].decode("utf-8") logger.info("MQTT publish to %s: %s...", topic, message[:100]) # Handle commands on device request topic if f"device/{self.serial}/request" in topic: try: data = json.loads(message) # Handle pushing command (status request) if "pushing" in data: pushing_data = data["pushing"] command = pushing_data.get("command", "") logger.info("MQTT pushing command: %s", command) if command == "pushall": # Slicer is requesting full status - send response logger.info("Sending status report in response to pushall") await self._send_status_report(writer) elif command == "start": # Slicer wants periodic status updates - send one now logger.info("Starting status push stream") await self._send_status_report(writer) # Handle info commands (get_version, etc.) if "info" in data: info_data = data["info"] command = info_data.get("command", "") sequence_id = info_data.get("sequence_id", "0") logger.info("MQTT info command: %s", command) if command == "get_version": await self._send_version_response(writer, sequence_id) # Handle print commands if "print" in data: print_data = data["print"] command = print_data.get("command", "") filename = print_data.get("subtask_name", "") sequence_id = print_data.get("sequence_id", "0") logger.info("MQTT print command: %s for %s", command, filename) if command == "project_file": # Respond with PREPARE status so slicer proceeds with FTP upload file_3mf = print_data.get("file", filename) await self._send_print_response(writer, sequence_id, file_3mf) if self.on_print_command: await self._notify_print_command(filename, print_data) except json.JSONDecodeError: pass # Non-JSON payloads on request topic are safely ignored except (IndexError, ValueError, OSError) as e: logger.debug("MQTT PUBLISH error: %s", e) async def _notify_print_command(self, filename: str, data: dict) -> None: """Notify callback of print command.""" if self.on_print_command: try: result = self.on_print_command(filename, data) if asyncio.iscoroutine(result): await result except Exception as e: logger.error("Print command callback error: %s", e)