""" Bambu Lab printer discovery service using SSDP and subnet scanning. Bambu Lab printers advertise themselves via SSDP (Simple Service Discovery Protocol) on the local network. This service listens for these advertisements and provides a list of discovered printers. For Docker environments where SSDP multicast doesn't work, subnet scanning is available as an alternative discovery method. """ import asyncio import ipaddress import logging import os import re import socket import struct from dataclasses import dataclass from datetime import datetime from pathlib import Path logger = logging.getLogger(__name__) def is_running_in_docker() -> bool: """Detect if we're running inside a Docker container.""" # Check for .dockerenv file if Path("/.dockerenv").exists(): return True # Check cgroup for docker/containerd try: with open("/proc/1/cgroup") as f: content = f.read() if "docker" in content or "containerd" in content or "kubepods" in content: return True except (FileNotFoundError, PermissionError): pass # Check for container environment variable return bool(os.environ.get("CONTAINER") or os.environ.get("DOCKER_CONTAINER")) # SSDP multicast address - Bambu uses port 2021, not standard 1900 SSDP_ADDR = "239.255.255.250" SSDP_PORT = 2021 # Bambu Lab uses non-standard port # Bambu Lab SSDP search target BAMBU_SEARCH_TARGET = "urn:bambulab-com:device:3dprinter:1" # SSDP M-SEARCH message SSDP_MSEARCH = ( "M-SEARCH * HTTP/1.1\r\n" f"HOST: {SSDP_ADDR}:{SSDP_PORT}\r\n" 'MAN: "ssdp:discover"\r\n' "MX: 3\r\n" f"ST: {BAMBU_SEARCH_TARGET}\r\n" "\r\n" ) @dataclass class DiscoveredPrinter: """Represents a discovered Bambu Lab printer.""" serial: str name: str ip_address: str model: str | None = None discovered_at: str | None = None def to_dict(self) -> dict: return { "serial": self.serial, "name": self.name, "ip_address": self.ip_address, "model": self.model, "discovered_at": self.discovered_at, } class PrinterDiscoveryService: """Service for discovering Bambu Lab printers on the network.""" def __init__(self): self._discovered: dict[str, DiscoveredPrinter] = {} self._running = False self._task: asyncio.Task | None = None @property def is_running(self) -> bool: return self._running @property def discovered_printers(self) -> list[DiscoveredPrinter]: return list(self._discovered.values()) def clear(self): """Clear discovered printers.""" self._discovered.clear() async def start(self, duration: float = 10.0): """Start discovery for a specified duration.""" if self._running: return self._running = True self._discovered.clear() self._task = asyncio.create_task(self._discover(duration)) async def stop(self): """Stop discovery.""" self._running = False if self._task and not self._task.done(): self._task.cancel() try: await self._task except asyncio.CancelledError: pass self._task = None async def _discover(self, duration: float): """Run discovery for the specified duration. Bambu printers broadcast NOTIFY messages periodically on port 2021. We need to bind to that port and listen for broadcasts. """ sock = None try: # Create UDP socket for SSDP sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # Try to set SO_REUSEPORT if available (Linux/macOS) try: sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) except (AttributeError, OSError): pass # Set non-blocking mode sock.setblocking(False) # Bind to the SSDP port to receive NOTIFY broadcasts from printers sock.bind(("", SSDP_PORT)) # Join multicast group to receive multicast messages mreq = struct.pack("4sl", socket.inet_aton(SSDP_ADDR), socket.INADDR_ANY) sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) # Enable broadcast sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) logger.info(f"Starting SSDP discovery on port {SSDP_PORT} for Bambu Lab printers...") # Send initial M-SEARCH request to trigger responses try: sock.sendto(SSDP_MSEARCH.encode(), (SSDP_ADDR, SSDP_PORT)) except Exception as e: logger.debug(f"M-SEARCH send error: {e}") start_time = asyncio.get_event_loop().time() last_send = start_time while self._running and (asyncio.get_event_loop().time() - start_time) < duration: # Try to receive data try: data, addr = sock.recvfrom(4096) message = data.decode("utf-8", errors="ignore") logger.debug(f"Received from {addr[0]}: {message[:100]}...") self._handle_response(message, addr[0]) except BlockingIOError: # No data available, that's fine pass except Exception as e: logger.debug(f"SSDP receive error: {e}") # Re-send M-SEARCH every 3 seconds now = asyncio.get_event_loop().time() if now - last_send >= 3.0: try: sock.sendto(SSDP_MSEARCH.encode(), (SSDP_ADDR, SSDP_PORT)) last_send = now except Exception as e: logger.debug(f"SSDP send error: {e}") await asyncio.sleep(0.1) logger.info(f"Discovery complete. Found {len(self._discovered)} printers.") except OSError as e: if e.errno == 98: # Address already in use logger.warning(f"Port {SSDP_PORT} is in use, trying alternative discovery...") await self._discover_alternative(duration) else: logger.error(f"Discovery error: {e}") except Exception as e: logger.error(f"Discovery error: {e}") finally: self._running = False if sock: try: sock.close() except Exception: pass async def _discover_alternative(self, duration: float): """Alternative discovery using a random port (less reliable).""" sock = None try: sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setblocking(False) sock.bind(("", 0)) # Join multicast group mreq = struct.pack("4sl", socket.inet_aton(SSDP_ADDR), socket.INADDR_ANY) sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) logger.info("Using alternative discovery method...") start_time = asyncio.get_event_loop().time() last_send = start_time while self._running and (asyncio.get_event_loop().time() - start_time) < duration: try: data, addr = sock.recvfrom(4096) self._handle_response(data.decode("utf-8", errors="ignore"), addr[0]) except BlockingIOError: pass except Exception as e: logger.debug(f"SSDP receive error: {e}") now = asyncio.get_event_loop().time() if now - last_send >= 2.0: try: sock.sendto(SSDP_MSEARCH.encode(), (SSDP_ADDR, SSDP_PORT)) last_send = now except Exception: pass await asyncio.sleep(0.1) logger.info(f"Alternative discovery complete. Found {len(self._discovered)} printers.") except Exception as e: logger.error(f"Alternative discovery error: {e}") finally: if sock: try: sock.close() except Exception: pass def _handle_response(self, response: str, ip_address: str): """Parse SSDP response and extract printer info.""" # Check if it's a Bambu Lab printer response if BAMBU_SEARCH_TARGET not in response and "bambulab" not in response.lower(): logger.debug(f"Ignoring non-Bambu response from {ip_address}") return # Extract USN (Unique Service Name) which contains the serial # Bambu format is just "USN: SERIALNUMBER" (no uuid: prefix) usn_match = re.search(r"USN:\s*(?:uuid:)?([^\s\r\n]+)", response, re.IGNORECASE) if not usn_match: logger.debug(f"No USN found in response from {ip_address}") return serial = usn_match.group(1).strip() # Extract device name from LOCATION or DevName header name = serial # Default to serial if no name found name_match = re.search(r"DevName\.bambu\.com:\s*(.+?)(?:\r\n|\n|$)", response, re.IGNORECASE) if name_match: name = name_match.group(1).strip() # Try to extract model from DevModel header model = None model_match = re.search(r"DevModel\.bambu\.com:\s*(.+?)(?:\r\n|\n|$)", response, re.IGNORECASE) if model_match: model = model_match.group(1).strip() # Also try NT header for model if not model: nt_match = re.search(r"NT:\s*urn:bambulab-com:device:([^:]+)", response, re.IGNORECASE) if nt_match: model = nt_match.group(1).strip() # Skip if already discovered if serial in self._discovered: return printer = DiscoveredPrinter( serial=serial, name=name, ip_address=ip_address, model=model, discovered_at=datetime.now().isoformat(), ) self._discovered[serial] = printer logger.info(f"Discovered printer: {name} ({serial}) at {ip_address}") class SubnetScanner: """Scanner for discovering Bambu printers by probing IP addresses.""" # Bambu printer ports MQTT_PORT = 8883 FTP_PORT = 990 def __init__(self): self._discovered: dict[str, DiscoveredPrinter] = {} self._running = False self._scanned = 0 self._total = 0 @property def is_running(self) -> bool: return self._running @property def discovered_printers(self) -> list[DiscoveredPrinter]: return list(self._discovered.values()) @property def progress(self) -> tuple[int, int]: """Return (scanned, total) counts.""" return self._scanned, self._total async def scan_subnet(self, subnet: str, timeout: float = 1.0) -> list[DiscoveredPrinter]: """Scan a subnet for Bambu printers. Args: subnet: CIDR notation subnet (e.g., "192.168.1.0/24") timeout: Connection timeout per host in seconds Returns: List of discovered printers """ if self._running: return [] self._running = True self._discovered.clear() self._scanned = 0 try: network = ipaddress.ip_network(subnet, strict=False) hosts = list(network.hosts()) self._total = len(hosts) if self._total > 1024: logger.warning(f"Subnet {subnet} has {self._total} hosts, limiting to /22 (1024 hosts)") self._total = 1024 hosts = hosts[:1024] logger.info(f"Starting subnet scan of {subnet} ({self._total} hosts)") # Scan in batches to avoid overwhelming the network batch_size = 50 for i in range(0, len(hosts), batch_size): if not self._running: break batch = hosts[i : i + batch_size] tasks = [self._probe_host(str(ip), timeout) for ip in batch] await asyncio.gather(*tasks, return_exceptions=True) self._scanned = min(i + batch_size, len(hosts)) logger.info(f"Subnet scan complete. Found {len(self._discovered)} printers.") return self.discovered_printers except ValueError as e: logger.error(f"Invalid subnet format: {e}") return [] finally: self._running = False async def _probe_host(self, ip: str, timeout: float): """Probe a single host for Bambu printer ports.""" # Check FTP port (990) - more reliable indicator ftp_open = await self._check_port(ip, self.FTP_PORT, timeout) if not ftp_open: return # Also check MQTT port (8883) for confirmation mqtt_open = await self._check_port(ip, self.MQTT_PORT, timeout) if not mqtt_open: return # Both ports open - likely a Bambu printer logger.info(f"Found potential Bambu printer at {ip}") # Try to get printer info via SSDP unicast serial, name, model = await self._get_printer_info_ssdp(ip, timeout) printer = DiscoveredPrinter( serial=serial or f"unknown-{ip.replace('.', '-')}", name=name or f"Printer at {ip}", ip_address=ip, model=model, discovered_at=datetime.now().isoformat(), ) self._discovered[ip] = printer async def _get_printer_info_ssdp(self, ip: str, timeout: float) -> tuple[str | None, str | None, str | None]: """Try to get printer info via SSDP unicast query.""" loop = asyncio.get_event_loop() def _query(): try: sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) sock.settimeout(timeout) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # Send M-SEARCH directly to the printer msearch = ( "M-SEARCH * HTTP/1.1\r\n" f"HOST: {ip}:{SSDP_PORT}\r\n" 'MAN: "ssdp:discover"\r\n' "MX: 1\r\n" f"ST: {BAMBU_SEARCH_TARGET}\r\n" "\r\n" ) sock.sendto(msearch.encode(), (ip, SSDP_PORT)) # Wait for response data, _ = sock.recvfrom(4096) response = data.decode("utf-8", errors="ignore") sock.close() # Parse response serial = None name = None model = None usn_match = re.search(r"USN:\s*(?:uuid:)?([^\s\r\n]+)", response, re.IGNORECASE) if usn_match: serial = usn_match.group(1).strip() name_match = re.search(r"DevName\.bambu\.com:\s*(.+?)(?:\r\n|\n|$)", response, re.IGNORECASE) if name_match: name = name_match.group(1).strip() model_match = re.search(r"DevModel\.bambu\.com:\s*(.+?)(?:\r\n|\n|$)", response, re.IGNORECASE) if model_match: model = model_match.group(1).strip() logger.debug(f"SSDP info from {ip}: serial={serial}, name={name}, model={model}") return serial, name, model except Exception as e: logger.debug(f"SSDP query to {ip} failed: {e}") return None, None, None return await loop.run_in_executor(None, _query) async def _check_port(self, ip: str, port: int, timeout: float) -> bool: """Check if a port is open on the given IP.""" try: _, writer = await asyncio.wait_for(asyncio.open_connection(ip, port), timeout=timeout) writer.close() await writer.wait_closed() logger.debug(f"Port {port} open on {ip}") return True except TimeoutError: return False except ConnectionRefusedError: return False except OSError as e: # Log first few errors to help debug network issues if self._scanned < 5: logger.debug(f"OSError checking {ip}:{port}: {e}") return False def stop(self): """Stop the current scan.""" self._running = False # Global instances discovery_service = PrinterDiscoveryService() subnet_scanner = SubnetScanner()