"""Bind/detect server for virtual printer discovery (ports 3000 + 3002). Bambu slicers (BambuStudio, OrcaSlicer) connect to a printer on port 3000 or 3002 to perform the "bind with access code" handshake before using MQTT/FTP. The port varies by slicer version, so we listen on both. Protocol: - Framing: 0xA5A5 + uint16_le(total_msg_size) + JSON payload + 0xA7A7 - Slicer sends: {"login":{"command":"detect","sequence_id":"20000"}} - Printer replies: {"login":{"bind":"free","command":"detect","connect":"lan", "dev_cap":1,"id":"","model":"","name":"", "sequence_id":,"version":""}} - Connection closes after one exchange. """ import asyncio import json import logging import struct logger = logging.getLogger(__name__) BIND_PORTS = [3000, 3002] FRAME_HEADER = b"\xa5\xa5" FRAME_TRAILER = b"\xa7\xa7" HEADER_SIZE = 4 # 2 bytes magic + 2 bytes length TRAILER_SIZE = 2 class BindServer: """Responds to slicer bind/detect requests on ports 3000 and 3002. In server mode, Bambuddy IS the printer — it responds with its own identity so the slicer can discover and bind to it. Different BambuStudio versions connect on different ports (3000 or 3002), so we listen on both to ensure compatibility. """ def __init__( self, serial: str, model: str, name: str, version: str = "01.00.00.00", bind_address: str = "0.0.0.0", # nosec B104 ): self.serial = serial self.model = model self.name = name self.version = version self.bind_address = bind_address self._servers: list[asyncio.Server] = [] self._running = False async def start(self) -> None: """Start the bind server on ports 3000 and 3002.""" if self._running: return self._running = True logger.info( "Starting bind server on ports %s (serial=%s, model=%s)", BIND_PORTS, self.serial, self.model, ) try: for port in BIND_PORTS: try: server = await asyncio.start_server( self._handle_client, self.bind_address, port, ) self._servers.append(server) logger.info("Bind server listening on %s:%s", self.bind_address, port) except OSError as e: if e.errno == 98: logger.warning("Bind server port %s already in use, skipping", port) elif e.errno == 13: logger.warning("Bind server: cannot bind to port %s (permission denied), skipping", port) else: logger.warning("Bind server: failed to bind port %s: %s", port, e) if not self._servers: logger.error("Bind server: could not bind to any port") return # Serve all successfully bound ports await asyncio.gather(*(s.serve_forever() for s in self._servers)) except asyncio.CancelledError: logger.debug("Bind server task cancelled") except Exception as e: logger.error("Bind server error: %s", e) finally: await self.stop() async def stop(self) -> None: """Stop the bind server.""" logger.info("Stopping bind server") self._running = False for server in self._servers: try: server.close() await server.wait_closed() except OSError as e: logger.debug("Error closing bind server: %s", e) self._servers = [] async def _handle_client( self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, ) -> None: """Handle a single bind/detect request from a slicer.""" peername = writer.get_extra_info("peername") client_id = f"{peername[0]}:{peername[1]}" if peername else "unknown" logger.info("Bind server: client connected from %s", client_id) try: # Read the framed message (timeout after 10s) data = await asyncio.wait_for(reader.read(4096), timeout=10.0) if not data: return # Parse the request request = self._parse_frame(data) if request is None: logger.warning("Bind server: invalid frame from %s", client_id) return logger.info("Bind server: received from %s: %s", client_id, request) # Check if this is a detect command login = request.get("login", {}) if not isinstance(login, dict) or login.get("command") != "detect": logger.warning("Bind server: unexpected command from %s: %s", client_id, request) return # Build response response = { "login": { "bind": "free", "command": "detect", "connect": "lan", "dev_cap": 1, "id": self.serial, "model": self.model, "name": self.name, "sequence_id": 3021, "version": self.version, } } frame = self._build_frame(response) writer.write(frame) await writer.drain() logger.info("Bind server: sent detect response to %s (serial=%s)", client_id, self.serial) except TimeoutError: logger.debug("Bind server: timeout waiting for data from %s", client_id) except Exception as e: logger.error("Bind server: error handling %s: %s", client_id, e) finally: try: writer.close() await writer.wait_closed() except OSError: pass logger.debug("Bind server: client %s disconnected", client_id) def _parse_frame(self, data: bytes) -> dict | None: """Parse a framed message: 0xA5A5 + len(u16le) + JSON + 0xA7A7.""" if len(data) < HEADER_SIZE + TRAILER_SIZE: return None if data[:2] != FRAME_HEADER: return None if data[-2:] != FRAME_TRAILER: return None # Length field is total message size (header + json + trailer) total_len = struct.unpack_from(" bytes: """Build a framed message: 0xA5A5 + len(u16le) + JSON + 0xA7A7.""" json_bytes = json.dumps(payload, separators=(",", ":")).encode("utf-8") total_len = HEADER_SIZE + len(json_bytes) + TRAILER_SIZE header = FRAME_HEADER + struct.pack("