| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270 |
- """SSDP discovery responder for virtual printer.
- Responds to M-SEARCH requests from slicers and sends periodic NOTIFY
- announcements so the virtual printer appears as a discoverable Bambu printer.
- """
- import asyncio
- import logging
- import socket
- import struct
- from datetime import datetime
- logger = logging.getLogger(__name__)
- # SSDP multicast address - Bambu uses port 2021
- SSDP_ADDR = "239.255.255.250"
- SSDP_PORT = 2021
- # Bambu service target
- BAMBU_SEARCH_TARGET = "urn:bambulab-com:device:3dprinter:1"
- class VirtualPrinterSSDPServer:
- """SSDP server that responds to discovery requests as a virtual Bambu printer."""
- def __init__(
- self,
- name: str = "Bambuddy",
- serial: str = "00M09A391800001", # X1C serial format for compatibility
- model: str = "BL-P001", # X1C model code for best compatibility
- ):
- """Initialize the SSDP server.
- Args:
- name: Display name shown in slicer discovery
- serial: Unique serial number for this virtual printer (must match cert CN)
- model: Model code (BL-P001=X1C, C11=P1S, O1D=H2D)
- """
- self.name = name
- self.serial = serial
- self.model = model
- self._running = False
- self._socket: socket.socket | None = None
- self._local_ip: str | None = None
- def _get_local_ip(self) -> str:
- """Get the local IP address to advertise."""
- if self._local_ip:
- return self._local_ip
- # Try to determine local IP by connecting to a public address
- try:
- s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- s.connect(("8.8.8.8", 80))
- ip = s.getsockname()[0]
- s.close()
- self._local_ip = ip
- return ip
- except Exception:
- return "127.0.0.1"
- def _build_notify_message(self) -> bytes:
- """Build SSDP NOTIFY message for periodic announcements."""
- ip = self._get_local_ip()
- # Based on: https://gist.github.com/Alex-Schaefer/72a9e2491a42da2ef99fb87601955cc3
- # Key: DevBind.bambu.com: free - tells slicer printer is NOT cloud-bound
- message = (
- "NOTIFY * HTTP/1.1\r\n"
- f"HOST: {SSDP_ADDR}:{SSDP_PORT}\r\n"
- "Server: Buildroot/2018.02-rc3 UPnP/1.0 ssdpd/1.8\r\n"
- "Cache-Control: max-age=1800\r\n"
- f"Location: {ip}\r\n"
- f"NT: {BAMBU_SEARCH_TARGET}\r\n"
- "NTS: ssdp:alive\r\n"
- "EXT:\r\n"
- f"USN: {self.serial}\r\n"
- f"DevModel.bambu.com: {self.model}\r\n"
- f"DevName.bambu.com: {self.name}\r\n"
- "DevSignal.bambu.com: -44\r\n"
- "DevConnect.bambu.com: lan\r\n"
- "DevBind.bambu.com: free\r\n"
- "Devseclink.bambu.com: secure\r\n"
- "DevVersion.bambu.com: 01.07.00.00\r\n"
- "\r\n"
- )
- return message.encode()
- def _build_response_message(self) -> bytes:
- """Build SSDP response message for M-SEARCH requests."""
- ip = self._get_local_ip()
- # Based on: https://gist.github.com/Alex-Schaefer/72a9e2491a42da2ef99fb87601955cc3
- # Key: DevBind.bambu.com: free - tells slicer printer is NOT cloud-bound
- # Added: Devseclink, DevVersion, DevCap for better compatibility
- message = (
- "HTTP/1.1 200 OK\r\n"
- "Server: Buildroot/2018.02-rc3 UPnP/1.0 ssdpd/1.8\r\n"
- f"Date: {datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')}\r\n"
- f"Location: {ip}\r\n"
- f"ST: {BAMBU_SEARCH_TARGET}\r\n"
- "EXT:\r\n"
- f"USN: {self.serial}\r\n"
- "Cache-Control: max-age=1800\r\n"
- f"DevModel.bambu.com: {self.model}\r\n"
- f"DevName.bambu.com: {self.name}\r\n"
- "DevSignal.bambu.com: -44\r\n"
- "DevConnect.bambu.com: lan\r\n"
- "DevBind.bambu.com: free\r\n"
- "Devseclink.bambu.com: secure\r\n"
- "DevVersion.bambu.com: 01.07.00.00\r\n"
- "\r\n"
- )
- return message.encode()
- async def start(self) -> None:
- """Start the SSDP server."""
- if self._running:
- return
- logger.info(f"Starting virtual printer SSDP server: {self.name} ({self.serial})")
- self._running = True
- try:
- # Create UDP socket
- self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
- self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- # Try to set SO_REUSEPORT if available
- try:
- self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
- except (AttributeError, OSError):
- pass
- # Set non-blocking mode
- self._socket.setblocking(False)
- # Bind to SSDP port
- self._socket.bind(("", SSDP_PORT))
- # Join multicast group
- mreq = struct.pack("4sl", socket.inet_aton(SSDP_ADDR), socket.INADDR_ANY)
- self._socket.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
- # Enable broadcast
- self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
- # Set multicast TTL
- self._socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
- local_ip = self._get_local_ip()
- logger.info(f"SSDP server listening on port {SSDP_PORT}, advertising IP: {local_ip}")
- logger.info(f"Virtual printer: {self.name} ({self.serial}) model={self.model}")
- # Send initial NOTIFY
- await self._send_notify()
- logger.info("Sent initial SSDP NOTIFY announcement")
- # Run receive and announce loops
- last_notify = asyncio.get_event_loop().time()
- notify_interval = 30.0 # Send NOTIFY every 30 seconds
- while self._running:
- # Try to receive M-SEARCH requests
- try:
- data, addr = self._socket.recvfrom(4096)
- message = data.decode("utf-8", errors="ignore")
- await self._handle_message(message, addr)
- except BlockingIOError:
- pass
- except Exception as e:
- if self._running:
- logger.debug(f"SSDP receive error: {e}")
- # Send periodic NOTIFY
- now = asyncio.get_event_loop().time()
- if now - last_notify >= notify_interval:
- await self._send_notify()
- last_notify = now
- await asyncio.sleep(0.1)
- except OSError as e:
- if e.errno == 98: # Address already in use
- logger.warning(f"SSDP port {SSDP_PORT} in use - real printers may be running")
- else:
- logger.error(f"SSDP server error: {e}")
- except asyncio.CancelledError:
- logger.debug("SSDP server cancelled")
- except Exception as e:
- logger.error(f"SSDP server error: {e}")
- finally:
- await self._cleanup()
- async def stop(self) -> None:
- """Stop the SSDP server."""
- logger.info("Stopping SSDP server")
- self._running = False
- await self._cleanup()
- async def _cleanup(self) -> None:
- """Clean up resources."""
- if self._socket:
- try:
- # Send byebye message
- await self._send_byebye()
- except Exception:
- pass
- try:
- self._socket.close()
- except Exception:
- pass
- self._socket = None
- async def _send_notify(self) -> None:
- """Send SSDP NOTIFY message."""
- if not self._socket:
- return
- try:
- msg = self._build_notify_message()
- self._socket.sendto(msg, (SSDP_ADDR, SSDP_PORT))
- logger.debug(f"Sent SSDP NOTIFY for {self.name}")
- except Exception as e:
- logger.debug(f"Failed to send NOTIFY: {e}")
- async def _send_byebye(self) -> None:
- """Send SSDP byebye message when shutting down."""
- if not self._socket:
- return
- message = (
- "NOTIFY * HTTP/1.1\r\n"
- f"HOST: {SSDP_ADDR}:{SSDP_PORT}\r\n"
- f"NT: {BAMBU_SEARCH_TARGET}\r\n"
- "NTS: ssdp:byebye\r\n"
- f"USN: {self.serial}\r\n"
- "\r\n"
- )
- try:
- self._socket.sendto(message.encode(), (SSDP_ADDR, SSDP_PORT))
- logger.debug("Sent SSDP byebye")
- except Exception:
- pass
- async def _handle_message(self, message: str, addr: tuple[str, int]) -> None:
- """Handle incoming SSDP message.
- Args:
- message: The SSDP message content
- addr: Tuple of (ip_address, port) of sender
- """
- # Check if this is an M-SEARCH request for Bambu printers
- if "M-SEARCH" not in message:
- return
- # Check search target
- if BAMBU_SEARCH_TARGET not in message and "ssdp:all" not in message.lower():
- return
- logger.debug(f"Received M-SEARCH from {addr[0]}")
- # Send response
- if self._socket:
- try:
- response = self._build_response_message()
- self._socket.sendto(response, addr)
- logger.info(f"Sent SSDP response to {addr[0]} for virtual printer '{self.name}'")
- except Exception as e:
- logger.debug(f"Failed to send SSDP response: {e}")
|