| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042 |
- """MQTT broker for virtual printer.
- Implements an MQTT broker that accepts connections from slicers,
- authenticates with the configured access code, and logs print commands.
- """
- import asyncio
- import json
- import logging
- import ssl
- from collections.abc import Callable
- from pathlib import Path
- from typing import TYPE_CHECKING
- if TYPE_CHECKING:
- from backend.app.services.virtual_printer.mqtt_bridge import MQTTBridge
- logger = logging.getLogger(__name__)
- # Default MQTT port for Bambu printers (MQTT over TLS)
- MQTT_PORT = 8883
- # Model code → product_name for version response (must match what slicer expects)
- MODEL_PRODUCT_NAMES = {
- "BL-P001": "X1 Carbon",
- "BL-P002": "X1",
- "C13": "X1E",
- "N6": "X2D",
- "C11": "P1P",
- "C12": "P1S",
- "N7": "P2S",
- "N2S": "A1",
- "N1": "A1 mini",
- "O1D": "H2D",
- "O1C": "H2C",
- "O1C2": "H2C",
- "O1S": "H2S",
- }
- class VirtualPrinterMQTTServer:
- """MQTT broker that accepts connections from slicers.
- This is a minimal MQTT broker implementation that:
- - Accepts TLS connections on port 8883
- - Authenticates with username 'bblp' and the configured access code
- - Receives print commands on device/{serial}/request
- - Can publish status on device/{serial}/report
- """
- def __init__(
- self,
- serial: str,
- access_code: str,
- cert_path: Path,
- key_path: Path,
- port: int = MQTT_PORT,
- on_print_command: Callable[[str, dict], None] | None = None,
- ):
- """Initialize the MQTT server.
- Args:
- serial: Virtual printer serial number
- access_code: Password for authentication
- cert_path: Path to TLS certificate
- key_path: Path to TLS private key
- port: Port to listen on (default 8883)
- on_print_command: Callback when print command received (filename, data)
- """
- self.serial = serial
- self.access_code = access_code
- self.cert_path = cert_path
- self.key_path = key_path
- self.port = port
- self.on_print_command = on_print_command
- self._running = False
- self._broker = None
- self._broker_task = None
- async def start(self) -> None:
- """Start the MQTT broker."""
- if self._running:
- return
- # Try to import amqtt
- try:
- from amqtt.broker import Broker
- except ImportError:
- logger.error("amqtt not installed. Run: pip install amqtt")
- return
- logger.info("Starting virtual printer MQTT broker on port %s", self.port)
- # Build broker configuration
- config = {
- "listeners": {
- "default": {
- "type": "tcp",
- "bind": f"0.0.0.0:{self.port}",
- "ssl": "on",
- "certfile": str(self.cert_path),
- "keyfile": str(self.key_path),
- },
- },
- "auth": {
- "allow-anonymous": False,
- "plugins": ["auth_custom"],
- },
- "topic-check": {
- "enabled": False, # Allow any topic
- },
- }
- try:
- self._running = True
- # Create and start broker
- self._broker = Broker(config)
- # Register custom auth plugin
- self._broker.plugins_manager.plugins_handlers["auth_custom"] = self._authenticate
- # Start the broker
- await self._broker.start()
- logger.info("MQTT broker started on port %s", self.port)
- # Keep running
- while self._running:
- await asyncio.sleep(1)
- except OSError as e:
- if e.errno == 98: # Address already in use
- logger.error("MQTT port %s is already in use", self.port)
- else:
- logger.error("MQTT broker error: %s", e)
- except asyncio.CancelledError:
- logger.debug("MQTT broker task cancelled")
- except Exception as e:
- logger.error("MQTT broker error: %s", e)
- finally:
- await self.stop()
- async def _authenticate(self, session) -> bool:
- """Authenticate MQTT connection.
- Args:
- session: MQTT session with username/password
- Returns:
- True if authentication successful
- """
- username = getattr(session, "username", None)
- password = getattr(session, "password", None)
- # Bambu slicers use 'bblp' as username and access code as password
- if username == "bblp" and password == self.access_code:
- logger.debug("MQTT client authenticated from %s", session.remote_address)
- return True
- logger.warning("MQTT auth failed for user '%s' from %s", username, session.remote_address)
- return False
- async def stop(self) -> None:
- """Stop the MQTT broker."""
- logger.info("Stopping MQTT broker")
- self._running = False
- if self._broker:
- try:
- await self._broker.shutdown()
- except OSError as e:
- logger.debug("Error shutting down MQTT broker: %s", e)
- self._broker = None
- class SimpleMQTTServer:
- """Simplified MQTT server using raw sockets.
- This is a fallback implementation that handles basic MQTT protocol
- without requiring the amqtt library. It's less feature-complete but
- more lightweight.
- """
- def __init__(
- self,
- serial: str,
- access_code: str,
- cert_path: Path,
- key_path: Path,
- port: int = MQTT_PORT,
- on_print_command: Callable[[str, dict], None] | None = None,
- model: str = "",
- bind_address: str = "0.0.0.0", # nosec B104
- vp_name: str = "",
- ):
- self.serial = serial
- self.access_code = access_code
- self.model = model
- self.cert_path = cert_path
- self.key_path = key_path
- self.port = port
- self.on_print_command = on_print_command
- self.bind_address = bind_address
- self.vp_name = vp_name
- self._log_prefix = f"[{vp_name}] " if vp_name else ""
- self._running = False
- self._server = None
- self._clients: dict[str, asyncio.StreamWriter] = {}
- # Per-client "effective serial" — the serial the slicer actually uses in
- # device/{serial}/report|request topics. Populated from the first
- # SUBSCRIBE/PUBLISH we see on a connection. This lets the VP respond on
- # the topic the slicer is listening on even when it disagrees with
- # self.serial (e.g. a stale Orca config that was bound to an older VP
- # serial, or a printer entry that was re-pointed at the VP IP without
- # updating the serial).
- self._client_serials: dict[str, str] = {}
- self._status_push_task: asyncio.Task | None = None
- self._sequence_id = 0
- # Dynamic state for status reports
- self._gcode_state = "IDLE"
- self._current_file = ""
- self._prepare_percent = "0"
- # MQTT bridge for non-proxy modes — set by VirtualPrinterInstance after start().
- # When the bridge is_active, real printer pushes are fanned out to slicers and
- # the synthetic 1s push is suspended. When the target printer goes offline the
- # synthetic fallback resumes automatically.
- self._bridge: MQTTBridge | None = None
- async def start(self) -> None:
- """Start the MQTT server."""
- if self._running:
- return
- logger.info("Starting simple MQTT server on port %s", self.port)
- # Create SSL context with Bambu-compatible settings
- ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
- ssl_context.load_cert_chain(str(self.cert_path), str(self.key_path))
- # Match Bambu printer behavior - accept any client
- ssl_context.verify_mode = ssl.CERT_NONE
- # Allow TLS 1.2 for broader compatibility (some slicers may not support 1.3)
- ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2
- # Disable hostname checking
- ssl_context.check_hostname = False
- # Log certificate info
- import subprocess
- try:
- result = subprocess.run(
- ["openssl", "x509", "-in", str(self.cert_path), "-noout", "-subject", "-issuer"],
- capture_output=True,
- text=True,
- timeout=5,
- )
- logger.info("MQTT SSL cert info: %s", result.stdout.strip())
- except (OSError, subprocess.SubprocessError):
- pass # Certificate info is for debug logging only; not critical
- logger.info("MQTT SSL context: TLS 1.2+, cert=%s", self.cert_path)
- try:
- self._running = True
- # Wrapper to log ALL connection attempts including SSL errors
- async def connection_handler(reader, writer):
- try:
- addr = writer.get_extra_info("peername")
- ssl_obj = writer.get_extra_info("ssl_object")
- if ssl_obj:
- logger.info(
- f"{self._log_prefix}MQTT TLS connection from {addr} - cipher={ssl_obj.cipher()}, version={ssl_obj.version()}"
- )
- else:
- logger.info("%sMQTT connection from %s (no TLS?)", self._log_prefix, addr)
- await self._handle_client(reader, writer)
- except ssl.SSLError as e:
- logger.error("MQTT SSL error: %s", e)
- except Exception as e:
- logger.error("MQTT connection handler error: %s", e)
- self._server = await asyncio.start_server(
- connection_handler,
- self.bind_address,
- self.port,
- ssl=ssl_context,
- )
- logger.info("Simple MQTT server listening on port %s", self.port)
- # Start periodic status push task
- self._status_push_task = asyncio.create_task(self._periodic_status_push())
- async with self._server:
- await self._server.serve_forever()
- except OSError as e:
- if e.errno == 98: # Address already in use
- logger.error("MQTT port %s is already in use", self.port)
- else:
- logger.error("MQTT server error: %s", e)
- except asyncio.CancelledError:
- logger.debug("MQTT server task cancelled")
- except Exception as e:
- logger.error("MQTT server error: %s", e)
- finally:
- await self.stop()
- async def stop(self) -> None:
- """Stop the MQTT server."""
- logger.info("Stopping simple MQTT server")
- self._running = False
- # Stop periodic status push
- if self._status_push_task:
- self._status_push_task.cancel()
- try:
- await self._status_push_task
- except asyncio.CancelledError:
- pass # Expected when stopping the periodic status push task
- self._status_push_task = None
- # Close all client connections (iterate over copy to avoid modification during iteration)
- for _client_id, writer in list(self._clients.items()):
- try:
- writer.close()
- await writer.wait_closed()
- except OSError:
- pass # Best-effort client connection cleanup; client may have disconnected
- self._clients.clear()
- self._client_serials.clear()
- if self._server:
- try:
- self._server.close()
- await self._server.wait_closed()
- except OSError:
- pass # Best-effort server shutdown; port may already be released
- self._server = None
- @staticmethod
- def _extract_serial_from_topic(topic: str) -> str | None:
- """Pull the serial out of a `device/{serial}/report|request` topic.
- Returns None if the topic doesn't match that shape — callers fall back
- to self.serial in that case.
- """
- if not topic.startswith("device/"):
- return None
- rest = topic[len("device/") :]
- # Expect "{serial}/report" or "{serial}/request" (possibly with suffixes).
- slash = rest.find("/")
- if slash <= 0:
- return None
- return rest[:slash]
- def set_bridge(self, bridge: "MQTTBridge | None") -> None:
- """Attach (or detach) the MQTT bridge that mirrors the target printer."""
- self._bridge = bridge
- async def _periodic_status_push(self) -> None:
- """Send periodic status updates to all connected clients (1 Hz, exact pre-bridge behaviour)."""
- logger.info("Starting periodic status push task")
- while self._running:
- try:
- await asyncio.sleep(1) # Push every 1 second like real printers
- disconnected = []
- for client_id, writer in list(self._clients.items()):
- try:
- if writer.is_closing():
- disconnected.append(client_id)
- continue
- serial = self._client_serials.get(client_id, self.serial)
- await self._send_status_report(writer, serial=serial)
- except OSError as e:
- logger.debug("Failed to push status to %s: %s", client_id, e)
- disconnected.append(client_id)
- # Remove disconnected clients
- for client_id in disconnected:
- self._clients.pop(client_id, None)
- self._client_serials.pop(client_id, None)
- except asyncio.CancelledError:
- break
- except Exception as e:
- logger.error("Periodic status push error: %s", e)
- logger.info("Periodic status push task stopped")
- async def push_raw_to_clients(self, topic: str, payload: bytes) -> None:
- """Publish a pre-serialized MQTT payload on `topic` to every connected slicer.
- Called by MQTTBridge from the asyncio loop (scheduled via
- run_coroutine_threadsafe from paho's network thread).
- """
- topic_bytes = topic.encode("utf-8")
- # MQTT remaining-length: 2-byte topic length prefix + topic + message body.
- remaining = 2 + len(topic_bytes) + len(payload)
- packet = bytearray([0x30]) # PUBLISH, QoS 0
- while True:
- byte = remaining % 128
- remaining //= 128
- if remaining > 0:
- byte |= 0x80
- packet.append(byte)
- if remaining == 0:
- break
- packet.extend([len(topic_bytes) >> 8, len(topic_bytes) & 0xFF])
- packet.extend(topic_bytes)
- packet.extend(payload)
- frame = bytes(packet)
- disconnected = []
- for client_id, writer in list(self._clients.items()):
- try:
- if writer.is_closing():
- disconnected.append(client_id)
- continue
- writer.write(frame)
- try:
- await asyncio.wait_for(writer.drain(), timeout=5)
- except TimeoutError:
- logger.debug("MQTT drain timeout pushing bridge frame to %s", client_id)
- except OSError as e:
- logger.debug("Failed to push bridge frame to %s: %s", client_id, e)
- disconnected.append(client_id)
- for client_id in disconnected:
- self._clients.pop(client_id, None)
- self._client_serials.pop(client_id, None)
- async def _handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
- """Handle an MQTT client connection."""
- addr = writer.get_extra_info("peername")
- client_id = f"{addr[0]}:{addr[1]}" if addr else "unknown"
- logger.info("%sMQTT client connected: %s", self._log_prefix, client_id)
- authenticated = False
- try:
- while self._running:
- # Read MQTT fixed header
- try:
- header = await asyncio.wait_for(reader.read(1), timeout=60)
- except TimeoutError:
- break
- if not header:
- break
- packet_type = (header[0] & 0xF0) >> 4
- # Read remaining length
- remaining_length = await self._read_remaining_length(reader)
- if remaining_length is None:
- break
- # Read payload
- payload = await reader.read(remaining_length) if remaining_length > 0 else b""
- # Handle packet types
- if packet_type == 1: # CONNECT
- authenticated = await self._handle_connect(payload, writer)
- if not authenticated:
- break
- # Register client for periodic status pushes; start with
- # self.serial as the fallback until we learn the slicer's
- # preferred serial from the first SUBSCRIBE/PUBLISH.
- self._clients[client_id] = writer
- self._client_serials[client_id] = self.serial
- elif packet_type == 3: # PUBLISH
- if authenticated:
- await self._handle_publish(header[0], payload, writer, client_id)
- elif packet_type == 8: # SUBSCRIBE
- if authenticated:
- await self._handle_subscribe(payload, writer, client_id)
- elif packet_type == 12: # PINGREQ
- # Send PINGRESP
- writer.write(bytes([0xD0, 0x00]))
- await writer.drain()
- elif packet_type == 14: # DISCONNECT
- break
- except asyncio.CancelledError:
- pass # Expected when server is shutting down and cancels client tasks
- except Exception as e:
- logger.debug("MQTT client error: %s", e)
- finally:
- logger.debug("MQTT client disconnected: %s", client_id)
- self._clients.pop(client_id, None)
- self._client_serials.pop(client_id, None)
- try:
- writer.close()
- await writer.wait_closed()
- except OSError:
- pass # Best-effort socket cleanup on client disconnect
- async def _read_remaining_length(self, reader: asyncio.StreamReader) -> int | None:
- """Read MQTT remaining length (variable byte integer)."""
- multiplier = 1
- value = 0
- for _ in range(4):
- try:
- byte = await reader.read(1)
- if not byte:
- return None
- encoded = byte[0]
- value += (encoded & 127) * multiplier
- if (encoded & 128) == 0:
- return value
- multiplier *= 128
- except OSError:
- return None
- return None
- async def _handle_connect(self, payload: bytes, writer: asyncio.StreamWriter) -> bool:
- """Handle MQTT CONNECT packet.
- Returns True if authentication successful.
- """
- try:
- # Parse CONNECT packet
- # Skip protocol name length and name
- idx = 0
- proto_len = (payload[idx] << 8) | payload[idx + 1]
- idx += 2 + proto_len
- # Skip protocol level and connect flags
- # connect_flags = payload[idx + 1]
- idx += 2
- # Skip keepalive
- idx += 2
- # Read client ID
- client_id_len = (payload[idx] << 8) | payload[idx + 1]
- idx += 2
- # client_id = payload[idx : idx + client_id_len].decode("utf-8")
- idx += client_id_len
- # Read username
- username_len = (payload[idx] << 8) | payload[idx + 1]
- idx += 2
- username = payload[idx : idx + username_len].decode("utf-8")
- idx += username_len
- # Read password
- password_len = (payload[idx] << 8) | payload[idx + 1]
- idx += 2
- password = payload[idx : idx + password_len].decode("utf-8")
- # Authenticate
- if username == "bblp" and password == self.access_code:
- # Send CONNACK with success
- writer.write(bytes([0x20, 0x02, 0x00, 0x00]))
- await writer.drain()
- logger.info("%sMQTT client authenticated successfully", self._log_prefix)
- # Send immediate status report after auth - slicer expects this
- await self._send_status_report(writer)
- return True
- else:
- # Send CONNACK with auth failure
- writer.write(bytes([0x20, 0x02, 0x00, 0x05])) # Not authorized
- await writer.drain()
- logger.warning("%sMQTT auth failed for user '%s' (access code mismatch)", self._log_prefix, username)
- return False
- except (IndexError, ValueError) as e:
- logger.debug("MQTT CONNECT parse error: %s", e)
- # Send CONNACK with error
- writer.write(bytes([0x20, 0x02, 0x00, 0x02])) # Protocol error
- await writer.drain()
- return False
- async def _handle_subscribe(self, payload: bytes, writer: asyncio.StreamWriter, client_id: str) -> None:
- """Handle MQTT SUBSCRIBE packet."""
- try:
- # Parse packet ID
- packet_id = (payload[0] << 8) | payload[1]
- # Parse topic filters (just acknowledge them)
- idx = 2
- granted_qos = []
- learned_serial: str | None = None
- while idx < len(payload):
- topic_len = (payload[idx] << 8) | payload[idx + 1]
- idx += 2
- topic = payload[idx : idx + topic_len].decode("utf-8")
- idx += topic_len
- requested_qos = payload[idx]
- idx += 1
- logger.info("%sMQTT subscribe: %s QoS=%s", self._log_prefix, topic, requested_qos)
- granted_qos.append(min(requested_qos, 1)) # Grant up to QoS 1
- # Remember the serial the slicer is listening on so status/version
- # responses go to a topic it actually subscribed to.
- if learned_serial is None:
- extracted = self._extract_serial_from_topic(topic)
- if extracted:
- learned_serial = extracted
- if learned_serial and learned_serial != self._client_serials.get(client_id):
- if learned_serial != self.serial:
- logger.info(
- "%sMQTT client subscribed with serial %s (VP serial is %s) — adapting responses",
- self._log_prefix,
- learned_serial,
- self.serial,
- )
- self._client_serials[client_id] = learned_serial
- # Send SUBACK
- suback = bytes([0x90, 2 + len(granted_qos), packet_id >> 8, packet_id & 0xFF])
- suback += bytes(granted_qos)
- writer.write(suback)
- await writer.drain()
- # Send initial status report after subscribe on the client's subscribed topic
- await self._send_status_report(writer, serial=self._client_serials.get(client_id, self.serial))
- except (IndexError, ValueError, OSError) as e:
- logger.debug("MQTT SUBSCRIBE error: %s", e)
- async def _send_status_report(self, writer: asyncio.StreamWriter, serial: str | None = None) -> None:
- """Send a status report to the slicer after connection.
- When a bridge is active and has cached the real printer's latest
- push_status, send a copy of the real push with only the upload-state-
- machine fields we own (gcode_state, gcode_file, prepare_percent,
- subtask_name) overridden. BambuStudio's Send pre-flight checks the
- push_status shape against what it expects from the printer model, and
- the synthetic stub introduced fields the real H2D doesn't have (storage,
- the wrong chamber_temper shape, etc.) which trip the check.
- """
- try:
- self._sequence_id += 1
- cached = self._bridge.get_latest_print_state() if self._bridge is not None else None
- if isinstance(cached, dict):
- # Real-printer-shaped response. Copy the cache, then replace the
- # protocol / upload-state fields with values under our control.
- print_block = dict(cached)
- print_block["sequence_id"] = str(self._sequence_id)
- print_block["command"] = "push_status"
- print_block["msg"] = 0
- print_block["gcode_state"] = self._gcode_state
- print_block["gcode_file"] = self._current_file
- print_block["gcode_file_prepare_percent"] = self._prepare_percent
- if self._current_file:
- print_block["subtask_name"] = self._current_file.replace(".3mf", "")
- else:
- # Don't override real subtask_name with empty if no upload pending.
- print_block.setdefault("subtask_name", "")
- # Storage-availability indicators the slicer's "Send" pre-flight reads
- # (#1228). P1S/A1-class firmware doesn't always include these in
- # push_status (no SD card inserted, older field shapes), and BambuStudio
- # rejects the send pre-flight with the generic "storage needs to be
- # inserted before send to printer" error before even attempting FTP.
- # For VP usage the slicer uploads via FTPS to Bambuddy's filesystem —
- # the printer's actual SD/storage state is irrelevant on that path.
- # Force "available" indicators so the pre-flight passes regardless of
- # what the real printer reports. Restores the 0.2.3.2 synthetic-stub
- # behaviour for these fields without losing the live AMS / k-profile /
- # camera mirror cached-as-base provides.
- print_block["home_flag"] = print_block.get("home_flag", 0) | 0x100 # bit 8 = HAS_SDCARD_NORMAL
- print_block["sdcard"] = True
- print_block.setdefault("storage", {"free": 1_000_000_000, "total": 32_000_000_000})
- status = {"print": print_block}
- await self._publish_to_report(writer, status, serial or self.serial)
- return
- # No bridge / no cache yet — fall back to the synthetic stub.
- status = {
- "print": {
- "sequence_id": str(self._sequence_id),
- "command": "push_status",
- "msg": 0,
- "gcode_state": self._gcode_state,
- "gcode_file": self._current_file,
- "gcode_file_prepare_percent": self._prepare_percent,
- "subtask_name": self._current_file.replace(".3mf", "") if self._current_file else "",
- "mc_print_stage": "",
- "mc_percent": 0,
- "mc_remaining_time": 0,
- "wifi_signal": "-44dBm",
- "print_error": 0,
- "print_type": "",
- "bed_temper": 25.0,
- "bed_target_temper": 0.0,
- "nozzle_temper": 25.0,
- "nozzle_target_temper": 0.0,
- "chamber_temper": 25.0,
- "cooling_fan_speed": "0",
- "big_fan1_speed": "0",
- "big_fan2_speed": "0",
- "heatbreak_fan_speed": "0",
- "spd_lvl": 1,
- "spd_mag": 100,
- "stg": [],
- "stg_cur": 0,
- "layer_num": 0,
- "total_layer_num": 0,
- "home_flag": 256, # Bit 8 = SD card present (HAS_SDCARD_NORMAL)
- "hw_switch_state": 0,
- "online": {"ahb": False, "rfid": False, "version": 7},
- "ams_status": 0,
- "sdcard": True,
- "storage": {"free": 1000000000, "total": 32000000000},
- "upgrade_state": {
- "sequence_id": 0,
- "progress": "",
- "status": "",
- "consistency_request": False,
- "dis_state": 0,
- "err_code": 0,
- "force_upgrade": False,
- "message": "",
- "module": "",
- "new_version_state": 2,
- "new_ver_list": [],
- "ota_new_version_number": "",
- "ahb_new_version_number": "",
- },
- "ipcam": {
- "ipcam_dev": "1",
- "ipcam_record": "enable",
- "timelapse": "disable",
- "resolution": "1080p",
- "mode_bits": 0,
- },
- "xcam": {
- "allow_skip_parts": False,
- "buildplate_marker_detector": True,
- "first_layer_inspector": True,
- "halt_print_sensitivity": "medium",
- "print_halt": True,
- "printing_monitor": True,
- "spaghetti_detector": True,
- },
- "lights_report": [{"node": "chamber_light", "mode": "on"}],
- "nozzle_diameter": "0.4",
- "nozzle_type": "hardened_steel",
- }
- }
- await self._publish_to_report(writer, status, serial or self.serial)
- except OSError as e:
- logger.error("Failed to send status report: %s", e)
- async def _send_version_response(
- self, writer: asyncio.StreamWriter, sequence_id: str, serial: str | None = None
- ) -> None:
- """Send version info response to the slicer."""
- try:
- product_name = MODEL_PRODUCT_NAMES.get(self.model, self.model or "X1 Carbon")
- # The serial is embedded inside the module[].sn fields *and* used as
- # the report topic. Use the client's effective serial so the slicer
- # sees internal/topic consistency even when it differs from self.serial.
- serial = serial or self.serial
- # Build version response matching OrcaSlicer expectations
- # Required fields per module: name, product_name, sw_ver, sw_new_ver, sn, hw_ver, flag
- version_info = {
- "info": {
- "command": "get_version",
- "sequence_id": sequence_id,
- "module": [
- {
- "name": "ota",
- "product_name": product_name,
- "sw_ver": "01.07.00.00",
- "sw_new_ver": "",
- "hw_ver": "OTA",
- "sn": serial,
- "flag": 0,
- },
- {
- "name": "esp32",
- "product_name": product_name,
- "sw_ver": "01.07.22.25",
- "sw_new_ver": "",
- "hw_ver": "AP05",
- "sn": serial,
- "flag": 0,
- },
- {
- "name": "rv1126",
- "product_name": product_name,
- "sw_ver": "00.00.27.38",
- "sw_new_ver": "",
- "hw_ver": "AP05",
- "sn": serial,
- "flag": 0,
- },
- {
- "name": "th",
- "product_name": product_name,
- "sw_ver": "00.00.04.00",
- "sw_new_ver": "",
- "hw_ver": "TH07",
- "sn": serial,
- "flag": 0,
- },
- {
- "name": "mc",
- "product_name": product_name,
- "sw_ver": "00.00.10.00",
- "sw_new_ver": "",
- "hw_ver": "MC07",
- "sn": serial,
- "flag": 0,
- },
- ],
- }
- }
- # Overlay real version modules from the bridge cache when available
- # (specifically the AMS modules ams/0, n3f/0, n3s/128 etc. that
- # BambuStudio's Prepare tab uses to identify AMS hardware — without
- # them every AMS unit shows as "unknown" in the Prepare panel).
- if self._bridge is not None:
- cached_modules = self._bridge.get_latest_version_modules()
- if isinstance(cached_modules, list) and cached_modules:
- version_info["info"]["module"] = cached_modules
- await self._publish_to_report(writer, version_info, serial)
- logger.info("Sent version response (product_name=%s)", product_name)
- except OSError as e:
- logger.error("Failed to send version response: %s", e)
- def set_gcode_state(self, state: str, filename: str = "", prepare_percent: str = "0") -> None:
- """Update the gcode state reported to connected slicers.
- Called by the manager to reflect FTP upload progress/completion.
- """
- self._gcode_state = state
- self._current_file = filename
- self._prepare_percent = prepare_percent
- async def _publish_to_report(self, writer: asyncio.StreamWriter, payload: dict, serial: str = "") -> None:
- """Publish a message on the device report topic.
- Real Bambu printers wire-format push_status JSON with 4-space indentation
- (32254 bytes for an idle H2D push vs 14268 bytes compact). BambuStudio's
- Send pre-flight rejects compact JSON — without matching the on-wire
- format the slicer never proceeds to FTP upload.
- """
- topic = f"device/{serial or self.serial}/report"
- message = json.dumps(payload, indent=4)
- topic_bytes = topic.encode("utf-8")
- message_bytes = message.encode("utf-8")
- remaining = 2 + len(topic_bytes) + len(message_bytes)
- packet = bytes([0x30]) # PUBLISH, QoS 0
- while remaining > 0:
- byte = remaining % 128
- remaining //= 128
- if remaining > 0:
- byte |= 0x80
- packet += bytes([byte])
- packet += bytes([len(topic_bytes) >> 8, len(topic_bytes) & 0xFF])
- packet += topic_bytes
- packet += message_bytes
- writer.write(packet)
- # Timeout the drain to prevent blocking the event loop if the
- # MQTT client stops reading (e.g. slicer busy with FTP upload).
- try:
- await asyncio.wait_for(writer.drain(), timeout=5)
- except TimeoutError:
- logger.debug("MQTT drain timeout for %s — client may be busy", topic)
- async def _send_print_response(
- self, writer: asyncio.StreamWriter, sequence_id: str, filename: str, serial: str | None = None
- ) -> None:
- """Send project_file acknowledgment matching real Bambu printer behavior."""
- # Update state so periodic status pushes reflect preparation
- self._gcode_state = "PREPARE"
- self._current_file = filename
- self._prepare_percent = "0"
- try:
- # Send command acknowledgment — slicer expects to see
- # command: "project_file" echoed back before starting FTP upload
- subtask_name = filename.replace(".3mf", "") if filename else ""
- response = {
- "print": {
- "command": "project_file",
- "sequence_id": sequence_id,
- "param": "Metadata/plate_1.gcode",
- "subtask_name": subtask_name,
- "gcode_state": "PREPARE",
- "gcode_file": filename,
- "gcode_file_prepare_percent": "0",
- "result": "SUCCESS",
- "msg": 0,
- }
- }
- await self._publish_to_report(writer, response, serial or self.serial)
- logger.info("Sent project_file acknowledgment for %s", filename)
- except OSError as e:
- logger.error("Failed to send print response: %s", e)
- async def _handle_publish(self, header: int, payload: bytes, writer: asyncio.StreamWriter, client_id: str) -> None:
- """Handle MQTT PUBLISH packet."""
- try:
- # Parse topic
- idx = 0
- topic_len = (payload[idx] << 8) | payload[idx + 1]
- idx += 2
- topic = payload[idx : idx + topic_len].decode("utf-8")
- idx += topic_len
- # Check for packet ID (QoS > 0)
- qos = (header & 0x06) >> 1
- if qos > 0:
- # packet_id = (payload[idx] << 8) | payload[idx + 1]
- idx += 2
- # Parse message
- message = payload[idx:].decode("utf-8")
- logger.info("MQTT publish to %s: %s...", topic, message[:100])
- # Only handle publishes on *some* device/.../request topic. The
- # serial is taken from the topic rather than compared against
- # self.serial: the client is already authenticated via the access
- # code, and Orca/BambuStudio may have a cached serial that differs
- # from the VP's computed self.serial (#927). Use the topic's serial
- # for all responses so they land on the topic the slicer subscribed
- # to.
- if not topic.startswith("device/") or "/request" not in topic:
- return
- client_serial = self._extract_serial_from_topic(topic) or self.serial
- if client_serial and client_serial != self._client_serials.get(client_id):
- if client_serial != self.serial:
- logger.info(
- "%sMQTT client publishing with serial %s (VP serial is %s) — adapting responses",
- self._log_prefix,
- client_serial,
- self.serial,
- )
- self._client_serials[client_id] = client_serial
- try:
- # Some slicer builds (observed with OrcaSlicer on Linux, #927)
- # include the C-string null terminator in the MQTT payload
- # length, so the decoded message ends with \x00. Real brokers
- # pass the bytes through; strict json.loads raises "Extra data"
- # and every pushall/get_version/project_file silently dropped.
- data = json.loads(message.rstrip("\x00 \r\n\t"))
- except json.JSONDecodeError as e:
- logger.debug(
- "MQTT publish JSON decode failed: %s (payload=%r)",
- e,
- message[:200],
- )
- return
- # The synthetic flow below is the original (pre-bridge) behaviour and is
- # what the proven-working FTP "Send" depends on. Do NOT replace any
- # synthetic response with a forward — only ADD forwarding alongside,
- # at the bottom, for commands the synthetic flow doesn't handle
- # (AMS write / xcam / system / etc., which need to actually reach
- # the real printer).
- handled_locally = False
- # Handle pushing command (status request)
- if "pushing" in data:
- pushing_data = data["pushing"]
- command = pushing_data.get("command", "")
- logger.info("MQTT pushing command: %s", command)
- if command == "pushall":
- logger.info("Sending status report in response to pushall")
- await self._send_status_report(writer, serial=client_serial)
- handled_locally = True
- elif command == "start":
- logger.info("Starting status push stream")
- await self._send_status_report(writer, serial=client_serial)
- handled_locally = True
- # Handle info commands (get_version, etc.)
- if "info" in data:
- info_data = data["info"]
- command = info_data.get("command", "")
- sequence_id = info_data.get("sequence_id", "0")
- logger.info("MQTT info command: %s", command)
- if command == "get_version":
- await self._send_version_response(writer, sequence_id, serial=client_serial)
- handled_locally = True
- # Handle print commands
- if "print" in data:
- print_data = data["print"]
- command = print_data.get("command", "")
- filename = print_data.get("subtask_name", "")
- sequence_id = print_data.get("sequence_id", "0")
- logger.info("MQTT print command: %s for %s", command, filename)
- if command in ("project_file", "gcode_file"):
- # File lives on Bambuddy, not the printer — synthetic only.
- file_3mf = print_data.get("file", filename)
- await self._send_print_response(writer, sequence_id, file_3mf, serial=client_serial)
- if self.on_print_command:
- await self._notify_print_command(filename, print_data)
- handled_locally = True
- # Forward anything the synthetic flow didn't handle to the real
- # printer. AMS load / dry / xcam / system / extrusion_cali_get etc.
- if not handled_locally and self._bridge is not None and self._bridge.is_active:
- self._bridge.forward_to_printer(data)
- except (IndexError, ValueError, OSError) as e:
- logger.debug("MQTT PUBLISH error: %s", e)
- async def _notify_print_command(self, filename: str, data: dict) -> None:
- """Notify callback of print command."""
- if self.on_print_command:
- try:
- result = self.on_print_command(filename, data)
- if asyncio.iscoroutine(result):
- await result
- except Exception as e:
- logger.error("Print command callback error: %s", e)
|