manager.py 45 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098
  1. """Virtual Printer Manager - coordinates SSDP, MQTT, and FTP services.
  2. Each virtual printer runs its own independent services (FTP, MQTT, SSDP, Bind)
  3. bound to its dedicated IP address, regardless of mode.
  4. """
  5. import asyncio
  6. import logging
  7. from collections.abc import Callable
  8. from datetime import datetime, timezone
  9. from pathlib import Path
  10. from typing import TYPE_CHECKING
  11. from backend.app.core.config import settings as app_settings
  12. from backend.app.services.virtual_printer.bind_server import BindServer
  13. from backend.app.services.virtual_printer.certificate import CertificateService
  14. from backend.app.services.virtual_printer.ftp_server import VirtualPrinterFTPServer
  15. from backend.app.services.virtual_printer.mqtt_bridge import MQTTBridge
  16. from backend.app.services.virtual_printer.mqtt_server import SimpleMQTTServer
  17. from backend.app.services.virtual_printer.ssdp_server import SSDPProxy, VirtualPrinterSSDPServer
  18. from backend.app.services.virtual_printer.tailscale import tailscale_service
  19. from backend.app.services.virtual_printer.tcp_proxy import SlicerProxyManager, TCPProxy
  20. if TYPE_CHECKING:
  21. from backend.app.services.printer_manager import PrinterManager
  22. logger = logging.getLogger(__name__)
  23. # Mapping of SSDP model codes to display names
  24. # These are the codes that slicers expect during discovery
  25. # Sources:
  26. # - https://gist.github.com/Alex-Schaefer/72a9e2491a42da2ef99fb87601955cc3
  27. # - https://github.com/psychoticbeef/BambuLabOrcaSlicerDiscovery
  28. VIRTUAL_PRINTER_MODELS = {
  29. # X1 Series
  30. "BL-P001": "X1C", # X1 Carbon
  31. "BL-P002": "X1", # X1
  32. "C13": "X1E", # X1E
  33. # X2 Series
  34. "N6": "X2D", # X2D
  35. # P Series
  36. "C11": "P1P", # P1P
  37. "C12": "P1S", # P1S
  38. "N7": "P2S", # P2S
  39. # A1 Series
  40. "N2S": "A1", # A1
  41. "N1": "A1 Mini", # A1 Mini
  42. # H2 Series
  43. "O1D": "H2D", # H2D
  44. "O1C": "H2C", # H2C
  45. "O1C2": "H2C", # H2C (dual nozzle variant)
  46. "O1S": "H2S", # H2S
  47. }
  48. # Serial number prefixes for each model (based on Bambu Lab serial number format)
  49. # Format: MMM??RYMDDUUUUU (15 chars total)
  50. # MMM = Model prefix (3 chars)
  51. # ?? = Unknown/revision code (2 chars)
  52. # R = Revision letter (1 char)
  53. # Y = Year digit (1 char)
  54. # M = Month (1 char, hex: 1-9, A=Oct, B=Nov, C=Dec)
  55. # DD = Day (2 chars)
  56. # UUUUU = Unit number (5 chars)
  57. MODEL_SERIAL_PREFIXES = {
  58. # X1 Series
  59. "BL-P001": "00M00A", # X1C
  60. "BL-P002": "00M00A", # X1
  61. "C13": "03W00A", # X1E
  62. # X2 Series
  63. "N6": "20P90A", # X2D (first 4 chars "20P9" match real serials)
  64. # P Series
  65. "C11": "01S00A", # P1P
  66. "C12": "01P00A", # P1S
  67. "N7": "22E00A", # P2S
  68. # A1 Series
  69. "N2S": "03900A", # A1
  70. "N1": "03000A", # A1 Mini
  71. # H2 Series
  72. "O1D": "09400A", # H2D
  73. "O1C": "09400A", # H2C
  74. "O1C2": "09400A", # H2C (dual nozzle variant)
  75. "O1S": "09400A", # H2S
  76. }
  77. # Reverse mapping: display name → SSDP model code (for auto-inheriting from printer model)
  78. DISPLAY_NAME_TO_MODEL_CODE = {v: k for k, v in VIRTUAL_PRINTER_MODELS.items()}
  79. # Default model
  80. DEFAULT_VIRTUAL_PRINTER_MODEL = "BL-P001" # X1C
  81. def _get_serial_for_model(model: str, serial_suffix: str) -> str:
  82. """Get serial number for the given model and suffix."""
  83. prefix = MODEL_SERIAL_PREFIXES.get(model, "00M09A")
  84. return f"{prefix}{serial_suffix}"
  85. class VirtualPrinterInstance:
  86. """Per-printer state and file handling logic.
  87. Each instance represents one virtual printer with its own config,
  88. upload directory, certificates, and file handling mode.
  89. """
  90. def __init__(
  91. self,
  92. *,
  93. vp_id: int,
  94. name: str,
  95. mode: str,
  96. model: str,
  97. access_code: str,
  98. serial_suffix: str,
  99. target_printer_ip: str = "",
  100. target_printer_serial: str = "",
  101. target_printer_id: int | None = None,
  102. auto_dispatch: bool = True,
  103. queue_force_color_match: bool = False,
  104. bind_ip: str = "",
  105. remote_interface_ip: str = "",
  106. tailscale_disabled: bool = True,
  107. base_dir: Path,
  108. session_factory: Callable | None = None,
  109. printer_manager: "PrinterManager | None" = None,
  110. ):
  111. self.id = vp_id
  112. self.name = name
  113. self.mode = mode
  114. self.model = model
  115. self.access_code = access_code
  116. self.serial_suffix = serial_suffix
  117. self.target_printer_ip = target_printer_ip
  118. self.target_printer_serial = target_printer_serial
  119. self.target_printer_id = target_printer_id
  120. self.auto_dispatch = auto_dispatch
  121. self.queue_force_color_match = queue_force_color_match
  122. self.bind_ip = bind_ip
  123. self.remote_interface_ip = remote_interface_ip
  124. self.tailscale_disabled = tailscale_disabled
  125. self._session_factory = session_factory
  126. self._printer_manager = printer_manager
  127. # Directories
  128. self.upload_dir = base_dir / "uploads" / str(vp_id)
  129. self.cert_dir = base_dir / "certs" / str(vp_id)
  130. shared_ca_dir = base_dir / "certs"
  131. # Ensure directories exist
  132. self.upload_dir.mkdir(parents=True, exist_ok=True)
  133. (self.upload_dir / "cache").mkdir(exist_ok=True)
  134. self.cert_dir.mkdir(parents=True, exist_ok=True)
  135. # Certificate service (shared CA, per-instance printer cert)
  136. self._cert_service = CertificateService(
  137. cert_dir=self.cert_dir,
  138. serial=self.serial,
  139. shared_ca_dir=shared_ca_dir,
  140. )
  141. # Tailscale FQDN used for this instance (set at start_server/start_proxy time)
  142. self.tailscale_fqdn: str | None = None
  143. # Pending files for MQTT correlation
  144. self._pending_files: dict[str, Path] = {}
  145. # Per-instance services
  146. self._proxy: SlicerProxyManager | None = None
  147. self._ftp: VirtualPrinterFTPServer | None = None
  148. self._mqtt: SimpleMQTTServer | None = None
  149. self._mqtt_bridge: MQTTBridge | None = None
  150. self._rtsp_proxy: TCPProxy | None = None
  151. self._bind: BindServer | None = None
  152. self._ssdp: VirtualPrinterSSDPServer | None = None
  153. self._ssdp_proxy: SSDPProxy | None = None
  154. self._tasks: list[asyncio.Task] = []
  155. self._cert_renewal_task: asyncio.Task | None = None
  156. self._cert_restart_task: asyncio.Task | None = None
  157. @property
  158. def serial(self) -> str:
  159. """Full serial number for this virtual printer."""
  160. return _get_serial_for_model(self.model or DEFAULT_VIRTUAL_PRINTER_MODEL, self.serial_suffix)
  161. @property
  162. def cert_path(self) -> Path:
  163. return self._cert_service.cert_path
  164. @property
  165. def key_path(self) -> Path:
  166. return self._cert_service.key_path
  167. @property
  168. def is_proxy(self) -> bool:
  169. return self.mode == "proxy"
  170. @property
  171. def is_running(self) -> bool:
  172. return len(self._tasks) > 0 and all(not t.done() for t in self._tasks)
  173. def generate_certificates(self) -> tuple[Path, Path]:
  174. """Generate certificates for this instance."""
  175. self._cert_service.serial = self.serial if not self.is_proxy else (self.target_printer_serial or self.serial)
  176. additional_ips = [self.remote_interface_ip] if self.remote_interface_ip else None
  177. if self.bind_ip:
  178. additional_ips = additional_ips or []
  179. additional_ips.append(self.bind_ip)
  180. self._cert_service.delete_printer_certificate()
  181. return self._cert_service.generate_certificates(additional_ips=additional_ips)
  182. # -- File handling callbacks --
  183. async def on_file_received(self, file_path: Path, source_ip: str) -> None:
  184. """Handle file upload completion from FTP."""
  185. logger.info("[VP %s] Received file: %s from %s", self.name, file_path.name, source_ip)
  186. self._pending_files[file_path.name] = file_path
  187. if self.mode == "immediate":
  188. await self._archive_file(file_path, source_ip)
  189. elif self.mode == "print_queue":
  190. await self._add_to_print_queue(file_path, source_ip)
  191. else:
  192. await self._queue_file(file_path, source_ip)
  193. # Reset MQTT status back to IDLE
  194. if self._mqtt and file_path.suffix.lower() == ".3mf":
  195. self._mqtt.set_gcode_state("IDLE")
  196. async def on_print_command(self, filename: str, data: dict) -> None:
  197. """Handle print command from MQTT."""
  198. logger.info("[VP %s] Print command for: %s", self.name, filename)
  199. async def _archive_file(self, file_path: Path, source_ip: str) -> None:
  200. """Archive file immediately."""
  201. if not self._session_factory:
  202. logger.error("Cannot archive: no database session factory configured")
  203. return
  204. if file_path.suffix.lower() != ".3mf":
  205. logger.debug("Skipping non-3MF file: %s", file_path.name)
  206. self._pending_files.pop(file_path.name, None)
  207. try:
  208. file_path.unlink()
  209. except OSError:
  210. pass
  211. return
  212. try:
  213. from backend.app.api.routes.settings import get_setting
  214. from backend.app.services.archive import ArchiveService
  215. async with self._session_factory() as db:
  216. name_source = await get_setting(db, "virtual_printer_archive_name_source")
  217. prefer_filename = name_source == "filename"
  218. service = ArchiveService(db)
  219. archive = await service.archive_print(
  220. printer_id=None,
  221. source_file=file_path,
  222. print_data={
  223. "status": "archived",
  224. "source": "virtual_printer",
  225. "source_ip": source_ip,
  226. },
  227. prefer_filename_for_name=prefer_filename,
  228. )
  229. if archive:
  230. logger.info("[VP %s] Archived: %s - %s", self.name, archive.id, archive.print_name)
  231. try:
  232. file_path.unlink()
  233. except OSError:
  234. pass
  235. self._pending_files.pop(file_path.name, None)
  236. else:
  237. logger.error("Failed to archive file: %s", file_path.name)
  238. except Exception as e:
  239. logger.error("Error archiving file: %s", e)
  240. async def _queue_file(self, file_path: Path, source_ip: str) -> None:
  241. """Queue file for user review."""
  242. if not self._session_factory:
  243. logger.error("Cannot queue: no database session factory configured")
  244. return
  245. if file_path.suffix.lower() != ".3mf":
  246. self._pending_files.pop(file_path.name, None)
  247. try:
  248. file_path.unlink()
  249. except OSError:
  250. pass
  251. return
  252. # Peek at the 3MF for the embedded title BEFORE we hand it off to the
  253. # DB. Storing it now means the /pending-uploads/ list doesn't have to
  254. # reopen every 3MF on every render to keep the review card and the
  255. # eventual archive name in sync (#1152 follow-up). Failure to parse is
  256. # not fatal — the response model falls back to the filename stem.
  257. metadata_print_name: str | None = None
  258. try:
  259. from backend.app.services.archive import ThreeMFParser
  260. parsed = ThreeMFParser(file_path).parse()
  261. raw_name = parsed.get("print_name")
  262. if isinstance(raw_name, str) and raw_name.strip():
  263. metadata_print_name = raw_name.strip()[:255]
  264. except Exception as e:
  265. logger.debug("[VP %s] Metadata title peek failed for %s: %s", self.name, file_path.name, e)
  266. try:
  267. from backend.app.models.pending_upload import PendingUpload
  268. async with self._session_factory() as db:
  269. pending = PendingUpload(
  270. filename=file_path.name,
  271. file_path=str(file_path),
  272. file_size=file_path.stat().st_size,
  273. source_ip=source_ip,
  274. status="pending",
  275. uploaded_at=datetime.now(timezone.utc),
  276. metadata_print_name=metadata_print_name,
  277. )
  278. db.add(pending)
  279. await db.commit()
  280. logger.info("[VP %s] Queued: %s - %s", self.name, pending.id, file_path.name)
  281. self._pending_files.pop(file_path.name, None)
  282. except Exception as e:
  283. logger.error("Error queueing file: %s", e)
  284. async def _add_to_print_queue(self, file_path: Path, source_ip: str) -> None:
  285. """Archive file and add to print queue, assigned to target printer or model."""
  286. if not self._session_factory:
  287. logger.error("Cannot add to print queue: no database session factory configured")
  288. return
  289. if file_path.suffix.lower() != ".3mf":
  290. self._pending_files.pop(file_path.name, None)
  291. try:
  292. file_path.unlink()
  293. except OSError:
  294. pass
  295. return
  296. try:
  297. import json
  298. from backend.app.api.routes.settings import get_setting
  299. from backend.app.models.print_queue import PrintQueueItem
  300. from backend.app.services.archive import ArchiveService
  301. from backend.app.services.filament_requirements import extract_filament_requirements
  302. async with self._session_factory() as db:
  303. name_source = await get_setting(db, "virtual_printer_archive_name_source")
  304. prefer_filename = name_source == "filename"
  305. service = ArchiveService(db)
  306. archive = await service.archive_print(
  307. printer_id=None,
  308. source_file=file_path,
  309. print_data={
  310. "status": "archived",
  311. "source": "virtual_printer",
  312. "source_ip": source_ip,
  313. },
  314. prefer_filename_for_name=prefer_filename,
  315. )
  316. if archive:
  317. logger.info("[VP %s] Archived: %s - %s", self.name, archive.id, archive.print_name)
  318. # Assign to specific printer if configured, otherwise use model for "Any X" scheduling
  319. target_model = None
  320. if not self.target_printer_id and self.model:
  321. target_model = VIRTUAL_PRINTER_MODELS.get(self.model)
  322. plate_id = self._extract_plate_id(file_path)
  323. # Parse the 3MF for per-slot filament requirements (#1188).
  324. # The manual /print-queue/ POST flow does this at queue-add
  325. # time; the VP path used to skip it, so the scheduler fell
  326. # through to model-only matching and dispatched onto whatever
  327. # printer happened to be free regardless of loaded colour.
  328. # required_filament_types is populated unconditionally — it's
  329. # cheap, lets the scheduler reject obvious mis-matches even
  330. # without force_color_match. filament_overrides only carries
  331. # force_color_match=True when the per-VP setting is on, so
  332. # upgraders keep the old behaviour by default.
  333. required_filament_types_json: str | None = None
  334. filament_overrides_json: str | None = None
  335. requirements = extract_filament_requirements(file_path, plate_id)
  336. if requirements:
  337. types = sorted({r["type"] for r in requirements if r.get("type")})
  338. if types:
  339. required_filament_types_json = json.dumps(types)
  340. if self.queue_force_color_match:
  341. overrides = [
  342. {
  343. "slot_id": r["slot_id"],
  344. "type": r.get("type", ""),
  345. "color": r.get("color", ""),
  346. "force_color_match": True,
  347. }
  348. for r in requirements
  349. if r.get("type") and r.get("color")
  350. ]
  351. if overrides:
  352. filament_overrides_json = json.dumps(overrides)
  353. queue_item = PrintQueueItem(
  354. printer_id=self.target_printer_id,
  355. target_model=target_model,
  356. archive_id=archive.id,
  357. plate_id=plate_id,
  358. position=1,
  359. status="pending",
  360. manual_start=not self.auto_dispatch,
  361. required_filament_types=required_filament_types_json,
  362. filament_overrides=filament_overrides_json,
  363. )
  364. db.add(queue_item)
  365. await db.commit()
  366. logger.info("[VP %s] Added to queue: %s", self.name, queue_item.id)
  367. try:
  368. file_path.unlink()
  369. except OSError:
  370. pass
  371. self._pending_files.pop(file_path.name, None)
  372. else:
  373. logger.error("Failed to archive file: %s", file_path.name)
  374. except Exception as e:
  375. logger.error("Error adding to print queue: %s", e)
  376. @staticmethod
  377. def _extract_plate_id(file_path: Path) -> int | None:
  378. """Extract plate index from 3MF slice_info.config."""
  379. try:
  380. import xml.etree.ElementTree as ET
  381. import zipfile
  382. with zipfile.ZipFile(file_path, "r") as zf:
  383. if "Metadata/slice_info.config" in zf.namelist():
  384. content = zf.read("Metadata/slice_info.config").decode()
  385. root = ET.fromstring(content) # noqa: S314 # nosec B314
  386. plate = root.find(".//plate")
  387. if plate is not None:
  388. for meta in plate.findall("metadata"):
  389. if meta.get("key") == "index" and meta.get("value"):
  390. return int(meta.get("value"))
  391. except Exception:
  392. return None
  393. return None
  394. # -- Service lifecycle --
  395. async def _cancel_renewal_task(self) -> None:
  396. """Cancel the cert renewal task and await its completion."""
  397. if self._cert_renewal_task:
  398. self._cert_renewal_task.cancel()
  399. try:
  400. await self._cert_renewal_task
  401. except asyncio.CancelledError:
  402. pass
  403. except Exception as e:
  404. logger.warning("[VP %s] Unexpected error in cert renewal task: %s", self.name, e)
  405. self._cert_renewal_task = None
  406. async def _cancel_restart_task(self) -> None:
  407. """Cancel the cert restart task and await its completion.
  408. Skip when the caller IS the restart task itself — stop_server() /
  409. stop_proxy() are called from inside _restart_for_cert_renewal,
  410. which runs AS _cert_restart_task. Cancelling + awaiting self
  411. flags a CancelledError on the next `await` in stop_server,
  412. which tears down the old listeners but never lets start_server
  413. run — the VP would sit on an expired cert until process restart.
  414. """
  415. task = self._cert_restart_task
  416. if task is asyncio.current_task():
  417. # Renewal path cleaning up its own restart task: clear the
  418. # reference so future callers don't see a stale task handle,
  419. # but do NOT cancel-and-await ourselves.
  420. self._cert_restart_task = None
  421. return
  422. if task and not task.done():
  423. task.cancel()
  424. try:
  425. await task
  426. except asyncio.CancelledError:
  427. pass
  428. except Exception as e:
  429. logger.warning("[VP %s] Unexpected error in cert restart task: %s", self.name, e)
  430. self._cert_restart_task = None
  431. async def _restart_for_cert_renewal(self) -> None:
  432. """Restart VP services to load the newly renewed Tailscale cert into TLS listeners."""
  433. logger.info("[VP %s] Restarting services to apply renewed Tailscale cert", self.name)
  434. try:
  435. if self.is_proxy:
  436. await self.stop_proxy()
  437. await self.start_proxy()
  438. else:
  439. await self.stop_server()
  440. await self.start_server()
  441. except asyncio.CancelledError:
  442. raise
  443. except Exception as e:
  444. logger.error("[VP %s] Failed to restart after cert renewal: %s", self.name, e)
  445. async def _cert_renewal_loop(self) -> None:
  446. """Daily background check for Tailscale cert renewal while VP is running.
  447. Checks first, then sleeps, so a cert that was just barely renewed at startup
  448. is not re-checked for another 24 h. When a renewal actually happens the loop
  449. schedules a VP restart so the new cert is loaded into the running TLS listeners.
  450. _cert_renewal_task is tracked separately from _tasks because it has a different
  451. lifecycle: it runs for the entire lifetime of the VP, not just during service start.
  452. """
  453. while True:
  454. try:
  455. if self.tailscale_fqdn:
  456. needs_renewal = tailscale_service.cert_needs_renewal(
  457. self._cert_service.ts_cert_path, fqdn=self.tailscale_fqdn
  458. )
  459. if needs_renewal:
  460. renewed = await self._cert_service.use_tailscale_cert(self.tailscale_fqdn, tailscale_service)
  461. if renewed:
  462. logger.info(
  463. "[VP %s] Tailscale cert renewed for %s, scheduling restart",
  464. self.name,
  465. self.tailscale_fqdn,
  466. )
  467. # Schedule restart in a separate task; this loop ends here
  468. # so the restart can cleanly cancel _cert_renewal_task and
  469. # create a fresh one via start_server/start_proxy.
  470. self._cert_restart_task = asyncio.create_task(
  471. self._restart_for_cert_renewal(),
  472. name=f"vp_{self.id}_cert_restart",
  473. )
  474. break
  475. await asyncio.sleep(86400) # check once per day
  476. except asyncio.CancelledError:
  477. break
  478. except Exception as e:
  479. logger.error("[VP %s] Cert renewal loop error: %s", self.name, e)
  480. await asyncio.sleep(3600) # back off 1 h on unexpected error
  481. async def _resolve_cert_and_advertise(self) -> tuple[Path, Path, str]:
  482. """Return (cert_path, key_path, advertise_address) for TLS services.
  483. When Tailscale is available, provisions a LE cert and returns the
  484. Tailscale FQDN as the advertise address so SSDP broadcasts the hostname
  485. that matches the trusted cert.
  486. Falls back to the self-signed cert and IP-based advertising when
  487. Tailscale is absent or provisioning fails.
  488. """
  489. if self.tailscale_disabled:
  490. logger.info("[VP %s] Tailscale integration disabled by user, using self-signed cert", self.name)
  491. else:
  492. try:
  493. ts_status = await tailscale_service.get_status()
  494. if ts_status.available:
  495. ts_result = await self._cert_service.use_tailscale_cert(ts_status.fqdn, tailscale_service)
  496. if ts_result:
  497. self.tailscale_fqdn = ts_status.fqdn
  498. logger.info("[VP %s] Using Tailscale cert for %s", self.name, ts_status.fqdn)
  499. return ts_result[0], ts_result[1], ts_status.fqdn
  500. logger.warning(
  501. "[VP %s] Tailscale available (%s) but cert provisioning failed, falling back to self-signed cert",
  502. self.name,
  503. ts_status.fqdn,
  504. )
  505. else:
  506. logger.info(
  507. "[VP %s] Tailscale not available (%s), using self-signed cert",
  508. self.name,
  509. ts_status.error or "not connected",
  510. )
  511. except Exception as e:
  512. logger.warning("[VP %s] Tailscale cert check failed, falling back to self-signed: %s", self.name, e)
  513. self.tailscale_fqdn = None
  514. cert_path, key_path = self.generate_certificates()
  515. advertise = self.remote_interface_ip or self.bind_ip or ""
  516. return cert_path, key_path, advertise
  517. async def start_server(self) -> None:
  518. """Start server-mode services (FTP, MQTT, SSDP, Bind) on this VP's bind_ip."""
  519. logger.info("[VP %s] Starting server-mode services on %s", self.name, self.bind_ip)
  520. cert_path, key_path, advertise_addr = await self._resolve_cert_and_advertise()
  521. bind_addr = self.bind_ip or "0.0.0.0" # nosec B104
  522. async def run_with_logging(coro, svc_name):
  523. try:
  524. await coro
  525. except Exception as e:
  526. logger.error("[VP %s] %s failed: %s", self.name, svc_name, e)
  527. self._tasks = []
  528. # FTP server
  529. self._ftp = VirtualPrinterFTPServer(
  530. upload_dir=self.upload_dir,
  531. access_code=self.access_code,
  532. cert_path=cert_path,
  533. key_path=key_path,
  534. on_file_received=self.on_file_received,
  535. bind_address=bind_addr,
  536. vp_name=self.name,
  537. )
  538. self._tasks.append(
  539. asyncio.create_task(
  540. run_with_logging(self._ftp.start(), "FTP"),
  541. name=f"vp_{self.id}_ftp",
  542. )
  543. )
  544. # MQTT server
  545. self._mqtt = SimpleMQTTServer(
  546. serial=self.serial,
  547. access_code=self.access_code,
  548. cert_path=cert_path,
  549. key_path=key_path,
  550. on_print_command=self.on_print_command,
  551. model=self.model or DEFAULT_VIRTUAL_PRINTER_MODEL,
  552. bind_address=bind_addr,
  553. vp_name=self.name,
  554. )
  555. self._tasks.append(
  556. asyncio.create_task(
  557. run_with_logging(self._mqtt.start(), "MQTT"),
  558. name=f"vp_{self.id}_mqtt",
  559. )
  560. )
  561. # MQTT bridge — fans out the target printer's pushes to slicers connected
  562. # to this VP and forwards their commands back to the printer. Only meaningful
  563. # when a target printer is configured AND printer_manager was injected (it
  564. # always is at runtime; tests may omit it).
  565. if self.target_printer_id is not None and self._printer_manager is not None:
  566. self._mqtt_bridge = MQTTBridge(
  567. vp_id=self.id,
  568. vp_name=self.name,
  569. vp_serial=self.serial,
  570. target_printer_id=self.target_printer_id,
  571. mqtt_server=self._mqtt,
  572. printer_manager=self._printer_manager,
  573. )
  574. self._mqtt.set_bridge(self._mqtt_bridge)
  575. await self._mqtt_bridge.start()
  576. # RTSPS camera passthrough on port 322. BambuStudio's camera button
  577. # connects to the device IP it bound on (the VP), not the IP in
  578. # `ipcam.rtsp_url`. Without a listener on <bind_ip>:322 the slicer
  579. # gets connection refused → "LAN connection failed". Same raw TCP
  580. # pass-through used by SlicerProxyManager in proxy mode.
  581. target_client = self._printer_manager.get_client(self.target_printer_id)
  582. target_ip = getattr(target_client, "ip_address", None) if target_client else None
  583. if target_ip:
  584. self._rtsp_proxy = TCPProxy(
  585. name="RTSP",
  586. listen_port=322,
  587. target_host=target_ip,
  588. target_port=322,
  589. bind_address=bind_addr,
  590. )
  591. self._tasks.append(
  592. asyncio.create_task(
  593. run_with_logging(self._rtsp_proxy.start(), "RTSP"),
  594. name=f"vp_{self.id}_rtsp",
  595. )
  596. )
  597. # Bind server
  598. self._bind = BindServer(
  599. serial=self.serial,
  600. model=self.model or DEFAULT_VIRTUAL_PRINTER_MODEL,
  601. name=self.name,
  602. bind_address=bind_addr,
  603. cert_path=cert_path,
  604. key_path=key_path,
  605. )
  606. self._tasks.append(
  607. asyncio.create_task(
  608. run_with_logging(self._bind.start(), "Bind"),
  609. name=f"vp_{self.id}_bind",
  610. )
  611. )
  612. # SSDP server — advertise_addr is the Tailscale FQDN when available,
  613. # otherwise the bind/remote IP (existing behaviour)
  614. self._ssdp = VirtualPrinterSSDPServer(
  615. name=self.name,
  616. serial=self.serial,
  617. model=self.model or DEFAULT_VIRTUAL_PRINTER_MODEL,
  618. advertise_ip=advertise_addr,
  619. bind_ip=bind_addr,
  620. )
  621. self._tasks.append(
  622. asyncio.create_task(
  623. run_with_logging(self._ssdp.start(), "SSDP"),
  624. name=f"vp_{self.id}_ssdp",
  625. )
  626. )
  627. # Guard against double-start: cancel any orphaned task before creating a new one
  628. await self._cancel_renewal_task()
  629. self._cert_renewal_task = asyncio.create_task(self._cert_renewal_loop(), name=f"vp_{self.id}_cert_renewal")
  630. logger.info("[VP %s] Server-mode services started on %s", self.name, bind_addr)
  631. async def stop_server(self) -> None:
  632. """Stop server-mode services."""
  633. await self._cancel_renewal_task()
  634. await self._cancel_restart_task()
  635. if self._mqtt_bridge:
  636. try:
  637. await self._mqtt_bridge.stop()
  638. except Exception:
  639. logger.exception("[VP %s] MQTT bridge stop failed", self.name)
  640. if self._mqtt:
  641. self._mqtt.set_bridge(None)
  642. self._mqtt_bridge = None
  643. if self._rtsp_proxy:
  644. try:
  645. await self._rtsp_proxy.stop()
  646. except Exception:
  647. logger.exception("[VP %s] RTSP proxy stop failed", self.name)
  648. self._rtsp_proxy = None
  649. if self._ftp:
  650. await self._ftp.stop()
  651. self._ftp = None
  652. if self._mqtt:
  653. await self._mqtt.stop()
  654. self._mqtt = None
  655. if self._bind:
  656. await self._bind.stop()
  657. self._bind = None
  658. if self._ssdp:
  659. await self._ssdp.stop()
  660. self._ssdp = None
  661. await self._cancel_tasks()
  662. async def start_proxy(self) -> None:
  663. """Start proxy mode services for this instance."""
  664. logger.info("[VP %s] Starting proxy mode to %s", self.name, self.target_printer_ip)
  665. cert_path, key_path, _ = await self._resolve_cert_and_advertise()
  666. self._proxy = SlicerProxyManager(
  667. target_host=self.target_printer_ip,
  668. cert_path=cert_path,
  669. key_path=key_path,
  670. on_activity=lambda n, m: logger.info("[VP %s] Proxy %s: %s", self.name, n, m),
  671. bind_address=self.bind_ip or "0.0.0.0", # nosec B104
  672. bind_identity={
  673. "serial": self.target_printer_serial or self.serial,
  674. "model": self.model or DEFAULT_VIRTUAL_PRINTER_MODEL,
  675. "name": self.name,
  676. "version": "01.00.00.00",
  677. },
  678. )
  679. async def run_with_logging(coro, svc_name):
  680. try:
  681. await coro
  682. except Exception as e:
  683. logger.error("[VP %s] %s failed: %s", self.name, svc_name, e)
  684. self._tasks = []
  685. # SSDP for proxy
  686. proxy_serial = self.target_printer_serial or self.serial
  687. if self.remote_interface_ip:
  688. from backend.app.services.network_utils import find_interface_for_ip
  689. local_iface = find_interface_for_ip(self.target_printer_ip)
  690. if local_iface:
  691. self._ssdp_proxy = SSDPProxy(
  692. local_interface_ip=local_iface["ip"],
  693. remote_interface_ip=self.remote_interface_ip,
  694. target_printer_ip=self.target_printer_ip,
  695. name=self.name,
  696. )
  697. self._tasks.append(
  698. asyncio.create_task(
  699. run_with_logging(self._ssdp_proxy.start(), "SSDP Proxy"),
  700. name=f"vp_{self.id}_ssdp_proxy",
  701. )
  702. )
  703. else:
  704. self._start_fallback_ssdp(proxy_serial, run_with_logging)
  705. else:
  706. self._start_fallback_ssdp(proxy_serial, run_with_logging)
  707. self._tasks.append(
  708. asyncio.create_task(
  709. run_with_logging(self._proxy.start(), "Proxy"),
  710. name=f"vp_{self.id}_proxy",
  711. )
  712. )
  713. # Guard against double-start: cancel any orphaned task before creating a new one
  714. await self._cancel_renewal_task()
  715. self._cert_renewal_task = asyncio.create_task(self._cert_renewal_loop(), name=f"vp_{self.id}_cert_renewal")
  716. def _start_fallback_ssdp(self, proxy_serial: str, run_with_logging) -> None:
  717. """Start single-interface SSDP server as fallback for proxy mode."""
  718. self._ssdp = VirtualPrinterSSDPServer(
  719. name=f"{self.name} (Proxy)",
  720. serial=proxy_serial,
  721. model=self.model or DEFAULT_VIRTUAL_PRINTER_MODEL,
  722. advertise_ip=self.bind_ip or "",
  723. bind_ip=self.bind_ip or "",
  724. )
  725. self._tasks.append(
  726. asyncio.create_task(
  727. run_with_logging(self._ssdp.start(), "SSDP"),
  728. name=f"vp_{self.id}_ssdp",
  729. )
  730. )
  731. async def stop_proxy(self) -> None:
  732. """Stop proxy mode services for this instance."""
  733. await self._cancel_renewal_task()
  734. await self._cancel_restart_task()
  735. if self._proxy:
  736. await self._proxy.stop()
  737. self._proxy = None
  738. if self._ssdp:
  739. await self._ssdp.stop()
  740. self._ssdp = None
  741. if self._ssdp_proxy:
  742. await self._ssdp_proxy.stop()
  743. self._ssdp_proxy = None
  744. await self._cancel_tasks()
  745. async def _cancel_tasks(self) -> None:
  746. """Cancel all running tasks and wait for cleanup."""
  747. for task in self._tasks:
  748. task.cancel()
  749. if self._tasks:
  750. try:
  751. await asyncio.wait_for(asyncio.gather(*self._tasks, return_exceptions=True), timeout=1.0)
  752. except TimeoutError:
  753. pass
  754. self._tasks = []
  755. def get_status(self) -> dict:
  756. """Get status for this instance."""
  757. status: dict = {
  758. "running": self.is_running,
  759. "pending_files": len(self._pending_files),
  760. }
  761. if self.tailscale_fqdn:
  762. status["tailscale_fqdn"] = self.tailscale_fqdn
  763. if self.is_proxy and self._proxy:
  764. status["proxy"] = self._proxy.get_status()
  765. return status
  766. class VirtualPrinterManager:
  767. """Multi-instance virtual printer registry and orchestrator.
  768. Every VP runs its own independent services on a dedicated bind IP.
  769. """
  770. def __init__(self):
  771. self._session_factory: Callable | None = None
  772. self._printer_manager: PrinterManager | None = None
  773. self._instances: dict[int, VirtualPrinterInstance] = {}
  774. # Directories
  775. self._base_dir = app_settings.base_dir / "virtual_printer"
  776. # Ensure base directories exist
  777. self._ensure_base_directories()
  778. def _ensure_base_directories(self) -> None:
  779. """Create base directories at startup."""
  780. for dir_path in [self._base_dir, self._base_dir / "uploads", self._base_dir / "certs"]:
  781. try:
  782. dir_path.mkdir(parents=True, exist_ok=True)
  783. except PermissionError:
  784. logger.error(
  785. f"Cannot create directory {dir_path}: Permission denied. "
  786. f"For Docker: ensure the data volume is writable by the container user. "
  787. f"For bare metal: run 'sudo chown -R $(whoami) {self._base_dir}'"
  788. )
  789. def set_session_factory(self, session_factory: Callable) -> None:
  790. """Set the database session factory."""
  791. self._session_factory = session_factory
  792. def set_printer_manager(self, printer_manager: "PrinterManager") -> None:
  793. """Inject the global printer_manager so non-proxy VPs can mirror their target's MQTT stream."""
  794. self._printer_manager = printer_manager
  795. @property
  796. def is_enabled(self) -> bool:
  797. """Check if any virtual printer is running."""
  798. return len(self._instances) > 0
  799. async def sync_from_db(self) -> None:
  800. """Load all VPs from DB, reconcile running state."""
  801. if not self._session_factory:
  802. logger.warning("Cannot sync virtual printers: no session factory")
  803. return
  804. from sqlalchemy import select
  805. from backend.app.models.printer import Printer
  806. from backend.app.models.virtual_printer import VirtualPrinter
  807. async with self._session_factory() as db:
  808. result = await db.execute(
  809. select(VirtualPrinter).where(VirtualPrinter.enabled == True).order_by(VirtualPrinter.position) # noqa: E712
  810. )
  811. enabled_vps = result.scalars().all()
  812. # Stop instances that are no longer enabled or changed mode
  813. enabled_ids = {vp.id for vp in enabled_vps}
  814. for vp_id in list(self._instances.keys()):
  815. if vp_id not in enabled_ids:
  816. await self.remove_instance(vp_id)
  817. # Look up printer IPs for proxy VPs
  818. proxy_vps = [vp for vp in enabled_vps if vp.mode == "proxy"]
  819. proxy_ips: dict[int, tuple[str, str]] = {}
  820. if proxy_vps:
  821. async with self._session_factory() as db:
  822. for pvp in proxy_vps:
  823. if pvp.target_printer_id:
  824. result = await db.execute(select(Printer).where(Printer.id == pvp.target_printer_id))
  825. printer = result.scalar_one_or_none()
  826. if printer:
  827. proxy_ips[pvp.id] = (printer.ip_address, printer.serial_number)
  828. # Detect config changes on running instances and restart if needed
  829. for vp in enabled_vps:
  830. instance = self._instances.get(vp.id)
  831. if not instance:
  832. continue
  833. changed = (
  834. instance.mode != vp.mode
  835. or instance.model != (vp.model or DEFAULT_VIRTUAL_PRINTER_MODEL)
  836. or instance.access_code != (vp.access_code or "")
  837. or instance.bind_ip != (vp.bind_ip or "")
  838. or instance.remote_interface_ip != (vp.remote_interface_ip or "")
  839. or instance.target_printer_id != vp.target_printer_id
  840. or instance.auto_dispatch != vp.auto_dispatch
  841. or instance.tailscale_disabled != vp.tailscale_disabled
  842. )
  843. if changed:
  844. logger.info(
  845. "VP %s config changed (mode: %s→%s), restarting",
  846. instance.name,
  847. instance.mode,
  848. vp.mode,
  849. )
  850. await self.remove_instance(vp.id)
  851. # Start instances for all enabled VPs (skip already running)
  852. for vp in enabled_vps:
  853. if vp.id in self._instances:
  854. continue
  855. if vp.mode == "proxy":
  856. ip_info = proxy_ips.get(vp.id)
  857. if not ip_info:
  858. logger.warning("Proxy VP %s: target printer not found, skipping", vp.name)
  859. continue
  860. target_ip, target_serial = ip_info
  861. instance = VirtualPrinterInstance(
  862. vp_id=vp.id,
  863. name=vp.name,
  864. mode=vp.mode,
  865. model=vp.model or DEFAULT_VIRTUAL_PRINTER_MODEL,
  866. access_code=vp.access_code or "",
  867. serial_suffix=vp.serial_suffix,
  868. target_printer_ip=target_ip,
  869. target_printer_serial=target_serial,
  870. auto_dispatch=vp.auto_dispatch,
  871. bind_ip=vp.bind_ip or "",
  872. remote_interface_ip=vp.remote_interface_ip or "",
  873. tailscale_disabled=vp.tailscale_disabled,
  874. base_dir=self._base_dir,
  875. session_factory=self._session_factory,
  876. )
  877. self._instances[vp.id] = instance
  878. await instance.start_proxy()
  879. logger.info("Started proxy VP: %s → %s (bind=%s)", instance.name, target_ip, instance.bind_ip)
  880. else:
  881. instance = VirtualPrinterInstance(
  882. vp_id=vp.id,
  883. name=vp.name,
  884. mode=vp.mode,
  885. model=vp.model or DEFAULT_VIRTUAL_PRINTER_MODEL,
  886. access_code=vp.access_code or "",
  887. serial_suffix=vp.serial_suffix,
  888. target_printer_id=vp.target_printer_id,
  889. auto_dispatch=vp.auto_dispatch,
  890. queue_force_color_match=vp.queue_force_color_match,
  891. bind_ip=vp.bind_ip or "",
  892. remote_interface_ip=vp.remote_interface_ip or "",
  893. tailscale_disabled=vp.tailscale_disabled,
  894. base_dir=self._base_dir,
  895. session_factory=self._session_factory,
  896. printer_manager=self._printer_manager,
  897. )
  898. self._instances[vp.id] = instance
  899. await instance.start_server()
  900. logger.info("Started server-mode VP: %s on %s", instance.name, vp.bind_ip)
  901. async def remove_instance(self, vp_id: int) -> None:
  902. """Stop and remove a single VP instance."""
  903. instance = self._instances.pop(vp_id, None)
  904. if instance:
  905. if instance.is_proxy:
  906. await instance.stop_proxy()
  907. else:
  908. await instance.stop_server()
  909. logger.info("Removed VP instance: %s", instance.name)
  910. async def stop_all(self) -> None:
  911. """Shutdown all virtual printer services."""
  912. logger.info("Stopping all virtual printer services...")
  913. for vp_id in list(self._instances.keys()):
  914. await self.remove_instance(vp_id)
  915. logger.info("All virtual printer services stopped")
  916. def get_instance(self, vp_id: int) -> VirtualPrinterInstance | None:
  917. """Get a running instance by ID."""
  918. return self._instances.get(vp_id)
  919. def get_all_status(self) -> list[dict]:
  920. """Get status for all running instances."""
  921. return [
  922. {
  923. "id": inst.id,
  924. "name": inst.name,
  925. "mode": inst.mode,
  926. **inst.get_status(),
  927. }
  928. for inst in self._instances.values()
  929. ]
  930. # -- Legacy single-printer compat --
  931. def get_status(self) -> dict:
  932. """Get status for first virtual printer (backward compat)."""
  933. if self._instances:
  934. first = next(iter(self._instances.values()))
  935. return {
  936. "enabled": True,
  937. "running": first.is_running,
  938. "mode": first.mode,
  939. "name": first.name,
  940. "serial": first.serial,
  941. "model": first.model or DEFAULT_VIRTUAL_PRINTER_MODEL,
  942. "model_name": VIRTUAL_PRINTER_MODELS.get(
  943. first.model or DEFAULT_VIRTUAL_PRINTER_MODEL,
  944. first.model or DEFAULT_VIRTUAL_PRINTER_MODEL,
  945. ),
  946. "pending_files": first.get_status().get("pending_files", 0),
  947. **({"target_printer_ip": first.target_printer_ip} if first.is_proxy else {}),
  948. **({"proxy": first.get_status().get("proxy", {})} if first.is_proxy else {}),
  949. }
  950. return {
  951. "enabled": False,
  952. "running": False,
  953. "mode": "immediate",
  954. "name": "Bambuddy",
  955. "serial": "",
  956. "model": DEFAULT_VIRTUAL_PRINTER_MODEL,
  957. "model_name": VIRTUAL_PRINTER_MODELS[DEFAULT_VIRTUAL_PRINTER_MODEL],
  958. "pending_files": 0,
  959. }
  960. async def configure(
  961. self,
  962. enabled: bool,
  963. access_code: str = "",
  964. mode: str = "immediate",
  965. model: str = "",
  966. target_printer_ip: str = "",
  967. target_printer_serial: str = "",
  968. remote_interface_ip: str = "",
  969. ) -> None:
  970. """Legacy single-printer configure. Delegates to sync_from_db()."""
  971. # This method is kept for backward compat with the settings endpoint.
  972. # The actual work is done by sync_from_db() which reads from the DB.
  973. await self.sync_from_db()
  974. # Global instance
  975. virtual_printer_manager = VirtualPrinterManager()