| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280 |
- """
- Bambu Lab printer discovery service using SSDP.
- Bambu Lab printers advertise themselves via SSDP (Simple Service Discovery Protocol)
- on the local network. This service listens for these advertisements and provides
- a list of discovered printers.
- """
- import asyncio
- import logging
- import re
- import socket
- import struct
- from dataclasses import dataclass
- from datetime import datetime
logger = logging.getLogger(__name__)

# SSDP multicast group. Bambu Lab printers use port 2021 rather than the
# standard SSDP port 1900.
SSDP_ADDR = "239.255.255.250"
SSDP_PORT = 2021

# Search target (ST header value) that identifies Bambu Lab printers.
BAMBU_SEARCH_TARGET = "urn:bambulab-com:device:3dprinter:1"

# M-SEARCH request text. Header lines are CRLF-terminated and the request
# ends with an empty line; the two trailing empty items in the join below
# produce that final "\r\n\r\n".
SSDP_MSEARCH = "\r\n".join(
    (
        "M-SEARCH * HTTP/1.1",
        f"HOST: {SSDP_ADDR}:{SSDP_PORT}",
        'MAN: "ssdp:discover"',
        "MX: 3",
        f"ST: {BAMBU_SEARCH_TARGET}",
        "",
        "",
    )
)
- @dataclass
- class DiscoveredPrinter:
- """Represents a discovered Bambu Lab printer."""
- serial: str
- name: str
- ip_address: str
- model: str | None = None
- discovered_at: str | None = None
- def to_dict(self) -> dict:
- return {
- "serial": self.serial,
- "name": self.name,
- "ip_address": self.ip_address,
- "model": self.model,
- "discovered_at": self.discovered_at,
- }
class PrinterDiscoveryService:
    """Discover Bambu Lab printers on the local network via SSDP.

    A discovery run is a background asyncio task that listens on a UDP socket
    for SSDP messages, periodically multicasts an M-SEARCH request, and
    records every Bambu Lab message that carries a USN header. Results are
    keyed by serial number and exposed through ``discovered_printers``.
    """

    # Seconds to sleep between polls of the non-blocking socket.
    _POLL_INTERVAL = 0.1
    # Cap on datagrams handled per poll so a packet flood cannot starve the
    # event loop.
    _MAX_DATAGRAMS_PER_POLL = 64
    # Maximum size read for a single SSDP datagram.
    _RECV_BUFFER_SIZE = 4096

    # Header patterns. "[ \t]*" (rather than "\s*") keeps a match on its own
    # line, so an empty header value can never capture the following header.
    _USN_RE = re.compile(r"USN:[ \t]*(?:uuid:)?(\S+)", re.IGNORECASE)
    _NAME_RE = re.compile(r"DevName\.bambu\.com:[ \t]*([^\r\n]*)", re.IGNORECASE)
    _MODEL_RE = re.compile(r"DevModel\.bambu\.com:[ \t]*([^\r\n]*)", re.IGNORECASE)
    _NT_MODEL_RE = re.compile(r"NT:[ \t]*urn:bambulab-com:device:([^:\s]+)", re.IGNORECASE)

    def __init__(self):
        self._discovered: dict[str, DiscoveredPrinter] = {}
        self._running = False
        self._task: asyncio.Task | None = None

    @property
    def is_running(self) -> bool:
        """Whether a discovery run is currently in progress."""
        return self._running

    @property
    def discovered_printers(self) -> list[DiscoveredPrinter]:
        """Snapshot list of the printers found so far."""
        return list(self._discovered.values())

    def clear(self):
        """Clear discovered printers."""
        self._discovered.clear()

    async def start(self, duration: float = 10.0):
        """Start a background discovery run lasting ``duration`` seconds.

        Does nothing if a run is already in progress. Previously discovered
        printers are cleared before the new run starts.
        """
        if self._running:
            return
        self._running = True
        self._discovered.clear()
        self._task = asyncio.create_task(self._discover(duration))

    async def stop(self):
        """Stop discovery and wait for the background task to finish."""
        self._running = False
        task, self._task = self._task, None
        if task and not task.done():
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

    async def _discover(self, duration: float):
        """Run discovery for the specified duration.

        Bambu printers broadcast NOTIFY messages periodically on port 2021,
        so the socket is bound to that port. If the port is already in use,
        discovery falls back to ``_discover_alternative``.
        """
        # Local import: errno is not among the module-level imports.
        import errno

        try:
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) as sock:
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                # SO_REUSEPORT is not available on every platform.
                try:
                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
                except (AttributeError, OSError):
                    pass
                sock.setblocking(False)
                sock.bind(("", SSDP_PORT))
                self._join_multicast(sock)

                logger.info(f"Starting SSDP discovery on port {SSDP_PORT} for Bambu Lab printers...")
                await self._listen(sock, duration, resend_interval=3.0)
            logger.info(f"Discovery complete. Found {len(self._discovered)} printers.")
        except OSError as e:
            # Compare against the symbolic constant: the numeric value is 98
            # on Linux but differs on other platforms (e.g. 48 on macOS).
            if e.errno == errno.EADDRINUSE:
                logger.warning(f"Port {SSDP_PORT} is in use, trying alternative discovery...")
                # The primary socket is already closed here (the with-block
                # has exited), so the fallback does not hold two sockets.
                await self._discover_alternative(duration)
            else:
                logger.error(f"Discovery error: {e}")
        except Exception as e:
            logger.error(f"Discovery error: {e}")
        finally:
            self._running = False

    async def _discover_alternative(self, duration: float):
        """Alternative discovery using a random port (less reliable)."""
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) as sock:
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                sock.setblocking(False)
                sock.bind(("", 0))
                self._join_multicast(sock)

                logger.info("Using alternative discovery method...")
                await self._listen(sock, duration, resend_interval=2.0)
            logger.info(f"Alternative discovery complete. Found {len(self._discovered)} printers.")
        except Exception as e:
            logger.error(f"Alternative discovery error: {e}")

    @staticmethod
    def _join_multicast(sock: socket.socket) -> None:
        """Join the SSDP multicast group on ``sock`` and enable broadcast."""
        # "4s4s" packs exactly the 8-byte ip_mreq layout (group address,
        # interface address). A native-aligned "4sl" inserts padding on
        # 64-bit platforms and only works where the OS tolerates it.
        mreq = struct.pack("4s4s", socket.inet_aton(SSDP_ADDR), socket.inet_aton("0.0.0.0"))
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)

    async def _listen(self, sock: socket.socket, duration: float, resend_interval: float) -> None:
        """Poll ``sock`` for ``duration`` seconds, re-sending M-SEARCH periodically.

        The first M-SEARCH is sent immediately; later ones are sent every
        ``resend_interval`` seconds. The loop ends early when ``_running``
        is cleared.
        """
        loop = asyncio.get_running_loop()
        payload = SSDP_MSEARCH.encode()
        deadline = loop.time() + duration
        next_send = loop.time()

        while self._running and loop.time() < deadline:
            now = loop.time()
            if now >= next_send:
                try:
                    sock.sendto(payload, (SSDP_ADDR, SSDP_PORT))
                    # Only advance the schedule on success so a failed send
                    # is retried on the next poll.
                    next_send = now + resend_interval
                except OSError as e:
                    logger.debug(f"SSDP send error: {e}")

            self._drain(sock)
            await asyncio.sleep(self._POLL_INTERVAL)

    def _drain(self, sock: socket.socket) -> None:
        """Handle every datagram currently queued on ``sock`` (up to the cap)."""
        for _ in range(self._MAX_DATAGRAMS_PER_POLL):
            try:
                data, addr = sock.recvfrom(self._RECV_BUFFER_SIZE)
            except BlockingIOError:
                # Nothing more queued right now.
                return
            except OSError as e:
                logger.debug(f"SSDP receive error: {e}")
                return

            message = data.decode("utf-8", errors="ignore")
            logger.debug(f"Received from {addr[0]}: {message[:100]}...")
            try:
                self._handle_response(message, addr[0])
            except Exception as e:
                # One malformed packet must not abort the whole discovery run.
                logger.debug(f"SSDP receive error: {e}")

    def _handle_response(self, response: str, ip_address: str):
        """Parse an SSDP message and record the printer it describes.

        Messages that do not mention Bambu Lab, that lack a USN header, or
        whose serial has already been recorded are ignored.
        """
        if BAMBU_SEARCH_TARGET not in response and "bambulab" not in response.lower():
            logger.debug(f"Ignoring non-Bambu response from {ip_address}")
            return

        # The USN (Unique Service Name) carries the serial number. Bambu's
        # format is just "USN: SERIALNUMBER"; a "uuid:" prefix is tolerated.
        usn_match = self._USN_RE.search(response)
        if not usn_match:
            logger.debug(f"No USN found in response from {ip_address}")
            return
        serial = usn_match.group(1).strip()

        # Skip the remaining parsing for printers that are already known.
        if serial in self._discovered:
            return

        # Device name from the DevName header; fall back to the serial when
        # the header is missing or empty.
        name_match = self._NAME_RE.search(response)
        name = (name_match.group(1).strip() if name_match else "") or serial

        # Model from the DevModel header, else from the NT device URN.
        model = None
        for pattern in (self._MODEL_RE, self._NT_MODEL_RE):
            model_match = pattern.search(response)
            candidate = model_match.group(1).strip() if model_match else ""
            if candidate:
                model = candidate
                break

        printer = DiscoveredPrinter(
            serial=serial,
            name=name,
            ip_address=ip_address,
            model=model,
            discovered_at=datetime.now().isoformat(),
        )
        self._discovered[serial] = printer
        logger.info(f"Discovered printer: {name} ({serial}) at {ip_address}")
# Global discovery service instance, created at import time. Construction
# only initializes in-memory state; no discovery task is created until
# start() is called.
discovery_service = PrinterDiscoveryService()
|