| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081 |
- """Virtual Printer Manager - coordinates SSDP, MQTT, and FTP services.
- Each virtual printer runs its own independent services (FTP, MQTT, SSDP, Bind)
- bound to its dedicated IP address, regardless of mode.
- """
- import asyncio
- import logging
- from collections.abc import Callable
- from datetime import datetime, timezone
- from pathlib import Path
- from typing import TYPE_CHECKING
- from backend.app.core.config import settings as app_settings
- from backend.app.services.virtual_printer.bind_server import BindServer
- from backend.app.services.virtual_printer.certificate import CertificateService
- from backend.app.services.virtual_printer.ftp_server import VirtualPrinterFTPServer
- from backend.app.services.virtual_printer.mqtt_bridge import MQTTBridge
- from backend.app.services.virtual_printer.mqtt_server import SimpleMQTTServer
- from backend.app.services.virtual_printer.ssdp_server import SSDPProxy, VirtualPrinterSSDPServer
- from backend.app.services.virtual_printer.tcp_proxy import SlicerProxyManager, TCPProxy
- if TYPE_CHECKING:
- from backend.app.services.printer_manager import PrinterManager
- logger = logging.getLogger(__name__)
- # Mapping of SSDP model codes to display names
- # These are the codes that slicers expect during discovery
- # Sources:
- # - https://gist.github.com/Alex-Schaefer/72a9e2491a42da2ef99fb87601955cc3
- # - https://github.com/psychoticbeef/BambuLabOrcaSlicerDiscovery
- VIRTUAL_PRINTER_MODELS = {
- # X1 Series
- "BL-P001": "X1C", # X1 Carbon
- "BL-P002": "X1", # X1
- "C13": "X1E", # X1E
- # X2 Series
- "N6": "X2D", # X2D
- # P Series
- "C11": "P1P", # P1P
- "C12": "P1S", # P1S
- "N7": "P2S", # P2S
- # A1 Series
- "N2S": "A1", # A1
- "N1": "A1 Mini", # A1 Mini
- # H2 Series
- "O1D": "H2D", # H2D
- "O1C": "H2C", # H2C
- "O1C2": "H2C", # H2C (dual nozzle variant)
- "O1S": "H2S", # H2S
- }
- # Serial number prefixes for each model (based on Bambu Lab serial number format)
- # Format: MMM??RYMDDUUUUU (15 chars total)
- # MMM = Model prefix (3 chars)
- # ?? = Unknown/revision code (2 chars)
- # R = Revision letter (1 char)
- # Y = Year digit (1 char)
- # M = Month (1 char, hex: 1-9, A=Oct, B=Nov, C=Dec)
- # DD = Day (2 chars)
- # UUUUU = Unit number (5 chars)
- MODEL_SERIAL_PREFIXES = {
- # X1 Series
- "BL-P001": "00M00A", # X1C
- "BL-P002": "00M00A", # X1
- "C13": "03W00A", # X1E
- # X2 Series
- "N6": "20P90A", # X2D (first 4 chars "20P9" match real serials)
- # P Series
- "C11": "01S00A", # P1P
- "C12": "01P00A", # P1S
- "N7": "22E00A", # P2S
- # A1 Series
- "N2S": "03900A", # A1
- "N1": "03000A", # A1 Mini
- # H2 Series
- "O1D": "09400A", # H2D
- "O1C": "09400A", # H2C
- "O1C2": "09400A", # H2C (dual nozzle variant)
- "O1S": "09400A", # H2S
- }
- # Reverse mapping: display name → SSDP model code (for auto-inheriting from printer model)
- DISPLAY_NAME_TO_MODEL_CODE = {v: k for k, v in VIRTUAL_PRINTER_MODELS.items()}
- # Default model
- DEFAULT_VIRTUAL_PRINTER_MODEL = "BL-P001" # X1C
- def _get_serial_for_model(model: str, serial_suffix: str) -> str:
- """Get serial number for the given model and suffix."""
- prefix = MODEL_SERIAL_PREFIXES.get(model, "00M09A")
- return f"{prefix}{serial_suffix}"
- class VirtualPrinterInstance:
- """Per-printer state and file handling logic.
- Each instance represents one virtual printer with its own config,
- upload directory, certificates, and file handling mode.
- """
- def __init__(
- self,
- *,
- vp_id: int,
- name: str,
- mode: str,
- model: str,
- access_code: str,
- serial_suffix: str,
- target_printer_ip: str = "",
- target_printer_serial: str = "",
- target_printer_id: int | None = None,
- auto_dispatch: bool = True,
- queue_force_color_match: bool = False,
- bind_ip: str = "",
- remote_interface_ip: str = "",
- tailscale_disabled: bool = True,
- base_dir: Path,
- session_factory: Callable | None = None,
- printer_manager: "PrinterManager | None" = None,
- ):
- self.id = vp_id
- self.name = name
- self.mode = mode
- self.model = model
- self.access_code = access_code
- self.serial_suffix = serial_suffix
- self.target_printer_ip = target_printer_ip
- self.target_printer_serial = target_printer_serial
- self.target_printer_id = target_printer_id
- self.auto_dispatch = auto_dispatch
- self.queue_force_color_match = queue_force_color_match
- self.bind_ip = bind_ip
- self.remote_interface_ip = remote_interface_ip
- self.tailscale_disabled = tailscale_disabled
- self._session_factory = session_factory
- self._printer_manager = printer_manager
- # Directories
- self.upload_dir = base_dir / "uploads" / str(vp_id)
- self.cert_dir = base_dir / "certs" / str(vp_id)
- shared_ca_dir = base_dir / "certs"
- # Ensure directories exist
- self.upload_dir.mkdir(parents=True, exist_ok=True)
- (self.upload_dir / "cache").mkdir(exist_ok=True)
- self.cert_dir.mkdir(parents=True, exist_ok=True)
- # Certificate service (shared CA, per-instance printer cert)
- self._cert_service = CertificateService(
- cert_dir=self.cert_dir,
- serial=self.serial,
- shared_ca_dir=shared_ca_dir,
- )
- # Pending files for MQTT correlation
- self._pending_files: dict[str, Path] = {}
- # Slicer-side print options captured from the MQTT `project_file`
- # command, keyed by filename. Used by `_add_to_print_queue` so the
- # queue item inherits the user's slicer-chosen timelapse / bed_leveling
- # / flow_cali / vibration_cali / layer_inspect / use_ams toggles rather
- # than falling back to the global `default_*` settings (#1403). FTP
- # completes a few hundred ms before the slicer's MQTT `project_file`
- # arrives, so the queue-add path waits briefly on the event below
- # before reading the dict. Events are popped along with the options
- # so the dict stays bounded.
- self._slicer_print_options: dict[str, dict] = {}
- self._slicer_print_options_events: dict[str, asyncio.Event] = {}
- # Per-instance services
- self._proxy: SlicerProxyManager | None = None
- self._ftp: VirtualPrinterFTPServer | None = None
- self._mqtt: SimpleMQTTServer | None = None
- self._mqtt_bridge: MQTTBridge | None = None
- self._rtsp_proxy: TCPProxy | None = None
- self._bind: BindServer | None = None
- self._ssdp: VirtualPrinterSSDPServer | None = None
- self._ssdp_proxy: SSDPProxy | None = None
- self._tasks: list[asyncio.Task] = []
- @property
- def serial(self) -> str:
- """Full serial number for this virtual printer."""
- return _get_serial_for_model(self.model or DEFAULT_VIRTUAL_PRINTER_MODEL, self.serial_suffix)
- @property
- def cert_path(self) -> Path:
- return self._cert_service.cert_path
- @property
- def key_path(self) -> Path:
- return self._cert_service.key_path
- @property
- def is_proxy(self) -> bool:
- return self.mode == "proxy"
- @property
- def is_running(self) -> bool:
- return len(self._tasks) > 0 and all(not t.done() for t in self._tasks)
- def generate_certificates(self) -> tuple[Path, Path]:
- """Generate certificates for this instance."""
- self._cert_service.serial = self.serial if not self.is_proxy else (self.target_printer_serial or self.serial)
- additional_ips = [self.remote_interface_ip] if self.remote_interface_ip else None
- if self.bind_ip:
- additional_ips = additional_ips or []
- additional_ips.append(self.bind_ip)
- self._cert_service.delete_printer_certificate()
- return self._cert_service.generate_certificates(additional_ips=additional_ips)
- # -- File handling callbacks --
- async def on_file_received(self, file_path: Path, source_ip: str) -> None:
- """Handle file upload completion from FTP."""
- logger.info("[VP %s] Received file: %s from %s", self.name, file_path.name, source_ip)
- self._pending_files[file_path.name] = file_path
- if self.mode == "immediate":
- await self._archive_file(file_path, source_ip)
- elif self.mode == "print_queue":
- await self._add_to_print_queue(file_path, source_ip)
- else:
- await self._queue_file(file_path, source_ip)
- # Signal job completion to the slicer. Send-flow slicers don't watch the
- # post-upload state and would be happy with anything; the Print flow
- # (intended for proxy-mode VPs, but users sometimes click it against
- # queue/immediate/review modes too — #1280) watches the gcode_state
- # cycle and only releases its in-flight-job lock when it sees FINISH.
- # Going PREPARE → IDLE wedges the slicer's UI at "Downloading...(0%)"
- # and blocks the next dispatch with "busy with another print job".
- # PREPARE → FINISH satisfies both flows. prepare_percent=100 also
- # unfreezes the slicer's "Downloading X%" progress bar which it ticks
- # against the same field during the upload window.
- if self._mqtt and file_path.suffix.lower() == ".3mf":
- self._mqtt.set_gcode_state("FINISH", filename=file_path.name, prepare_percent="100")
- async def on_print_command(self, filename: str, data: dict) -> None:
- """Handle print command from MQTT.
- Captures the slicer's project_file options (`timelapse`, `bed_leveling`,
- `flow_cali`, `vibration_cali`, `layer_inspect`, `use_ams`) so the
- VP-queue path can inherit them when adding the item to the queue,
- rather than falling back to the global default settings (#1403).
- Only queue mode consumes the capture; immediate / review / proxy
- modes ignore the print command, so we skip the stash there to keep
- the dict from accumulating one entry per print over the VP's
- uptime.
- """
- logger.info("[VP %s] Print command for: %s", self.name, filename)
- if self.mode != "print_queue":
- return
- self._slicer_print_options[filename] = dict(data)
- event = self._slicer_print_options_events.get(filename)
- if event:
- event.set()
- async def _archive_file(self, file_path: Path, source_ip: str) -> None:
- """Archive file immediately."""
- if not self._session_factory:
- logger.error("Cannot archive: no database session factory configured")
- return
- if file_path.suffix.lower() != ".3mf":
- logger.debug("Skipping non-3MF file: %s", file_path.name)
- self._pending_files.pop(file_path.name, None)
- try:
- file_path.unlink()
- except OSError:
- pass
- return
- try:
- from backend.app.api.routes.settings import get_setting
- from backend.app.services.archive import ArchiveService
- async with self._session_factory() as db:
- name_source = await get_setting(db, "virtual_printer_archive_name_source")
- prefer_filename = name_source == "filename"
- service = ArchiveService(db)
- archive = await service.archive_print(
- printer_id=None,
- source_file=file_path,
- print_data={
- "status": "archived",
- "source": "virtual_printer",
- "source_ip": source_ip,
- },
- prefer_filename_for_name=prefer_filename,
- )
- if archive:
- logger.info("[VP %s] Archived: %s - %s", self.name, archive.id, archive.print_name)
- await self._broadcast_archive_created(archive)
- try:
- file_path.unlink()
- except OSError:
- pass
- self._pending_files.pop(file_path.name, None)
- else:
- logger.error("Failed to archive file: %s", file_path.name)
- except Exception as e:
- logger.error("Error archiving file: %s", e)
- async def _queue_file(self, file_path: Path, source_ip: str) -> None:
- """Queue file for user review."""
- if not self._session_factory:
- logger.error("Cannot queue: no database session factory configured")
- return
- if file_path.suffix.lower() != ".3mf":
- self._pending_files.pop(file_path.name, None)
- try:
- file_path.unlink()
- except OSError:
- pass
- return
- # Peek at the 3MF for the embedded title BEFORE we hand it off to the
- # DB. Storing it now means the /pending-uploads/ list doesn't have to
- # reopen every 3MF on every render to keep the review card and the
- # eventual archive name in sync (#1152 follow-up). Failure to parse is
- # not fatal — the response model falls back to the filename stem.
- metadata_print_name: str | None = None
- try:
- from backend.app.services.archive import ThreeMFParser
- parsed = ThreeMFParser(file_path).parse()
- raw_name = parsed.get("print_name")
- if isinstance(raw_name, str) and raw_name.strip():
- metadata_print_name = raw_name.strip()[:255]
- except Exception as e:
- logger.debug("[VP %s] Metadata title peek failed for %s: %s", self.name, file_path.name, e)
- try:
- from backend.app.models.pending_upload import PendingUpload
- async with self._session_factory() as db:
- pending = PendingUpload(
- filename=file_path.name,
- file_path=str(file_path),
- file_size=file_path.stat().st_size,
- source_ip=source_ip,
- status="pending",
- uploaded_at=datetime.now(timezone.utc),
- metadata_print_name=metadata_print_name,
- )
- db.add(pending)
- await db.commit()
- logger.info("[VP %s] Queued: %s - %s", self.name, pending.id, file_path.name)
- self._pending_files.pop(file_path.name, None)
- except Exception as e:
- logger.error("Error queueing file: %s", e)
- async def _add_to_print_queue(self, file_path: Path, source_ip: str) -> None:
- """Archive file and add to print queue, assigned to target printer or model."""
- if not self._session_factory:
- logger.error("Cannot add to print queue: no database session factory configured")
- return
- if file_path.suffix.lower() != ".3mf":
- self._pending_files.pop(file_path.name, None)
- try:
- file_path.unlink()
- except OSError:
- pass
- return
- # Wait briefly for the slicer's MQTT `project_file` command so the
- # queue item can inherit the slicer-side print options the user
- # picked (timelapse, bed_leveling, etc). Slicers send the FTP upload
- # first and the MQTT command immediately after, so the typical lag
- # is a few hundred ms; 2 s is conservative without making every
- # VP-queue add visibly slow. Falls back to the global default_*
- # settings if MQTT doesn't arrive in time (legacy behaviour for
- # users on a slicer that doesn't send a print command). #1403.
- # The wait is skipped when there's no MQTT server attached — covers
- # unit tests that invoke `_add_to_print_queue` directly without
- # going through `on_print_command`, so they don't pay the 2 s tax.
- slicer_opts = self._slicer_print_options.pop(file_path.name, None)
- if slicer_opts is None and self._mqtt is not None:
- event = asyncio.Event()
- self._slicer_print_options_events[file_path.name] = event
- try:
- await asyncio.wait_for(event.wait(), timeout=2.0)
- slicer_opts = self._slicer_print_options.pop(file_path.name, None)
- except asyncio.TimeoutError:
- slicer_opts = None
- finally:
- self._slicer_print_options_events.pop(file_path.name, None)
- try:
- import json
- from backend.app.api.routes.settings import get_setting
- from backend.app.models.print_queue import PrintQueueItem
- from backend.app.services.archive import ArchiveService
- from backend.app.services.filament_requirements import extract_filament_requirements
- async with self._session_factory() as db:
- name_source = await get_setting(db, "virtual_printer_archive_name_source")
- prefer_filename = name_source == "filename"
- # Read workflow defaults from settings. Without this the
- # PrintQueueItem below would fall back to the column-level
- # defaults and ignore the user's workflow preferences (#1235).
- # Fallbacks match AppSettings defaults in schemas/settings.py.
- # The slicer-side options captured above (if any) take
- # precedence per-field over these defaults.
- def _bool_setting(value: str | None, default: bool) -> bool:
- return value.lower() == "true" if value is not None else default
- def _slicer_or(field_mqtt: str, settings_default: bool) -> bool:
- """Slicer's MQTT value if present, else the settings default.
- Slicer payloads carry both bool and int (0/1) shapes
- depending on firmware family — coerce via bool() so
- `0`/`False` and `1`/`True` both work.
- """
- if slicer_opts is not None and field_mqtt in slicer_opts:
- return bool(slicer_opts[field_mqtt])
- return settings_default
- # Note the MQTT field names differ from Bambuddy's column
- # names: MQTT uses `bed_leveling` (single L) while the
- # column / settings key use `bed_levelling` (double L).
- bed_levelling = _slicer_or(
- "bed_leveling", _bool_setting(await get_setting(db, "default_bed_levelling"), True)
- )
- flow_cali = _slicer_or("flow_cali", _bool_setting(await get_setting(db, "default_flow_cali"), False))
- vibration_cali = _slicer_or(
- "vibration_cali", _bool_setting(await get_setting(db, "default_vibration_cali"), True)
- )
- layer_inspect = _slicer_or(
- "layer_inspect", _bool_setting(await get_setting(db, "default_layer_inspect"), False)
- )
- timelapse = _slicer_or("timelapse", _bool_setting(await get_setting(db, "default_timelapse"), False))
- service = ArchiveService(db)
- archive = await service.archive_print(
- printer_id=None,
- source_file=file_path,
- print_data={
- "status": "archived",
- "source": "virtual_printer",
- "source_ip": source_ip,
- },
- prefer_filename_for_name=prefer_filename,
- )
- if archive:
- logger.info("[VP %s] Archived: %s - %s", self.name, archive.id, archive.print_name)
- # Assign to specific printer if configured, otherwise use model for "Any X" scheduling
- target_model = None
- if not self.target_printer_id and self.model:
- target_model = VIRTUAL_PRINTER_MODELS.get(self.model)
- plate_id = self._extract_plate_id(file_path)
- # Parse the 3MF for per-slot filament requirements (#1188).
- # The manual /print-queue/ POST flow does this at queue-add
- # time; the VP path used to skip it, so the scheduler fell
- # through to model-only matching and dispatched onto whatever
- # printer happened to be free regardless of loaded colour.
- # required_filament_types is populated unconditionally — it's
- # cheap, lets the scheduler reject obvious mis-matches even
- # without force_color_match. filament_overrides only carries
- # force_color_match=True when the per-VP setting is on, so
- # upgraders keep the old behaviour by default.
- required_filament_types_json: str | None = None
- filament_overrides_json: str | None = None
- requirements = extract_filament_requirements(file_path, plate_id)
- if requirements:
- types = sorted({r["type"] for r in requirements if r.get("type")})
- if types:
- required_filament_types_json = json.dumps(types)
- if self.queue_force_color_match:
- overrides = [
- {
- "slot_id": r["slot_id"],
- "type": r.get("type", ""),
- "color": r.get("color", ""),
- "force_color_match": True,
- }
- for r in requirements
- if r.get("type") and r.get("color")
- ]
- if overrides:
- filament_overrides_json = json.dumps(overrides)
- queue_item = PrintQueueItem(
- printer_id=self.target_printer_id,
- target_model=target_model,
- archive_id=archive.id,
- plate_id=plate_id,
- position=1,
- status="pending",
- manual_start=not self.auto_dispatch,
- required_filament_types=required_filament_types_json,
- filament_overrides=filament_overrides_json,
- bed_levelling=bed_levelling,
- flow_cali=flow_cali,
- vibration_cali=vibration_cali,
- layer_inspect=layer_inspect,
- timelapse=timelapse,
- )
- db.add(queue_item)
- await db.commit()
- logger.info("[VP %s] Added to queue: %s", self.name, queue_item.id)
- await self._broadcast_archive_created(archive)
- try:
- file_path.unlink()
- except OSError:
- pass
- self._pending_files.pop(file_path.name, None)
- else:
- logger.error("Failed to archive file: %s", file_path.name)
- except Exception as e:
- logger.error("Error adding to print queue: %s", e)
- async def _broadcast_archive_created(self, archive) -> None:
- """Notify connected clients that a new archive exists.
- Real-printer prints get this from main.py's MQTT print_start handler;
- VP-uploaded prints need their own broadcast or the Archives page stays
- stale until the user switches tabs (#1282).
- """
- try:
- from backend.app.core.websocket import ws_manager
- await ws_manager.send_archive_created(
- {
- "id": archive.id,
- "printer_id": archive.printer_id,
- "filename": archive.filename,
- "print_name": archive.print_name,
- "status": archive.status,
- }
- )
- except Exception as e:
- logger.debug("[VP %s] archive_created broadcast failed: %s", self.name, e)
- @staticmethod
- def _extract_plate_id(file_path: Path) -> int | None:
- """Extract plate index from 3MF slice_info.config."""
- try:
- import xml.etree.ElementTree as ET
- import zipfile
- with zipfile.ZipFile(file_path, "r") as zf:
- if "Metadata/slice_info.config" in zf.namelist():
- content = zf.read("Metadata/slice_info.config").decode()
- root = ET.fromstring(content) # noqa: S314 # nosec B314
- plate = root.find(".//plate")
- if plate is not None:
- for meta in plate.findall("metadata"):
- if meta.get("key") == "index" and meta.get("value"):
- return int(meta.get("value"))
- except Exception:
- return None
- return None
- # -- Service lifecycle --
- def _resolve_cert_and_advertise(self) -> tuple[Path, Path, str]:
- """Return (cert_path, key_path, advertise_address) for TLS services.
- Always uses the self-signed cert chain (signed by `bbl_ca`). The user
- imports `bbl_ca.crt` once into the slicer; per-VP certs validate from
- there. Tailscale exposure is handled by the user picking the Tailscale
- IP in the bind_ip dropdown.
- """
- cert_path, key_path = self.generate_certificates()
- advertise = self.remote_interface_ip or self.bind_ip or ""
- return cert_path, key_path, advertise
- async def start_server(self) -> None:
- """Start server-mode services (FTP, MQTT, SSDP, Bind) on this VP's bind_ip."""
- logger.info("[VP %s] Starting server-mode services on %s", self.name, self.bind_ip)
- cert_path, key_path, advertise_addr = self._resolve_cert_and_advertise()
- bind_addr = self.bind_ip or "0.0.0.0" # nosec B104
- async def run_with_logging(coro, svc_name):
- try:
- await coro
- except Exception as e:
- logger.error("[VP %s] %s failed: %s", self.name, svc_name, e)
- self._tasks = []
- # FTP server
- self._ftp = VirtualPrinterFTPServer(
- upload_dir=self.upload_dir,
- access_code=self.access_code,
- cert_path=cert_path,
- key_path=key_path,
- on_file_received=self.on_file_received,
- bind_address=bind_addr,
- vp_name=self.name,
- )
- self._tasks.append(
- asyncio.create_task(
- run_with_logging(self._ftp.start(), "FTP"),
- name=f"vp_{self.id}_ftp",
- )
- )
- # MQTT server
- self._mqtt = SimpleMQTTServer(
- serial=self.serial,
- access_code=self.access_code,
- cert_path=cert_path,
- key_path=key_path,
- on_print_command=self.on_print_command,
- model=self.model or DEFAULT_VIRTUAL_PRINTER_MODEL,
- bind_address=bind_addr,
- vp_name=self.name,
- )
- self._tasks.append(
- asyncio.create_task(
- run_with_logging(self._mqtt.start(), "MQTT"),
- name=f"vp_{self.id}_mqtt",
- )
- )
- # MQTT bridge — fans out the target printer's pushes to slicers connected
- # to this VP and forwards their commands back to the printer. Only meaningful
- # when a target printer is configured AND printer_manager was injected (it
- # always is at runtime; tests may omit it).
- if self.target_printer_id is not None and self._printer_manager is not None:
- self._mqtt_bridge = MQTTBridge(
- vp_id=self.id,
- vp_name=self.name,
- vp_serial=self.serial,
- target_printer_id=self.target_printer_id,
- mqtt_server=self._mqtt,
- printer_manager=self._printer_manager,
- )
- self._mqtt.set_bridge(self._mqtt_bridge)
- await self._mqtt_bridge.start()
- # RTSPS camera passthrough on port 322. BambuStudio's camera button
- # connects to the device IP it bound on (the VP), not the IP in
- # `ipcam.rtsp_url`. Without a listener on <bind_ip>:322 the slicer
- # gets connection refused → "LAN connection failed". Same raw TCP
- # pass-through used by SlicerProxyManager in proxy mode.
- target_client = self._printer_manager.get_client(self.target_printer_id)
- target_ip = getattr(target_client, "ip_address", None) if target_client else None
- if target_ip:
- self._rtsp_proxy = TCPProxy(
- name="RTSP",
- listen_port=322,
- target_host=target_ip,
- target_port=322,
- bind_address=bind_addr,
- )
- self._tasks.append(
- asyncio.create_task(
- run_with_logging(self._rtsp_proxy.start(), "RTSP"),
- name=f"vp_{self.id}_rtsp",
- )
- )
- # Bind server
- self._bind = BindServer(
- serial=self.serial,
- model=self.model or DEFAULT_VIRTUAL_PRINTER_MODEL,
- name=self.name,
- bind_address=bind_addr,
- cert_path=cert_path,
- key_path=key_path,
- )
- self._tasks.append(
- asyncio.create_task(
- run_with_logging(self._bind.start(), "Bind"),
- name=f"vp_{self.id}_bind",
- )
- )
- # SSDP server — advertise_addr is the Tailscale FQDN when available,
- # otherwise the bind/remote IP (existing behaviour)
- self._ssdp = VirtualPrinterSSDPServer(
- name=self.name,
- serial=self.serial,
- model=self.model or DEFAULT_VIRTUAL_PRINTER_MODEL,
- advertise_ip=advertise_addr,
- bind_ip=bind_addr,
- )
- self._tasks.append(
- asyncio.create_task(
- run_with_logging(self._ssdp.start(), "SSDP"),
- name=f"vp_{self.id}_ssdp",
- )
- )
- logger.info("[VP %s] Server-mode services started on %s", self.name, bind_addr)
- async def stop_server(self) -> None:
- """Stop server-mode services."""
- if self._mqtt_bridge:
- try:
- await self._mqtt_bridge.stop()
- except Exception:
- logger.exception("[VP %s] MQTT bridge stop failed", self.name)
- if self._mqtt:
- self._mqtt.set_bridge(None)
- self._mqtt_bridge = None
- if self._rtsp_proxy:
- try:
- await self._rtsp_proxy.stop()
- except Exception:
- logger.exception("[VP %s] RTSP proxy stop failed", self.name)
- self._rtsp_proxy = None
- if self._ftp:
- await self._ftp.stop()
- self._ftp = None
- if self._mqtt:
- await self._mqtt.stop()
- self._mqtt = None
- if self._bind:
- await self._bind.stop()
- self._bind = None
- if self._ssdp:
- await self._ssdp.stop()
- self._ssdp = None
- await self._cancel_tasks()
- async def start_proxy(self) -> None:
- """Start proxy mode services for this instance."""
- logger.info("[VP %s] Starting proxy mode to %s", self.name, self.target_printer_ip)
- cert_path, key_path, _ = self._resolve_cert_and_advertise()
- self._proxy = SlicerProxyManager(
- target_host=self.target_printer_ip,
- cert_path=cert_path,
- key_path=key_path,
- on_activity=lambda n, m: logger.info("[VP %s] Proxy %s: %s", self.name, n, m),
- bind_address=self.bind_ip or "0.0.0.0", # nosec B104
- bind_identity={
- "serial": self.target_printer_serial or self.serial,
- "model": self.model or DEFAULT_VIRTUAL_PRINTER_MODEL,
- "name": self.name,
- "version": "01.00.00.00",
- },
- )
- async def run_with_logging(coro, svc_name):
- try:
- await coro
- except Exception as e:
- logger.error("[VP %s] %s failed: %s", self.name, svc_name, e)
- self._tasks = []
- # SSDP for proxy
- proxy_serial = self.target_printer_serial or self.serial
- if self.remote_interface_ip:
- from backend.app.services.network_utils import find_interface_for_ip
- local_iface = find_interface_for_ip(self.target_printer_ip)
- if local_iface:
- self._ssdp_proxy = SSDPProxy(
- local_interface_ip=local_iface["ip"],
- remote_interface_ip=self.remote_interface_ip,
- target_printer_ip=self.target_printer_ip,
- name=self.name,
- )
- self._tasks.append(
- asyncio.create_task(
- run_with_logging(self._ssdp_proxy.start(), "SSDP Proxy"),
- name=f"vp_{self.id}_ssdp_proxy",
- )
- )
- else:
- self._start_fallback_ssdp(proxy_serial, run_with_logging)
- else:
- self._start_fallback_ssdp(proxy_serial, run_with_logging)
- self._tasks.append(
- asyncio.create_task(
- run_with_logging(self._proxy.start(), "Proxy"),
- name=f"vp_{self.id}_proxy",
- )
- )
- def _start_fallback_ssdp(self, proxy_serial: str, run_with_logging) -> None:
- """Start single-interface SSDP server as fallback for proxy mode."""
- self._ssdp = VirtualPrinterSSDPServer(
- name=f"{self.name} (Proxy)",
- serial=proxy_serial,
- model=self.model or DEFAULT_VIRTUAL_PRINTER_MODEL,
- advertise_ip=self.bind_ip or "",
- bind_ip=self.bind_ip or "",
- )
- self._tasks.append(
- asyncio.create_task(
- run_with_logging(self._ssdp.start(), "SSDP"),
- name=f"vp_{self.id}_ssdp",
- )
- )
- async def stop_proxy(self) -> None:
- """Stop proxy mode services for this instance."""
- if self._proxy:
- await self._proxy.stop()
- self._proxy = None
- if self._ssdp:
- await self._ssdp.stop()
- self._ssdp = None
- if self._ssdp_proxy:
- await self._ssdp_proxy.stop()
- self._ssdp_proxy = None
- await self._cancel_tasks()
- async def _cancel_tasks(self) -> None:
- """Cancel all running tasks and wait for cleanup."""
- for task in self._tasks:
- task.cancel()
- if self._tasks:
- try:
- await asyncio.wait_for(asyncio.gather(*self._tasks, return_exceptions=True), timeout=1.0)
- except TimeoutError:
- pass
- self._tasks = []
- def get_status(self) -> dict:
- """Get status for this instance."""
- status: dict = {
- "running": self.is_running,
- "pending_files": len(self._pending_files),
- }
- if self.is_proxy and self._proxy:
- status["proxy"] = self._proxy.get_status()
- return status
- class VirtualPrinterManager:
- """Multi-instance virtual printer registry and orchestrator.
- Every VP runs its own independent services on a dedicated bind IP.
- """
- def __init__(self):
- self._session_factory: Callable | None = None
- self._printer_manager: PrinterManager | None = None
- self._instances: dict[int, VirtualPrinterInstance] = {}
- # Directories
- self._base_dir = app_settings.base_dir / "virtual_printer"
- # Ensure base directories exist
- self._ensure_base_directories()
- def _ensure_base_directories(self) -> None:
- """Create base directories at startup."""
- for dir_path in [self._base_dir, self._base_dir / "uploads", self._base_dir / "certs"]:
- try:
- dir_path.mkdir(parents=True, exist_ok=True)
- except PermissionError:
- logger.error(
- f"Cannot create directory {dir_path}: Permission denied. "
- f"For Docker: ensure the data volume is writable by the container user. "
- f"For bare metal: run 'sudo chown -R $(whoami) {self._base_dir}'"
- )
- def set_session_factory(self, session_factory: Callable) -> None:
- """Set the database session factory."""
- self._session_factory = session_factory
- def set_printer_manager(self, printer_manager: "PrinterManager") -> None:
- """Inject the global printer_manager so non-proxy VPs can mirror their target's MQTT stream."""
- self._printer_manager = printer_manager
- @property
- def is_enabled(self) -> bool:
- """Check if any virtual printer is running."""
- return len(self._instances) > 0
- async def sync_from_db(self) -> None:
- """Load all VPs from DB, reconcile running state."""
- if not self._session_factory:
- logger.warning("Cannot sync virtual printers: no session factory")
- return
- from sqlalchemy import select
- from backend.app.models.printer import Printer
- from backend.app.models.virtual_printer import VirtualPrinter
- async with self._session_factory() as db:
- result = await db.execute(
- select(VirtualPrinter).where(VirtualPrinter.enabled == True).order_by(VirtualPrinter.position) # noqa: E712
- )
- enabled_vps = result.scalars().all()
- # Stop instances that are no longer enabled or changed mode
- enabled_ids = {vp.id for vp in enabled_vps}
- for vp_id in list(self._instances.keys()):
- if vp_id not in enabled_ids:
- await self.remove_instance(vp_id)
- # Look up printer IPs for proxy VPs
- proxy_vps = [vp for vp in enabled_vps if vp.mode == "proxy"]
- proxy_ips: dict[int, tuple[str, str]] = {}
- if proxy_vps:
- async with self._session_factory() as db:
- for pvp in proxy_vps:
- if pvp.target_printer_id:
- result = await db.execute(select(Printer).where(Printer.id == pvp.target_printer_id))
- printer = result.scalar_one_or_none()
- if printer:
- proxy_ips[pvp.id] = (printer.ip_address, printer.serial_number)
- # Detect config changes on running instances and restart if needed
- for vp in enabled_vps:
- instance = self._instances.get(vp.id)
- if not instance:
- continue
- changed = (
- instance.mode != vp.mode
- or instance.model != (vp.model or DEFAULT_VIRTUAL_PRINTER_MODEL)
- or instance.access_code != (vp.access_code or "")
- or instance.bind_ip != (vp.bind_ip or "")
- or instance.remote_interface_ip != (vp.remote_interface_ip or "")
- or instance.target_printer_id != vp.target_printer_id
- or instance.auto_dispatch != vp.auto_dispatch
- )
- if changed:
- logger.info(
- "VP %s config changed (mode: %s→%s), restarting",
- instance.name,
- instance.mode,
- vp.mode,
- )
- await self.remove_instance(vp.id)
- # Start instances for all enabled VPs (skip already running)
- for vp in enabled_vps:
- if vp.id in self._instances:
- continue
- if vp.mode == "proxy":
- ip_info = proxy_ips.get(vp.id)
- if not ip_info:
- logger.warning("Proxy VP %s: target printer not found, skipping", vp.name)
- continue
- target_ip, target_serial = ip_info
- instance = VirtualPrinterInstance(
- vp_id=vp.id,
- name=vp.name,
- mode=vp.mode,
- model=vp.model or DEFAULT_VIRTUAL_PRINTER_MODEL,
- access_code=vp.access_code or "",
- serial_suffix=vp.serial_suffix,
- target_printer_ip=target_ip,
- target_printer_serial=target_serial,
- auto_dispatch=vp.auto_dispatch,
- bind_ip=vp.bind_ip or "",
- remote_interface_ip=vp.remote_interface_ip or "",
- tailscale_disabled=vp.tailscale_disabled,
- base_dir=self._base_dir,
- session_factory=self._session_factory,
- )
- self._instances[vp.id] = instance
- await instance.start_proxy()
- logger.info("Started proxy VP: %s → %s (bind=%s)", instance.name, target_ip, instance.bind_ip)
- else:
- instance = VirtualPrinterInstance(
- vp_id=vp.id,
- name=vp.name,
- mode=vp.mode,
- model=vp.model or DEFAULT_VIRTUAL_PRINTER_MODEL,
- access_code=vp.access_code or "",
- serial_suffix=vp.serial_suffix,
- target_printer_id=vp.target_printer_id,
- auto_dispatch=vp.auto_dispatch,
- queue_force_color_match=vp.queue_force_color_match,
- bind_ip=vp.bind_ip or "",
- remote_interface_ip=vp.remote_interface_ip or "",
- tailscale_disabled=vp.tailscale_disabled,
- base_dir=self._base_dir,
- session_factory=self._session_factory,
- printer_manager=self._printer_manager,
- )
- self._instances[vp.id] = instance
- await instance.start_server()
- logger.info("Started server-mode VP: %s on %s", instance.name, vp.bind_ip)
- async def remove_instance(self, vp_id: int) -> None:
- """Stop and remove a single VP instance."""
- instance = self._instances.pop(vp_id, None)
- if instance:
- if instance.is_proxy:
- await instance.stop_proxy()
- else:
- await instance.stop_server()
- logger.info("Removed VP instance: %s", instance.name)
- async def stop_all(self) -> None:
- """Shutdown all virtual printer services."""
- logger.info("Stopping all virtual printer services...")
- for vp_id in list(self._instances.keys()):
- await self.remove_instance(vp_id)
- logger.info("All virtual printer services stopped")
- def get_instance(self, vp_id: int) -> VirtualPrinterInstance | None:
- """Get a running instance by ID."""
- return self._instances.get(vp_id)
- def get_all_status(self) -> list[dict]:
- """Get status for all running instances."""
- return [
- {
- "id": inst.id,
- "name": inst.name,
- "mode": inst.mode,
- **inst.get_status(),
- }
- for inst in self._instances.values()
- ]
- # -- Legacy single-printer compat --
- def get_status(self) -> dict:
- """Get status for first virtual printer (backward compat)."""
- if self._instances:
- first = next(iter(self._instances.values()))
- return {
- "enabled": True,
- "running": first.is_running,
- "mode": first.mode,
- "name": first.name,
- "serial": first.serial,
- "model": first.model or DEFAULT_VIRTUAL_PRINTER_MODEL,
- "model_name": VIRTUAL_PRINTER_MODELS.get(
- first.model or DEFAULT_VIRTUAL_PRINTER_MODEL,
- first.model or DEFAULT_VIRTUAL_PRINTER_MODEL,
- ),
- "pending_files": first.get_status().get("pending_files", 0),
- **({"target_printer_ip": first.target_printer_ip} if first.is_proxy else {}),
- **({"proxy": first.get_status().get("proxy", {})} if first.is_proxy else {}),
- }
- return {
- "enabled": False,
- "running": False,
- "mode": "immediate",
- "name": "Bambuddy",
- "serial": "",
- "model": DEFAULT_VIRTUAL_PRINTER_MODEL,
- "model_name": VIRTUAL_PRINTER_MODELS[DEFAULT_VIRTUAL_PRINTER_MODEL],
- "pending_files": 0,
- }
- async def configure(
- self,
- enabled: bool,
- access_code: str = "",
- mode: str = "immediate",
- model: str = "",
- target_printer_ip: str = "",
- target_printer_serial: str = "",
- remote_interface_ip: str = "",
- ) -> None:
- """Legacy single-printer configure. Delegates to sync_from_db()."""
- # This method is kept for backward compat with the settings endpoint.
- # The actual work is done by sync_from_db() which reads from the DB.
- await self.sync_from_db()
- # Global instance
- virtual_printer_manager = VirtualPrinterManager()
|