| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903 |
- """MQTT broker for virtual printer.
- Implements an MQTT broker that accepts connections from slicers,
- authenticates with the configured access code, and logs print commands.
- """
- import asyncio
- import json
- import logging
- import ssl
- from collections.abc import Callable
- from pathlib import Path
- logger = logging.getLogger(__name__)
- # Default MQTT port for Bambu printers (MQTT over TLS)
- MQTT_PORT = 8883
- # Model code → product_name for version response (must match what slicer expects)
- MODEL_PRODUCT_NAMES = {
- "BL-P001": "X1 Carbon",
- "BL-P002": "X1",
- "C13": "X1E",
- "N6": "X2D",
- "C11": "P1P",
- "C12": "P1S",
- "N7": "P2S",
- "N2S": "A1",
- "N1": "A1 mini",
- "O1D": "H2D",
- "O1C": "H2C",
- "O1C2": "H2C",
- "O1S": "H2S",
- }
- class VirtualPrinterMQTTServer:
- """MQTT broker that accepts connections from slicers.
- This is a minimal MQTT broker implementation that:
- - Accepts TLS connections on port 8883
- - Authenticates with username 'bblp' and the configured access code
- - Receives print commands on device/{serial}/request
- - Can publish status on device/{serial}/report
- """
- def __init__(
- self,
- serial: str,
- access_code: str,
- cert_path: Path,
- key_path: Path,
- port: int = MQTT_PORT,
- on_print_command: Callable[[str, dict], None] | None = None,
- ):
- """Initialize the MQTT server.
- Args:
- serial: Virtual printer serial number
- access_code: Password for authentication
- cert_path: Path to TLS certificate
- key_path: Path to TLS private key
- port: Port to listen on (default 8883)
- on_print_command: Callback when print command received (filename, data)
- """
- self.serial = serial
- self.access_code = access_code
- self.cert_path = cert_path
- self.key_path = key_path
- self.port = port
- self.on_print_command = on_print_command
- self._running = False
- self._broker = None
- self._broker_task = None
- async def start(self) -> None:
- """Start the MQTT broker."""
- if self._running:
- return
- # Try to import amqtt
- try:
- from amqtt.broker import Broker
- except ImportError:
- logger.error("amqtt not installed. Run: pip install amqtt")
- return
- logger.info("Starting virtual printer MQTT broker on port %s", self.port)
- # Build broker configuration
- config = {
- "listeners": {
- "default": {
- "type": "tcp",
- "bind": f"0.0.0.0:{self.port}",
- "ssl": "on",
- "certfile": str(self.cert_path),
- "keyfile": str(self.key_path),
- },
- },
- "auth": {
- "allow-anonymous": False,
- "plugins": ["auth_custom"],
- },
- "topic-check": {
- "enabled": False, # Allow any topic
- },
- }
- try:
- self._running = True
- # Create and start broker
- self._broker = Broker(config)
- # Register custom auth plugin
- self._broker.plugins_manager.plugins_handlers["auth_custom"] = self._authenticate
- # Start the broker
- await self._broker.start()
- logger.info("MQTT broker started on port %s", self.port)
- # Keep running
- while self._running:
- await asyncio.sleep(1)
- except OSError as e:
- if e.errno == 98: # Address already in use
- logger.error("MQTT port %s is already in use", self.port)
- else:
- logger.error("MQTT broker error: %s", e)
- except asyncio.CancelledError:
- logger.debug("MQTT broker task cancelled")
- except Exception as e:
- logger.error("MQTT broker error: %s", e)
- finally:
- await self.stop()
- async def _authenticate(self, session) -> bool:
- """Authenticate MQTT connection.
- Args:
- session: MQTT session with username/password
- Returns:
- True if authentication successful
- """
- username = getattr(session, "username", None)
- password = getattr(session, "password", None)
- # Bambu slicers use 'bblp' as username and access code as password
- if username == "bblp" and password == self.access_code:
- logger.debug("MQTT client authenticated from %s", session.remote_address)
- return True
- logger.warning("MQTT auth failed for user '%s' from %s", username, session.remote_address)
- return False
- async def stop(self) -> None:
- """Stop the MQTT broker."""
- logger.info("Stopping MQTT broker")
- self._running = False
- if self._broker:
- try:
- await self._broker.shutdown()
- except OSError as e:
- logger.debug("Error shutting down MQTT broker: %s", e)
- self._broker = None
- class SimpleMQTTServer:
- """Simplified MQTT server using raw sockets.
- This is a fallback implementation that handles basic MQTT protocol
- without requiring the amqtt library. It's less feature-complete but
- more lightweight.
- """
- def __init__(
- self,
- serial: str,
- access_code: str,
- cert_path: Path,
- key_path: Path,
- port: int = MQTT_PORT,
- on_print_command: Callable[[str, dict], None] | None = None,
- model: str = "",
- bind_address: str = "0.0.0.0", # nosec B104
- vp_name: str = "",
- ):
- self.serial = serial
- self.access_code = access_code
- self.model = model
- self.cert_path = cert_path
- self.key_path = key_path
- self.port = port
- self.on_print_command = on_print_command
- self.bind_address = bind_address
- self.vp_name = vp_name
- self._log_prefix = f"[{vp_name}] " if vp_name else ""
- self._running = False
- self._server = None
- self._clients: dict[str, asyncio.StreamWriter] = {}
- # Per-client "effective serial" — the serial the slicer actually uses in
- # device/{serial}/report|request topics. Populated from the first
- # SUBSCRIBE/PUBLISH we see on a connection. This lets the VP respond on
- # the topic the slicer is listening on even when it disagrees with
- # self.serial (e.g. a stale Orca config that was bound to an older VP
- # serial, or a printer entry that was re-pointed at the VP IP without
- # updating the serial).
- self._client_serials: dict[str, str] = {}
- self._status_push_task: asyncio.Task | None = None
- self._sequence_id = 0
- # Dynamic state for status reports
- self._gcode_state = "IDLE"
- self._current_file = ""
- self._prepare_percent = "0"
- async def start(self) -> None:
- """Start the MQTT server."""
- if self._running:
- return
- logger.info("Starting simple MQTT server on port %s", self.port)
- # Create SSL context with Bambu-compatible settings
- ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
- ssl_context.load_cert_chain(str(self.cert_path), str(self.key_path))
- # Match Bambu printer behavior - accept any client
- ssl_context.verify_mode = ssl.CERT_NONE
- # Allow TLS 1.2 for broader compatibility (some slicers may not support 1.3)
- ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2
- # Disable hostname checking
- ssl_context.check_hostname = False
- # Log certificate info
- import subprocess
- try:
- result = subprocess.run(
- ["openssl", "x509", "-in", str(self.cert_path), "-noout", "-subject", "-issuer"],
- capture_output=True,
- text=True,
- timeout=5,
- )
- logger.info("MQTT SSL cert info: %s", result.stdout.strip())
- except (OSError, subprocess.SubprocessError):
- pass # Certificate info is for debug logging only; not critical
- logger.info("MQTT SSL context: TLS 1.2+, cert=%s", self.cert_path)
- try:
- self._running = True
- # Wrapper to log ALL connection attempts including SSL errors
- async def connection_handler(reader, writer):
- try:
- addr = writer.get_extra_info("peername")
- ssl_obj = writer.get_extra_info("ssl_object")
- if ssl_obj:
- logger.info(
- f"{self._log_prefix}MQTT TLS connection from {addr} - cipher={ssl_obj.cipher()}, version={ssl_obj.version()}"
- )
- else:
- logger.info("%sMQTT connection from %s (no TLS?)", self._log_prefix, addr)
- await self._handle_client(reader, writer)
- except ssl.SSLError as e:
- logger.error("MQTT SSL error: %s", e)
- except Exception as e:
- logger.error("MQTT connection handler error: %s", e)
- self._server = await asyncio.start_server(
- connection_handler,
- self.bind_address,
- self.port,
- ssl=ssl_context,
- )
- logger.info("Simple MQTT server listening on port %s", self.port)
- # Start periodic status push task
- self._status_push_task = asyncio.create_task(self._periodic_status_push())
- async with self._server:
- await self._server.serve_forever()
- except OSError as e:
- if e.errno == 98: # Address already in use
- logger.error("MQTT port %s is already in use", self.port)
- else:
- logger.error("MQTT server error: %s", e)
- except asyncio.CancelledError:
- logger.debug("MQTT server task cancelled")
- except Exception as e:
- logger.error("MQTT server error: %s", e)
- finally:
- await self.stop()
- async def stop(self) -> None:
- """Stop the MQTT server."""
- logger.info("Stopping simple MQTT server")
- self._running = False
- # Stop periodic status push
- if self._status_push_task:
- self._status_push_task.cancel()
- try:
- await self._status_push_task
- except asyncio.CancelledError:
- pass # Expected when stopping the periodic status push task
- self._status_push_task = None
- # Close all client connections (iterate over copy to avoid modification during iteration)
- for _client_id, writer in list(self._clients.items()):
- try:
- writer.close()
- await writer.wait_closed()
- except OSError:
- pass # Best-effort client connection cleanup; client may have disconnected
- self._clients.clear()
- self._client_serials.clear()
- if self._server:
- try:
- self._server.close()
- await self._server.wait_closed()
- except OSError:
- pass # Best-effort server shutdown; port may already be released
- self._server = None
- @staticmethod
- def _extract_serial_from_topic(topic: str) -> str | None:
- """Pull the serial out of a `device/{serial}/report|request` topic.
- Returns None if the topic doesn't match that shape — callers fall back
- to self.serial in that case.
- """
- if not topic.startswith("device/"):
- return None
- rest = topic[len("device/") :]
- # Expect "{serial}/report" or "{serial}/request" (possibly with suffixes).
- slash = rest.find("/")
- if slash <= 0:
- return None
- return rest[:slash]
- async def _periodic_status_push(self) -> None:
- """Send periodic status updates to all connected clients."""
- logger.info("Starting periodic status push task")
- while self._running:
- try:
- await asyncio.sleep(1) # Push every 1 second like real printers
- # Send status to all connected clients
- disconnected = []
- for client_id, writer in list(self._clients.items()):
- try:
- if writer.is_closing():
- disconnected.append(client_id)
- continue
- serial = self._client_serials.get(client_id, self.serial)
- await self._send_status_report(writer, serial=serial)
- except OSError as e:
- logger.debug("Failed to push status to %s: %s", client_id, e)
- disconnected.append(client_id)
- # Remove disconnected clients
- for client_id in disconnected:
- self._clients.pop(client_id, None)
- self._client_serials.pop(client_id, None)
- except asyncio.CancelledError:
- break
- except Exception as e:
- logger.error("Periodic status push error: %s", e)
- logger.info("Periodic status push task stopped")
- async def _handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
- """Handle an MQTT client connection."""
- addr = writer.get_extra_info("peername")
- client_id = f"{addr[0]}:{addr[1]}" if addr else "unknown"
- logger.info("%sMQTT client connected: %s", self._log_prefix, client_id)
- authenticated = False
- try:
- while self._running:
- # Read MQTT fixed header
- try:
- header = await asyncio.wait_for(reader.read(1), timeout=60)
- except TimeoutError:
- break
- if not header:
- break
- packet_type = (header[0] & 0xF0) >> 4
- # Read remaining length
- remaining_length = await self._read_remaining_length(reader)
- if remaining_length is None:
- break
- # Read payload
- payload = await reader.read(remaining_length) if remaining_length > 0 else b""
- # Handle packet types
- if packet_type == 1: # CONNECT
- authenticated = await self._handle_connect(payload, writer)
- if not authenticated:
- break
- # Register client for periodic status pushes; start with
- # self.serial as the fallback until we learn the slicer's
- # preferred serial from the first SUBSCRIBE/PUBLISH.
- self._clients[client_id] = writer
- self._client_serials[client_id] = self.serial
- elif packet_type == 3: # PUBLISH
- if authenticated:
- await self._handle_publish(header[0], payload, writer, client_id)
- elif packet_type == 8: # SUBSCRIBE
- if authenticated:
- await self._handle_subscribe(payload, writer, client_id)
- elif packet_type == 12: # PINGREQ
- # Send PINGRESP
- writer.write(bytes([0xD0, 0x00]))
- await writer.drain()
- elif packet_type == 14: # DISCONNECT
- break
- except asyncio.CancelledError:
- pass # Expected when server is shutting down and cancels client tasks
- except Exception as e:
- logger.debug("MQTT client error: %s", e)
- finally:
- logger.debug("MQTT client disconnected: %s", client_id)
- self._clients.pop(client_id, None)
- self._client_serials.pop(client_id, None)
- try:
- writer.close()
- await writer.wait_closed()
- except OSError:
- pass # Best-effort socket cleanup on client disconnect
- async def _read_remaining_length(self, reader: asyncio.StreamReader) -> int | None:
- """Read MQTT remaining length (variable byte integer)."""
- multiplier = 1
- value = 0
- for _ in range(4):
- try:
- byte = await reader.read(1)
- if not byte:
- return None
- encoded = byte[0]
- value += (encoded & 127) * multiplier
- if (encoded & 128) == 0:
- return value
- multiplier *= 128
- except OSError:
- return None
- return None
- async def _handle_connect(self, payload: bytes, writer: asyncio.StreamWriter) -> bool:
- """Handle MQTT CONNECT packet.
- Returns True if authentication successful.
- """
- try:
- # Parse CONNECT packet
- # Skip protocol name length and name
- idx = 0
- proto_len = (payload[idx] << 8) | payload[idx + 1]
- idx += 2 + proto_len
- # Skip protocol level and connect flags
- # connect_flags = payload[idx + 1]
- idx += 2
- # Skip keepalive
- idx += 2
- # Read client ID
- client_id_len = (payload[idx] << 8) | payload[idx + 1]
- idx += 2
- # client_id = payload[idx : idx + client_id_len].decode("utf-8")
- idx += client_id_len
- # Read username
- username_len = (payload[idx] << 8) | payload[idx + 1]
- idx += 2
- username = payload[idx : idx + username_len].decode("utf-8")
- idx += username_len
- # Read password
- password_len = (payload[idx] << 8) | payload[idx + 1]
- idx += 2
- password = payload[idx : idx + password_len].decode("utf-8")
- # Authenticate
- if username == "bblp" and password == self.access_code:
- # Send CONNACK with success
- writer.write(bytes([0x20, 0x02, 0x00, 0x00]))
- await writer.drain()
- logger.info("%sMQTT client authenticated successfully", self._log_prefix)
- # Send immediate status report after auth - slicer expects this
- await self._send_status_report(writer)
- return True
- else:
- # Send CONNACK with auth failure
- writer.write(bytes([0x20, 0x02, 0x00, 0x05])) # Not authorized
- await writer.drain()
- logger.warning("%sMQTT auth failed for user '%s' (access code mismatch)", self._log_prefix, username)
- return False
- except (IndexError, ValueError) as e:
- logger.debug("MQTT CONNECT parse error: %s", e)
- # Send CONNACK with error
- writer.write(bytes([0x20, 0x02, 0x00, 0x02])) # Protocol error
- await writer.drain()
- return False
- async def _handle_subscribe(self, payload: bytes, writer: asyncio.StreamWriter, client_id: str) -> None:
- """Handle MQTT SUBSCRIBE packet."""
- try:
- # Parse packet ID
- packet_id = (payload[0] << 8) | payload[1]
- # Parse topic filters (just acknowledge them)
- idx = 2
- granted_qos = []
- learned_serial: str | None = None
- while idx < len(payload):
- topic_len = (payload[idx] << 8) | payload[idx + 1]
- idx += 2
- topic = payload[idx : idx + topic_len].decode("utf-8")
- idx += topic_len
- requested_qos = payload[idx]
- idx += 1
- logger.info("%sMQTT subscribe: %s QoS=%s", self._log_prefix, topic, requested_qos)
- granted_qos.append(min(requested_qos, 1)) # Grant up to QoS 1
- # Remember the serial the slicer is listening on so status/version
- # responses go to a topic it actually subscribed to.
- if learned_serial is None:
- extracted = self._extract_serial_from_topic(topic)
- if extracted:
- learned_serial = extracted
- if learned_serial and learned_serial != self._client_serials.get(client_id):
- if learned_serial != self.serial:
- logger.info(
- "%sMQTT client subscribed with serial %s (VP serial is %s) — adapting responses",
- self._log_prefix,
- learned_serial,
- self.serial,
- )
- self._client_serials[client_id] = learned_serial
- # Send SUBACK
- suback = bytes([0x90, 2 + len(granted_qos), packet_id >> 8, packet_id & 0xFF])
- suback += bytes(granted_qos)
- writer.write(suback)
- await writer.drain()
- # Send initial status report after subscribe on the client's subscribed topic
- await self._send_status_report(writer, serial=self._client_serials.get(client_id, self.serial))
- except (IndexError, ValueError, OSError) as e:
- logger.debug("MQTT SUBSCRIBE error: %s", e)
- async def _send_status_report(self, writer: asyncio.StreamWriter, serial: str | None = None) -> None:
- """Send a status report to the slicer after connection."""
- try:
- # Build status message matching Bambu printer format
- self._sequence_id += 1
- status = {
- "print": {
- "sequence_id": str(self._sequence_id),
- "command": "push_status",
- "msg": 0,
- "gcode_state": self._gcode_state,
- "gcode_file": self._current_file,
- "gcode_file_prepare_percent": self._prepare_percent,
- "subtask_name": self._current_file.replace(".3mf", "") if self._current_file else "",
- "mc_print_stage": "",
- "mc_percent": 0,
- "mc_remaining_time": 0,
- "wifi_signal": "-44dBm",
- "print_error": 0,
- "print_type": "",
- "bed_temper": 25.0,
- "bed_target_temper": 0.0,
- "nozzle_temper": 25.0,
- "nozzle_target_temper": 0.0,
- "chamber_temper": 25.0,
- "cooling_fan_speed": "0",
- "big_fan1_speed": "0",
- "big_fan2_speed": "0",
- "heatbreak_fan_speed": "0",
- "spd_lvl": 1,
- "spd_mag": 100,
- "stg": [],
- "stg_cur": 0,
- "layer_num": 0,
- "total_layer_num": 0,
- "home_flag": 256, # Bit 8 = SD card present (HAS_SDCARD_NORMAL)
- "hw_switch_state": 0,
- "online": {"ahb": False, "rfid": False, "version": 7},
- "ams_status": 0,
- "sdcard": True,
- "storage": {"free": 1000000000, "total": 32000000000},
- "upgrade_state": {
- "sequence_id": 0,
- "progress": "",
- "status": "",
- "consistency_request": False,
- "dis_state": 0,
- "err_code": 0,
- "force_upgrade": False,
- "message": "",
- "module": "",
- "new_version_state": 2,
- "new_ver_list": [],
- "ota_new_version_number": "",
- "ahb_new_version_number": "",
- },
- "ipcam": {
- "ipcam_dev": "1",
- "ipcam_record": "enable",
- "timelapse": "disable",
- "resolution": "1080p",
- "mode_bits": 0,
- },
- "xcam": {
- "allow_skip_parts": False,
- "buildplate_marker_detector": True,
- "first_layer_inspector": True,
- "halt_print_sensitivity": "medium",
- "print_halt": True,
- "printing_monitor": True,
- "spaghetti_detector": True,
- },
- "lights_report": [{"node": "chamber_light", "mode": "on"}],
- "nozzle_diameter": "0.4",
- "nozzle_type": "hardened_steel",
- }
- }
- await self._publish_to_report(writer, status, serial or self.serial)
- except OSError as e:
- logger.error("Failed to send status report: %s", e)
- async def _send_version_response(
- self, writer: asyncio.StreamWriter, sequence_id: str, serial: str | None = None
- ) -> None:
- """Send version info response to the slicer."""
- try:
- product_name = MODEL_PRODUCT_NAMES.get(self.model, self.model or "X1 Carbon")
- # The serial is embedded inside the module[].sn fields *and* used as
- # the report topic. Use the client's effective serial so the slicer
- # sees internal/topic consistency even when it differs from self.serial.
- serial = serial or self.serial
- # Build version response matching OrcaSlicer expectations
- # Required fields per module: name, product_name, sw_ver, sw_new_ver, sn, hw_ver, flag
- version_info = {
- "info": {
- "command": "get_version",
- "sequence_id": sequence_id,
- "module": [
- {
- "name": "ota",
- "product_name": product_name,
- "sw_ver": "01.07.00.00",
- "sw_new_ver": "",
- "hw_ver": "OTA",
- "sn": serial,
- "flag": 0,
- },
- {
- "name": "esp32",
- "product_name": product_name,
- "sw_ver": "01.07.22.25",
- "sw_new_ver": "",
- "hw_ver": "AP05",
- "sn": serial,
- "flag": 0,
- },
- {
- "name": "rv1126",
- "product_name": product_name,
- "sw_ver": "00.00.27.38",
- "sw_new_ver": "",
- "hw_ver": "AP05",
- "sn": serial,
- "flag": 0,
- },
- {
- "name": "th",
- "product_name": product_name,
- "sw_ver": "00.00.04.00",
- "sw_new_ver": "",
- "hw_ver": "TH07",
- "sn": serial,
- "flag": 0,
- },
- {
- "name": "mc",
- "product_name": product_name,
- "sw_ver": "00.00.10.00",
- "sw_new_ver": "",
- "hw_ver": "MC07",
- "sn": serial,
- "flag": 0,
- },
- ],
- }
- }
- await self._publish_to_report(writer, version_info, serial)
- logger.info("Sent version response (product_name=%s)", product_name)
- except OSError as e:
- logger.error("Failed to send version response: %s", e)
- def set_gcode_state(self, state: str, filename: str = "", prepare_percent: str = "0") -> None:
- """Update the gcode state reported to connected slicers.
- Called by the manager to reflect FTP upload progress/completion.
- """
- self._gcode_state = state
- self._current_file = filename
- self._prepare_percent = prepare_percent
- async def _publish_to_report(self, writer: asyncio.StreamWriter, payload: dict, serial: str = "") -> None:
- """Publish a message on the device report topic."""
- topic = f"device/{serial or self.serial}/report"
- message = json.dumps(payload)
- topic_bytes = topic.encode("utf-8")
- message_bytes = message.encode("utf-8")
- remaining = 2 + len(topic_bytes) + len(message_bytes)
- packet = bytes([0x30]) # PUBLISH, QoS 0
- while remaining > 0:
- byte = remaining % 128
- remaining //= 128
- if remaining > 0:
- byte |= 0x80
- packet += bytes([byte])
- packet += bytes([len(topic_bytes) >> 8, len(topic_bytes) & 0xFF])
- packet += topic_bytes
- packet += message_bytes
- writer.write(packet)
- # Timeout the drain to prevent blocking the event loop if the
- # MQTT client stops reading (e.g. slicer busy with FTP upload).
- try:
- await asyncio.wait_for(writer.drain(), timeout=5)
- except TimeoutError:
- logger.debug("MQTT drain timeout for %s — client may be busy", topic)
- async def _send_print_response(
- self, writer: asyncio.StreamWriter, sequence_id: str, filename: str, serial: str | None = None
- ) -> None:
- """Send project_file acknowledgment matching real Bambu printer behavior."""
- # Update state so periodic status pushes reflect preparation
- self._gcode_state = "PREPARE"
- self._current_file = filename
- self._prepare_percent = "0"
- try:
- # Send command acknowledgment — slicer expects to see
- # command: "project_file" echoed back before starting FTP upload
- subtask_name = filename.replace(".3mf", "") if filename else ""
- response = {
- "print": {
- "command": "project_file",
- "sequence_id": sequence_id,
- "param": "Metadata/plate_1.gcode",
- "subtask_name": subtask_name,
- "gcode_state": "PREPARE",
- "gcode_file": filename,
- "gcode_file_prepare_percent": "0",
- "result": "SUCCESS",
- "msg": 0,
- }
- }
- await self._publish_to_report(writer, response, serial or self.serial)
- logger.info("Sent project_file acknowledgment for %s", filename)
- except OSError as e:
- logger.error("Failed to send print response: %s", e)
- async def _handle_publish(self, header: int, payload: bytes, writer: asyncio.StreamWriter, client_id: str) -> None:
- """Handle MQTT PUBLISH packet."""
- try:
- # Parse topic
- idx = 0
- topic_len = (payload[idx] << 8) | payload[idx + 1]
- idx += 2
- topic = payload[idx : idx + topic_len].decode("utf-8")
- idx += topic_len
- # Check for packet ID (QoS > 0)
- qos = (header & 0x06) >> 1
- if qos > 0:
- # packet_id = (payload[idx] << 8) | payload[idx + 1]
- idx += 2
- # Parse message
- message = payload[idx:].decode("utf-8")
- logger.info("MQTT publish to %s: %s...", topic, message[:100])
- # Only handle publishes on *some* device/.../request topic. The
- # serial is taken from the topic rather than compared against
- # self.serial: the client is already authenticated via the access
- # code, and Orca/BambuStudio may have a cached serial that differs
- # from the VP's computed self.serial (#927). Use the topic's serial
- # for all responses so they land on the topic the slicer subscribed
- # to.
- if not topic.startswith("device/") or "/request" not in topic:
- return
- client_serial = self._extract_serial_from_topic(topic) or self.serial
- if client_serial and client_serial != self._client_serials.get(client_id):
- if client_serial != self.serial:
- logger.info(
- "%sMQTT client publishing with serial %s (VP serial is %s) — adapting responses",
- self._log_prefix,
- client_serial,
- self.serial,
- )
- self._client_serials[client_id] = client_serial
- try:
- data = json.loads(message)
- except json.JSONDecodeError:
- return # Non-JSON payloads on request topic are safely ignored
- # Handle pushing command (status request)
- if "pushing" in data:
- pushing_data = data["pushing"]
- command = pushing_data.get("command", "")
- logger.info("MQTT pushing command: %s", command)
- if command == "pushall":
- # Slicer is requesting full status - send response
- logger.info("Sending status report in response to pushall")
- await self._send_status_report(writer, serial=client_serial)
- elif command == "start":
- # Slicer wants periodic status updates - send one now
- logger.info("Starting status push stream")
- await self._send_status_report(writer, serial=client_serial)
- # Handle info commands (get_version, etc.)
- if "info" in data:
- info_data = data["info"]
- command = info_data.get("command", "")
- sequence_id = info_data.get("sequence_id", "0")
- logger.info("MQTT info command: %s", command)
- if command == "get_version":
- await self._send_version_response(writer, sequence_id, serial=client_serial)
- # Handle print commands
- if "print" in data:
- print_data = data["print"]
- command = print_data.get("command", "")
- filename = print_data.get("subtask_name", "")
- sequence_id = print_data.get("sequence_id", "0")
- logger.info("MQTT print command: %s for %s", command, filename)
- if command == "project_file":
- # Respond with PREPARE status so slicer proceeds with FTP upload
- file_3mf = print_data.get("file", filename)
- await self._send_print_response(writer, sequence_id, file_3mf, serial=client_serial)
- if self.on_print_command:
- await self._notify_print_command(filename, print_data)
- except (IndexError, ValueError, OSError) as e:
- logger.debug("MQTT PUBLISH error: %s", e)
- async def _notify_print_command(self, filename: str, data: dict) -> None:
- """Notify callback of print command."""
- if self.on_print_command:
- try:
- result = self.on_print_command(filename, data)
- if asyncio.iscoroutine(result):
- await result
- except Exception as e:
- logger.error("Print command callback error: %s", e)
|