Przeglądaj źródła

fix(virtual-printer): #1558 Send pre-flight + slicer-surface audit bundle

  #1558: cached-as-base push_status only forced gcode_state=IDLE while letting
  the real printer's live-progress fields (mc_percent, stg_cur, layer_num, ...)
  leak through. Bambu Studio's Send pre-flight read them as busy and refused.
  The cached branch now overrides the activity-field set the same way it
  already overrode storage indicators (#1228) and protocol fields.

  Same bundle ships a multi-round VP audit that found adjacent bugs in the
  same family:

  - #1558: cached branch zeroes mc_print_stage / mc_percent / mc_remaining_time / stg / stg_cur / layer_num / total_layer_num / print_error
  - MQTT auth: per-IP rate-limit (5/60s lockout), hmac.compare_digest, access_code redacted in DEBUG log
  - FTP cmd_STOR streams chunks to disk + 4 GiB cap (was buffering whole upload)
  - Sticky-keys allowlist extended with upgrade_state / xcam / hw_switch_state / nozzle_diameter / nozzle_type / online / ams_status
  - _pending_files cleanup in finally for archive / queue / dispatch handlers
  - _add_to_print_queue position uses MAX+1 (was hardcoded 1)
  - DELETE VP removes orphan PendingUpload rows + upload_dir from disk
  - Per-VP cert regenerates on shared-CA rotation (real signature verification, not DN match)
  - DHCP target-IP refresh + queue_force_color_match toggle now restart proxy VPs
  - Per-slicer bridge-response routing (multi-slicer cross-leak fix via sequence_id map)
  - Child-service readiness barrier (FTP / MQTT / Bind / SSDP) — no false is_running before sockets bind
  - H2D Pro O1E / O2D model codes added (experimental, needs field confirmation)
  - FTP passive port range widened 50000-51000; docker-compose + wiki updated
  - VP refresh_loop crash now unbinds raw_message_handler; tailscale catches asyncio.TimeoutError; SlicerProxyManager lifecycle hardening
maziggy 4 dni temu
rodzic
commit
597762685c

Plik diff jest za duży
+ 0 - 0
CHANGELOG.md


+ 33 - 1
backend/app/api/routes/virtual_printers.py

@@ -335,10 +335,17 @@ async def update_virtual_printer(
     if not vp:
     if not vp:
         return JSONResponse(status_code=404, content={"detail": "Virtual printer not found"})
         return JSONResponse(status_code=404, content={"detail": "Virtual printer not found"})
 
 
+    # Redact the access code before logging — model_dump otherwise includes
+    # the plaintext value at DEBUG, violating the project no-secrets-in-logs
+    # rule. Replace with a marker that still signals "the user changed it"
+    # vs "the user didn't touch this field".
+    _safe_body = body.model_dump(exclude_unset=True)
+    if "access_code" in _safe_body:
+        _safe_body["access_code"] = "***"
     logger.debug(
     logger.debug(
         "Update VP %d: body=%s, current state: mode=%s, enabled=%s, access_code_set=%s, bind_ip=%s, target=%s",
         "Update VP %d: body=%s, current state: mode=%s, enabled=%s, access_code_set=%s, bind_ip=%s, target=%s",
         vp_id,
         vp_id,
-        body.model_dump(exclude_unset=True),
+        _safe_body,
         vp.mode,
         vp.mode,
         vp.enabled,
         vp.enabled,
         bool(vp.access_code),
         bool(vp.access_code),
@@ -492,10 +499,35 @@ async def delete_virtual_printer(
     # Stop instance if running
     # Stop instance if running
     await virtual_printer_manager.remove_instance(vp_id)
     await virtual_printer_manager.remove_instance(vp_id)
 
 
+    # Mark any PendingUpload rows that referenced this VP's upload_dir as
+    # discarded — without this the rows live on as phantom entries in
+    # /pending-uploads/ pointing at file paths that no longer exist, and
+    # the user only learns they're orphaned by trying to archive one and
+    # getting a flip-to-discarded on file-missing.
+    upload_prefix = str(virtual_printer_manager._base_dir / "uploads" / str(vp_id))
+    try:
+        from backend.app.models.pending_upload import PendingUpload
+
+        stale = await db.execute(select(PendingUpload).where(PendingUpload.file_path.startswith(upload_prefix)))
+        for pending in stale.scalars().all():
+            pending.status = "discarded"
+        await db.flush()
+    except Exception as e:
+        logger.error("Failed to discard orphan PendingUpload rows for VP %d: %s", vp_id, e)
+
     # Delete from DB
     # Delete from DB
     await db.execute(sql_delete(VirtualPrinter).where(VirtualPrinter.id == vp_id))
     await db.execute(sql_delete(VirtualPrinter).where(VirtualPrinter.id == vp_id))
     await db.commit()
     await db.commit()
 
 
+    # Remove the on-disk upload directory after the DB commit succeeds, so
+    # a crash between commit and rmtree only leaves orphan files (vs orphan
+    # rows pointing at a now-missing tree).
+    upload_dir = virtual_printer_manager._base_dir / "uploads" / str(vp_id)
+    if upload_dir.exists():
+        import shutil
+
+        shutil.rmtree(upload_dir, ignore_errors=True)
+
     logger.info("Deleted virtual printer: %s (id=%d)", vp_name, vp_id)
     logger.info("Deleted virtual printer: %s (id=%d)", vp_name, vp_id)
 
 
     # Resync remaining services
     # Resync remaining services

+ 15 - 1
backend/app/services/virtual_printer/bind_server.py

@@ -64,6 +64,11 @@ class BindServer:
 
 
         self._servers: list[asyncio.Server] = []
         self._servers: list[asyncio.Server] = []
         self._running = False
         self._running = False
+        # Set after at least one bind port is listening — see ftp_server.py
+        # for rationale. Bind server is best-effort across BIND_PORTS, so
+        # "ready" means "at least one port bound", matching the existing
+        # serve_forever path.
+        self.ready = asyncio.Event()
 
 
     def _create_tls_context(self) -> ssl.SSLContext | None:
     def _create_tls_context(self) -> ssl.SSLContext | None:
         """Create SSL context for the TLS bind port (3002)."""
         """Create SSL context for the TLS bind port (3002)."""
@@ -122,6 +127,7 @@ class BindServer:
             if not self._servers:
             if not self._servers:
                 logger.error("Bind server: could not bind to any port")
                 logger.error("Bind server: could not bind to any port")
                 return
                 return
+            self.ready.set()
 
 
             # Serve all successfully bound ports
             # Serve all successfully bound ports
             await asyncio.gather(*(s.serve_forever() for s in self._servers))
             await asyncio.gather(*(s.serve_forever() for s in self._servers))
@@ -137,6 +143,7 @@ class BindServer:
         """Stop the bind server."""
         """Stop the bind server."""
         logger.info("Stopping bind server")
         logger.info("Stopping bind server")
         self._running = False
         self._running = False
+        self.ready.clear()
 
 
         for server in self._servers:
         for server in self._servers:
             try:
             try:
@@ -176,7 +183,14 @@ class BindServer:
                 logger.warning("Bind server: unexpected command from %s: %s", client_id, request)
                 logger.warning("Bind server: unexpected command from %s: %s", client_id, request)
                 return
                 return
 
 
-            # Build response
+            # Build response. `sequence_id` is an INTEGER counter chosen by
+            # the printer side (not an echo of the slicer's string seq_id).
+            # The protocol docstring at the top of this file documents the
+            # asymmetry: slicer sends `"20000"` (string), printer replies
+            # with an int. The hardcoded 3021 mirrors real-firmware-captured
+            # value; an earlier audit suggesting we echo the slicer's seq_id
+            # was wrong and would have broken slicers that validate the
+            # type (int vs string).
             response = {
             response = {
                 "login": {
                 "login": {
                     "bind": "free",
                     "bind": "free",

+ 58 - 4
backend/app/services/virtual_printer/certificate.py

@@ -72,10 +72,59 @@ class CertificateService:
             Tuple of (cert_path, key_path)
             Tuple of (cert_path, key_path)
         """
         """
         if self.cert_path.exists() and self.key_path.exists():
         if self.cert_path.exists() and self.key_path.exists():
-            logger.debug("Using existing virtual printer certificates")
-            return self.cert_path, self.key_path
+            if self._cert_matches_current_ca():
+                logger.debug("Using existing virtual printer certificates")
+                return self.cert_path, self.key_path
+            logger.warning(
+                "Existing per-VP certificate's issuer doesn't match the current CA "
+                "(likely a CA rotation since the cert was signed). Regenerating "
+                "to keep the slicer's imported CA in sync with the served chain."
+            )
         return self.generate_certificates()
         return self.generate_certificates()
 
 
+    def _cert_matches_current_ca(self) -> bool:
+        """Check whether the on-disk per-VP cert was signed by the current CA.
+
+        Slicers that import the shared CA validate the per-VP cert against it.
+        If the CA has been rotated since the per-VP cert was signed, the chain
+        is broken even though both files exist on disk. ``ensure_certificates``
+        uses this to decide whether to regenerate.
+
+        Uses real signature verification — Bambuddy's auto-generated CAs all
+        share the same Subject DN ("Virtual Printer CA"), so a DN-only compare
+        would incorrectly return True even after rotation.
+        """
+        try:
+            if not self.ca_cert_path.exists():
+                # No CA yet — let generate_certificates create one and the
+                # matching per-VP chain.
+                return False
+            cert_pem = self.cert_path.read_bytes()
+            cert = x509.load_pem_x509_certificate(cert_pem)
+            ca_pem = self.ca_cert_path.read_bytes()
+            ca_cert = x509.load_pem_x509_certificate(ca_pem)
+            from cryptography.exceptions import InvalidSignature
+            from cryptography.hazmat.primitives.asymmetric import padding
+
+            try:
+                ca_cert.public_key().verify(
+                    cert.signature,
+                    cert.tbs_certificate_bytes,
+                    padding.PKCS1v15(),
+                    cert.signature_hash_algorithm,
+                )
+                return True
+            except InvalidSignature:
+                return False
+        except (OSError, ValueError) as e:
+            logger.debug("CA-match probe failed for %s: %s", self.cert_path, e)
+            return False
+        except Exception as e:
+            # Any unexpected exception during verification → treat as mismatch
+            # and regenerate. Safer than reusing a cert we can't validate.
+            logger.debug("CA-match verification failed for %s: %s", self.cert_path, e)
+            return False
+
     def _load_existing_ca(self) -> tuple[rsa.RSAPrivateKey, x509.Certificate] | None:
     def _load_existing_ca(self) -> tuple[rsa.RSAPrivateKey, x509.Certificate] | None:
         """Try to load existing CA certificate and key.
         """Try to load existing CA certificate and key.
 
 
@@ -123,8 +172,13 @@ class CertificateService:
         # Generate new CA
         # Generate new CA
         ca_key, ca_cert = self._generate_ca_certificate()
         ca_key, ca_cert = self._generate_ca_certificate()
 
 
-        # Save CA certificate and key
-        self.cert_dir.mkdir(parents=True, exist_ok=True)
+        # Save CA certificate and key. ``ca_key_path`` and ``ca_cert_path``
+        # resolve under ``shared_ca_dir`` (which may differ from cert_dir),
+        # so the parent we need to mkdir is the CA file's parent — not
+        # cert_dir. Previously this created the per-VP subdirectory while
+        # the writes targeted the parent CA dir, which works only because
+        # the manager pre-creates both — the method itself was latent.
+        self.ca_key_path.parent.mkdir(parents=True, exist_ok=True)
         self.ca_key_path.write_bytes(
         self.ca_key_path.write_bytes(
             ca_key.private_bytes(
             ca_key.private_bytes(
                 encoding=serialization.Encoding.PEM,
                 encoding=serialization.Encoding.PEM,

+ 15 - 2
backend/app/services/virtual_printer/diagnostic.py

@@ -26,6 +26,7 @@ logger = logging.getLogger(__name__)
 PORT_FTPS = 990  # implicit FTPS — slicer file upload
 PORT_FTPS = 990  # implicit FTPS — slicer file upload
 PORT_MQTT = 8883  # MQTT over TLS — control + status
 PORT_MQTT = 8883  # MQTT over TLS — control + status
 PORT_BIND = 3002  # bind/detect (TLS) — slicer discovery handshake
 PORT_BIND = 3002  # bind/detect (TLS) — slicer discovery handshake
+PORT_BIND_PLAIN = 3000  # bind/detect (plain) — legacy / some slicer models
 
 
 _PORT_PROBE_TIMEOUT = 2.0
 _PORT_PROBE_TIMEOUT = 2.0
 
 
@@ -134,14 +135,26 @@ async def run_vp_diagnostic(vp: VirtualPrinter, instance) -> VPDiagnosticResult:
         )
         )
         checks.append(DiagnosticCheck(id="port_bind", status="skip", params={"port": PORT_BIND}))
         checks.append(DiagnosticCheck(id="port_bind", status="skip", params={"port": PORT_BIND}))
     else:
     else:
-        ftp_ok, mqtt_ok, bind_ok = await asyncio.gather(
+        # The non-proxy bind server listens on BOTH 3000 (plain) and 3002
+        # (TLS) per bind_server.py BIND_PORTS — slicers pick either path.
+        # Probing only 3002 missed half-dead VPs where one listener failed
+        # to start and the other succeeded; report port_bind as pass only
+        # when both probes succeed.
+        ftp_ok, mqtt_ok, bind_tls_ok, bind_plain_ok = await asyncio.gather(
             _check_port(bind_ip, PORT_FTPS),
             _check_port(bind_ip, PORT_FTPS),
             _check_port(bind_ip, PORT_MQTT),
             _check_port(bind_ip, PORT_MQTT),
             _check_port(bind_ip, PORT_BIND),
             _check_port(bind_ip, PORT_BIND),
+            _check_port(bind_ip, PORT_BIND_PLAIN),
         )
         )
         checks.append(DiagnosticCheck(id="port_ftps", status="pass" if ftp_ok else "fail", params={"port": PORT_FTPS}))
         checks.append(DiagnosticCheck(id="port_ftps", status="pass" if ftp_ok else "fail", params={"port": PORT_FTPS}))
         checks.append(DiagnosticCheck(id="port_mqtt", status="pass" if mqtt_ok else "fail", params={"port": PORT_MQTT}))
         checks.append(DiagnosticCheck(id="port_mqtt", status="pass" if mqtt_ok else "fail", params={"port": PORT_MQTT}))
-        checks.append(DiagnosticCheck(id="port_bind", status="pass" if bind_ok else "fail", params={"port": PORT_BIND}))
+        checks.append(
+            DiagnosticCheck(
+                id="port_bind",
+                status="pass" if (bind_tls_ok and bind_plain_ok) else "fail",
+                params={"port": PORT_BIND, "port_plain": PORT_BIND_PLAIN},
+            )
+        )
 
 
     # --- TLS certificate ---
     # --- TLS certificate ---
     # When running, the cert chain must exist on disk for the slicer's TLS
     # When running, the cert chain must exist on disk for the slicer's TLS

+ 102 - 43
backend/app/services/virtual_printer/ftp_server.py

@@ -8,6 +8,7 @@ immediately upon connection, before any FTP commands are exchanged.
 """
 """
 
 
 import asyncio
 import asyncio
+import hmac
 import logging
 import logging
 import os
 import os
 import random
 import random
@@ -24,6 +25,14 @@ logger = logging.getLogger(__name__)
 # Requires CAP_NET_BIND_SERVICE or root.
 # Requires CAP_NET_BIND_SERVICE or root.
 FTP_PORT = 990
 FTP_PORT = 990
 
 
+# Hard cap on a single upload. 4 GiB covers the largest realistic
+# multi-plate .gcode.3mf and rejects runaway / malicious clients before
+# they can exhaust the disk or OOM the host. STOR still buffers the
+# whole file in memory before write_bytes — peak RSS ~2x file size during
+# the b''.join — so the cap also caps that peak. If real users hit it
+# with a legitimate file, raise here.
+MAX_UPLOAD_BYTES = 4 * 1024 * 1024 * 1024  # 4 GiB
+
 
 
 class FTPSession:
 class FTPSession:
     """Handles a single FTP client session."""
     """Handles a single FTP client session."""
@@ -162,7 +171,10 @@ class FTPSession:
     async def cmd_PASS(self, arg: str) -> None:
     async def cmd_PASS(self, arg: str) -> None:
         """Handle PASS command."""
         """Handle PASS command."""
         if self.username and self.username.lower() == "bblp":
         if self.username and self.username.lower() == "bblp":
-            if arg == self.access_code:
+            # ``hmac.compare_digest`` is constant-time — keeps the auth check
+            # from leaking the access code via response timing under network
+            # jitter. LAN-only threat is marginal; this is the standard fix.
+            if hmac.compare_digest(arg, self.access_code):
                 self.authenticated = True
                 self.authenticated = True
                 await self.send(230, "Login successful")
                 await self.send(230, "Login successful")
                 logger.info("%sFTP login from %s", self._log_prefix, self.remote_ip)
                 logger.info("%sFTP login from %s", self._log_prefix, self.remote_ip)
@@ -380,7 +392,19 @@ class FTPSession:
             await asyncio.sleep(0.1)
             await asyncio.sleep(0.1)
 
 
     async def cmd_STOR(self, arg: str) -> None:
     async def cmd_STOR(self, arg: str) -> None:
-        """Handle STOR command - receive file upload."""
+        """Handle STOR command - receive file upload.
+
+        Streams each chunk directly to disk inside the receive loop instead
+        of buffering the whole file in a ``list[bytes]`` and joining at the
+        end. Wire protocol unchanged — same 150/226/426 sequence, same
+        single-write target path (no ``.part`` or atomic rename), no new
+        verbs, no concurrency guard. The visible behaviour difference is
+        that the destination file grows progressively during upload rather
+        than appearing all-at-once on completion; slicers don't LIST during
+        STOR, so this isn't observable. Peak RSS for a multi-GB upload
+        drops from ~2× file size to one chunk (64 KiB).
+        ``MAX_UPLOAD_BYTES`` cap kept — purely server-internal DoS guard.
+        """
         if not self.authenticated:
         if not self.authenticated:
             await self.send(530, "Not logged in")
             await self.send(530, "Not logged in")
             return
             return
@@ -410,22 +434,23 @@ class FTPSession:
             await self._close_data_connection()
             await self._close_data_connection()
             return
             return
 
 
-        # Receive data
-        data_content: list[bytes] = []
+        # Receive + stream to disk
         total_received = 0
         total_received = 0
+        write_failed: Exception | None = None
         try:
         try:
-            while True:
-                chunk = await asyncio.wait_for(self._data_reader.read(65536), timeout=60)
-                if not chunk:
-                    break
-                data_content.append(chunk)
-                total_received += len(chunk)
-                logger.debug("FTP received chunk: %s bytes (total: %s)", len(chunk), total_received)
+            with file_path.open("wb") as f:
+                while True:
+                    chunk = await asyncio.wait_for(self._data_reader.read(65536), timeout=60)
+                    if not chunk:
+                        break
+                    total_received += len(chunk)
+                    if total_received > MAX_UPLOAD_BYTES:
+                        raise OSError(f"upload exceeded size cap ({total_received} > {MAX_UPLOAD_BYTES} bytes)")
+                    f.write(chunk)
+                    logger.debug("FTP received chunk: %s bytes (total: %s)", len(chunk), total_received)
         except TimeoutError:
         except TimeoutError:
             logger.error("FTP data transfer timeout after %s bytes for %s", total_received, filename)
             logger.error("FTP data transfer timeout after %s bytes for %s", total_received, filename)
-            await self.send(426, "Transfer timeout")
-            await self._close_data_connection()
-            return
+            write_failed = TimeoutError("Transfer timeout")
         except Exception as e:
         except Exception as e:
             logger.error(
             logger.error(
                 "FTP data transfer error after %s bytes for %s: %s(%s)",
                 "FTP data transfer error after %s bytes for %s: %s(%s)",
@@ -434,32 +459,32 @@ class FTPSession:
                 type(e).__name__,
                 type(e).__name__,
                 e,
                 e,
             )
             )
-            await self.send(426, f"Transfer failed: {e}")
-            await self._close_data_connection()
-            return
+            write_failed = e
 
 
         # Close data connection
         # Close data connection
         await self._close_data_connection()
         await self._close_data_connection()
 
 
-        # Write file
-        try:
-            total_size = sum(len(c) for c in data_content)
-            file_path.write_bytes(b"".join(data_content))
-            logger.info("FTP saved file: %s (%s bytes)", file_path, total_size)
-            await self.send(226, "Transfer complete")
+        if write_failed is not None:
+            # Drop the partial file so it doesn't masquerade as a complete
+            # upload — buffer-then-write never had a partial-file footprint.
+            try:
+                file_path.unlink(missing_ok=True)
+            except OSError:
+                pass
+            await self.send(426, f"Transfer failed: {write_failed}")
+            return
 
 
-            # Notify callback
-            if self.on_file_received:
-                try:
-                    result = self.on_file_received(file_path, self.remote_ip)
-                    if asyncio.iscoroutine(result):
-                        await result
-                except Exception as e:
-                    logger.error("File received callback error: %s", e)
+        # Confirm + notify
+        logger.info("FTP saved file: %s (%s bytes)", file_path, total_received)
+        await self.send(226, "Transfer complete")
 
 
-        except Exception as e:
-            logger.error("Failed to save file %s: %s", file_path, e)
-            await self.send(550, "Failed to save file")
+        if self.on_file_received:
+            try:
+                result = self.on_file_received(file_path, self.remote_ip)
+                if asyncio.iscoroutine(result):
+                    await result
+            except Exception as e:
+                logger.error("File received callback error: %s", e)
 
 
     async def cmd_SIZE(self, arg: str) -> None:
     async def cmd_SIZE(self, arg: str) -> None:
         """Handle SIZE command."""
         """Handle SIZE command."""
@@ -514,7 +539,18 @@ class FTPSession:
         await self.send(257, f'"{arg}" directory created')
         await self.send(257, f'"{arg}" directory created')
 
 
     async def cmd_LIST(self, arg: str) -> None:
     async def cmd_LIST(self, arg: str) -> None:
-        """Handle LIST command - list directory contents."""
+        """Handle LIST command - list directory contents.
+
+        Intentionally answers 150 + 226 without opening the passive data
+        channel. Bambuddy is an upload-only VP — no slicer in capture logs
+        actually issues LIST during the project_file flow, so the
+        no-data-conn ack is what every observed slicer accepts. A previous
+        audit recommended opening + closing the data conn for protocol
+        purity; reverted because (a) the bug was theoretical, (b) slicer
+        compatibility matters more than RFC purity here, and (c) adding
+        NLST/MLSD alongside changes the "supported verbs" surface in a way
+        we cannot regression-test without every supported slicer build.
+        """
         if not self.authenticated:
         if not self.authenticated:
             await self.send(530, "Not logged in")
             await self.send(530, "Not logged in")
             return
             return
@@ -526,8 +562,14 @@ class FTPSession:
 class VirtualPrinterFTPServer:
 class VirtualPrinterFTPServer:
     """Implicit FTPS server that accepts uploads from slicers."""
     """Implicit FTPS server that accepts uploads from slicers."""
 
 
+    # Passive-mode data port range. Widened from 50000-50100 (101 ports) to
+    # 50000-51000 (1001 ports) so concurrent transfers across multiple VPs
+    # — particularly when a VP falls back to bind 0.0.0.0 (manager.py picks
+    # this when bind_ip is unset) — don't collide. With 101 ports and 10
+    # random pick attempts per session, birthday-style collisions hit
+    # under load; 1001 ports gives multi-VP setups headroom.
     PASSIVE_PORT_MIN = 50000
     PASSIVE_PORT_MIN = 50000
-    PASSIVE_PORT_MAX = 50100
+    PASSIVE_PORT_MAX = 51000
 
 
     def __init__(
     def __init__(
         self,
         self,
@@ -562,6 +604,12 @@ class VirtualPrinterFTPServer:
         self.vp_name = vp_name
         self.vp_name = vp_name
         self._server: asyncio.Server | None = None
         self._server: asyncio.Server | None = None
         self._running = False
         self._running = False
+        # Set after the socket is bound and the server is accepting connections,
+        # so VirtualPrinterInstance.start_server can wait for readiness before
+        # reporting is_running=True. Without this, a caller racing the start
+        # could probe the port and see "connection refused" while is_running
+        # already says yes.
+        self.ready = asyncio.Event()
         self._ssl_context: ssl.SSLContext | None = None
         self._ssl_context: ssl.SSLContext | None = None
         self._active_sessions: list[asyncio.Task] = []
         self._active_sessions: list[asyncio.Task] = []
         # Override PASV response IP for Docker bridge mode / NAT environments
         # Override PASV response IP for Docker bridge mode / NAT environments
@@ -579,7 +627,14 @@ class VirtualPrinterFTPServer:
         cache_dir = self.upload_dir / "cache"
         cache_dir = self.upload_dir / "cache"
         cache_dir.mkdir(exist_ok=True)
         cache_dir.mkdir(exist_ok=True)
 
 
-        # Create SSL context for implicit FTPS (TLS from byte 0)
+        # Create SSL context for implicit FTPS (TLS from byte 0).
+        # Pinned to TLS 1.2 only. Allowing 1.3 broke BambuStudio mid-upload
+        # in the field (session_reused=True on data channel via PSK + libcurl
+        # CURLE_PARTIAL_FILE / RST after ~80 KiB; "server did not report OK,
+        # got 426"). Real Bambu printers also serve their FTPS at 1.2 only,
+        # and the slicer expects to match that. A future slicer drop of 1.2
+        # is a problem to solve when it actually happens; until then 1.2 is
+        # mandatory for compat.
         self._ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
         self._ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
         self._ssl_context.load_cert_chain(str(self.cert_path), str(self.key_path))
         self._ssl_context.load_cert_chain(str(self.cert_path), str(self.key_path))
         self._ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2
         self._ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2
@@ -599,6 +654,7 @@ class VirtualPrinterFTPServer:
                 ssl=self._ssl_context,  # This makes it implicit FTPS!
                 ssl=self._ssl_context,  # This makes it implicit FTPS!
             )
             )
             self._running = True
             self._running = True
+            self.ready.set()
 
 
             logger.info("Implicit FTPS server started on port %s", self.port)
             logger.info("Implicit FTPS server started on port %s", self.port)
             logger.info(
             logger.info(
@@ -657,14 +713,17 @@ class VirtualPrinterFTPServer:
         """Stop the FTPS server."""
         """Stop the FTPS server."""
         logger.info("Stopping FTP server")
         logger.info("Stopping FTP server")
         self._running = False
         self._running = False
-
-        # Cancel all active sessions first
-        for task in self._active_sessions[:]:  # Copy list to avoid modification during iteration
+        self.ready.clear()
+
+        # Cancel all active sessions and AWAIT cancellation. Previously
+        # this slept 0.1 s and called it good — a session mid-write,
+        # mid-TLS handshake, or holding a 60 s data-read could easily
+        # outlive that and then ``_server.close()`` would run while the
+        # underlying sockets were still in use.
+        for task in self._active_sessions[:]:
             task.cancel()
             task.cancel()
-
-        # Wait briefly for sessions to clean up
         if self._active_sessions:
         if self._active_sessions:
-            await asyncio.sleep(0.1)
+            await asyncio.gather(*self._active_sessions, return_exceptions=True)
 
 
         self._active_sessions.clear()
         self._active_sessions.clear()
 
 

+ 171 - 16
backend/app/services/virtual_printer/manager.py

@@ -47,6 +47,8 @@ VIRTUAL_PRINTER_MODELS = {
     "N1": "A1 Mini",  # A1 Mini
     "N1": "A1 Mini",  # A1 Mini
     # H2 Series
     # H2 Series
     "O1D": "H2D",  # H2D
     "O1D": "H2D",  # H2D
+    "O1E": "H2D Pro",  # H2D Pro
+    "O2D": "H2D Pro",  # H2D Pro
     "O1C": "H2C",  # H2C
     "O1C": "H2C",  # H2C
     "O1C2": "H2C",  # H2C (dual nozzle variant)
     "O1C2": "H2C",  # H2C (dual nozzle variant)
     "O1S": "H2S",  # H2S
     "O1S": "H2S",  # H2S
@@ -77,6 +79,8 @@ MODEL_SERIAL_PREFIXES = {
     "N1": "03000A",  # A1 Mini
     "N1": "03000A",  # A1 Mini
     # H2 Series
     # H2 Series
     "O1D": "09400A",  # H2D
     "O1D": "09400A",  # H2D
+    "O1E": "09400A",  # H2D Pro (same prefix family as H2D)
+    "O2D": "09400A",  # H2D Pro
     "O1C": "09400A",  # H2C
     "O1C": "09400A",  # H2C
     "O1C2": "09400A",  # H2C (dual nozzle variant)
     "O1C2": "09400A",  # H2C (dual nozzle variant)
     "O1S": "09400A",  # H2S
     "O1S": "09400A",  # H2S
@@ -88,6 +92,14 @@ DISPLAY_NAME_TO_MODEL_CODE = {v: k for k, v in VIRTUAL_PRINTER_MODELS.items()}
 # Default model
 # Default model
 DEFAULT_VIRTUAL_PRINTER_MODEL = "BL-P001"  # X1C
 DEFAULT_VIRTUAL_PRINTER_MODEL = "BL-P001"  # X1C
 
 
+# Bound on per-instance ``_slicer_print_options`` cache size. The slicer's
+# project_file MQTT command stashes one dict per filename; the
+# corresponding ``_add_to_print_queue`` pop only fires when the file
+# upload completes. Failed / cancelled / non-3MF uploads orphan their
+# stash. The bound triggers FIFO eviction in ``on_print_command`` once
+# the dict fills, so a long-running VP can't leak unbounded state.
+_SLICER_OPTIONS_CACHE_LIMIT = 128
+
 
 
 def _get_serial_for_model(model: str, serial_suffix: str) -> str:
 def _get_serial_for_model(model: str, serial_suffix: str) -> str:
     """Get serial number for the given model and suffix."""
     """Get serial number for the given model and suffix."""
@@ -241,6 +253,12 @@ class VirtualPrinterInstance:
         # against the same field during the upload window.
         # against the same field during the upload window.
         if self._mqtt and file_path.suffix.lower() == ".3mf":
         if self._mqtt and file_path.suffix.lower() == ".3mf":
             self._mqtt.set_gcode_state("FINISH", filename=file_path.name, prepare_percent="100")
             self._mqtt.set_gcode_state("FINISH", filename=file_path.name, prepare_percent="100")
+            # FINISH is the terminal state for the upload cycle per #1280
+            # (commit 0d6171dc). The Print-flow slicer's in-flight-job lock
+            # releases on FINISH; resetting to IDLE 2 s later would re-confuse
+            # the slicer that just unwedged. Earlier audit suggesting the
+            # IDLE reset was wrong — staying at FINISH is the designed
+            # behaviour. The next upload's PREPARE→FINISH cycle starts fresh.
 
 
     async def on_print_command(self, filename: str, data: dict) -> None:
     async def on_print_command(self, filename: str, data: dict) -> None:
         """Handle print command from MQTT.
         """Handle print command from MQTT.
@@ -257,6 +275,19 @@ class VirtualPrinterInstance:
         logger.info("[VP %s] Print command for: %s", self.name, filename)
         logger.info("[VP %s] Print command for: %s", self.name, filename)
         if self.mode != "print_queue":
         if self.mode != "print_queue":
             return
             return
+        # Drop the oldest stash if the cache is growing — happens when the
+        # slicer sends project_file for a filename whose FTP upload was
+        # rejected / cancelled / non-3MF, so _add_to_print_queue's pop
+        # never fires. With no bound, a long-running VP accumulates one
+        # dict per such mismatch.
+        if len(self._slicer_print_options) >= _SLICER_OPTIONS_CACHE_LIMIT:
+            try:
+                stale_key = next(iter(self._slicer_print_options))
+                self._slicer_print_options.pop(stale_key, None)
+                self._slicer_print_options_events.pop(stale_key, None)
+                logger.debug("[VP %s] Evicted stale slicer options for %s", self.name, stale_key)
+            except StopIteration:
+                pass
         self._slicer_print_options[filename] = dict(data)
         self._slicer_print_options[filename] = dict(data)
         event = self._slicer_print_options_events.get(filename)
         event = self._slicer_print_options_events.get(filename)
         if event:
         if event:
@@ -277,6 +308,7 @@ class VirtualPrinterInstance:
                 pass
                 pass
             return
             return
 
 
+        archived = False
         try:
         try:
             from backend.app.api.routes.settings import get_setting
             from backend.app.api.routes.settings import get_setting
             from backend.app.services.archive import ArchiveService
             from backend.app.services.archive import ArchiveService
@@ -298,15 +330,29 @@ class VirtualPrinterInstance:
                 if archive:
                 if archive:
                     logger.info("[VP %s] Archived: %s - %s", self.name, archive.id, archive.print_name)
                     logger.info("[VP %s] Archived: %s - %s", self.name, archive.id, archive.print_name)
                     await self._broadcast_archive_created(archive)
                     await self._broadcast_archive_created(archive)
-                    try:
-                        file_path.unlink()
-                    except OSError:
-                        pass
-                    self._pending_files.pop(file_path.name, None)
+                    archived = True
                 else:
                 else:
                     logger.error("Failed to archive file: %s", file_path.name)
                     logger.error("Failed to archive file: %s", file_path.name)
         except Exception as e:
         except Exception as e:
             logger.error("Error archiving file: %s", e)
             logger.error("Error archiving file: %s", e)
+        finally:
+            # Always release the in-flight marker and delete the temp file —
+            # previously the failure paths only logged and the next upload of
+            # the same name was silently rejected with "already uploading",
+            # the upload_dir filled up indefinitely, and the slicer received
+            # a clean 226 even though no archive existed (#audit-R2-1).
+            self._pending_files.pop(file_path.name, None)
+            if archived:
+                try:
+                    file_path.unlink()
+                except OSError:
+                    pass
+            else:
+                # Drop the failed temp file so it doesn't accumulate.
+                try:
+                    file_path.unlink(missing_ok=True)
+                except OSError:
+                    pass
 
 
     async def _queue_file(self, file_path: Path, source_ip: str) -> None:
     async def _queue_file(self, file_path: Path, source_ip: str) -> None:
         """Queue file for user review."""
         """Queue file for user review."""
@@ -354,9 +400,19 @@ class VirtualPrinterInstance:
                 db.add(pending)
                 db.add(pending)
                 await db.commit()
                 await db.commit()
                 logger.info("[VP %s] Queued: %s - %s", self.name, pending.id, file_path.name)
                 logger.info("[VP %s] Queued: %s - %s", self.name, pending.id, file_path.name)
-                self._pending_files.pop(file_path.name, None)
         except Exception as e:
         except Exception as e:
             logger.error("Error queueing file: %s", e)
             logger.error("Error queueing file: %s", e)
+            # Queue insert failed — drop the temp file so it doesn't
+            # accumulate. The file is unreachable without the DB row.
+            try:
+                file_path.unlink(missing_ok=True)
+            except OSError:
+                pass
+        finally:
+            # Always release the in-flight marker so concurrent uploads
+            # with the same filename aren't spuriously rejected after
+            # a queue failure.
+            self._pending_files.pop(file_path.name, None)
 
 
     async def _add_to_print_queue(self, file_path: Path, source_ip: str) -> None:
     async def _add_to_print_queue(self, file_path: Path, source_ip: str) -> None:
         """Archive file and add to print queue, assigned to target printer or model."""
         """Archive file and add to print queue, assigned to target printer or model."""
@@ -492,12 +548,33 @@ class VirtualPrinterInstance:
                             if overrides:
                             if overrides:
                                 filament_overrides_json = json.dumps(overrides)
                                 filament_overrides_json = json.dumps(overrides)
 
 
+                    # Pick the next free position the same way the manual
+                    # /print-queue/ POST does — previously hardcoded to 1,
+                    # which created duplicate position=1 rows on every
+                    # VP upload and made queue execution order
+                    # non-deterministic for any non-empty queue.
+                    from sqlalchemy import func, select as _sql_select
+
+                    queue_scope = _sql_select(func.max(PrintQueueItem.position)).where(
+                        PrintQueueItem.status == "pending"
+                    )
+                    if self.target_printer_id is not None:
+                        queue_scope = queue_scope.where(PrintQueueItem.printer_id == self.target_printer_id)
+                    else:
+                        queue_scope = queue_scope.where(PrintQueueItem.printer_id.is_(None))
+                    try:
+                        max_pos_raw = (await db.execute(queue_scope)).scalar()
+                        max_pos = int(max_pos_raw) if max_pos_raw is not None else 0
+                    except (TypeError, ValueError):
+                        max_pos = 0
+                    next_position = max_pos + 1
+
                     queue_item = PrintQueueItem(
                     queue_item = PrintQueueItem(
                         printer_id=self.target_printer_id,
                         printer_id=self.target_printer_id,
                         target_model=target_model,
                         target_model=target_model,
                         archive_id=archive.id,
                         archive_id=archive.id,
                         plate_id=plate_id,
                         plate_id=plate_id,
-                        position=1,
+                        position=next_position,
                         status="pending",
                         status="pending",
                         manual_start=not self.auto_dispatch,
                         manual_start=not self.auto_dispatch,
                         required_filament_types=required_filament_types_json,
                         required_filament_types=required_filament_types_json,
@@ -512,15 +589,20 @@ class VirtualPrinterInstance:
                     await db.commit()
                     await db.commit()
                     logger.info("[VP %s] Added to queue: %s", self.name, queue_item.id)
                     logger.info("[VP %s] Added to queue: %s", self.name, queue_item.id)
                     await self._broadcast_archive_created(archive)
                     await self._broadcast_archive_created(archive)
-                    try:
-                        file_path.unlink()
-                    except OSError:
-                        pass
-                    self._pending_files.pop(file_path.name, None)
                 else:
                 else:
                     logger.error("Failed to archive file: %s", file_path.name)
                     logger.error("Failed to archive file: %s", file_path.name)
         except Exception as e:
         except Exception as e:
             logger.error("Error adding to print queue: %s", e)
             logger.error("Error adding to print queue: %s", e)
+        finally:
+            # Always release the marker and clean the temp file. Without this
+            # the same-name STOR guard would block the next upload and the
+            # upload_dir would accumulate failed temp files forever
+            # (#audit-R2-1).
+            self._pending_files.pop(file_path.name, None)
+            try:
+                file_path.unlink(missing_ok=True)
+            except OSError:
+                pass
 
 
     async def _broadcast_archive_created(self, archive) -> None:
     async def _broadcast_archive_created(self, archive) -> None:
         """Notify connected clients that a new archive exists.
         """Notify connected clients that a new archive exists.
@@ -560,7 +642,12 @@ class VirtualPrinterInstance:
                         for meta in plate.findall("metadata"):
                         for meta in plate.findall("metadata"):
                             if meta.get("key") == "index" and meta.get("value"):
                             if meta.get("key") == "index" and meta.get("value"):
                                 return int(meta.get("value"))
                                 return int(meta.get("value"))
-        except Exception:
+        except Exception as e:
+            # Malformed / missing slice_info.config — fall through to None.
+            # Logged at debug so a non-3MF or unconventional 3MF doesn't
+            # spam production logs; a debug trail exists for support
+            # bundles when wrong-plate dispatches are reported.
+            logger.debug("[VP] _extract_plate_id failed for %s: %s", file_path.name, e)
             return None
             return None
         return None
         return None
 
 
@@ -682,8 +769,11 @@ class VirtualPrinterInstance:
             )
             )
         )
         )
 
 
-        # SSDP server — advertise_addr is the Tailscale FQDN when available,
-        # otherwise the bind/remote IP (existing behaviour)
+        # SSDP server — advertise_addr is the remote_interface_ip (Tailscale
+        # IP, when chosen from the bind_ip dropdown) or the bind_ip. SSDP
+        # Location accepts IPs only; FQDNs go in through bind_ip selection
+        # at the printer-IP level and resolve before reaching the SSDP
+        # advertisement.
         self._ssdp = VirtualPrinterSSDPServer(
         self._ssdp = VirtualPrinterSSDPServer(
             name=self.name,
             name=self.name,
             serial=self.serial,
             serial=self.serial,
@@ -698,6 +788,32 @@ class VirtualPrinterInstance:
             )
             )
         )
         )
 
 
+        # Wait briefly for every child service to actually finish binding its
+        # socket so ``is_running`` doesn't lie. Without this barrier a caller
+        # racing the start (e.g. the diagnostic route) would see is_running=True
+        # while ports were still in the gap between task creation and the
+        # ``asyncio.start_server`` returning. Bounded timeout — if a child
+        # hangs we log it and move on; the existing task tracking still
+        # catches the failure on the next iteration.
+        ready_targets = [
+            ("FTP", self._ftp.ready),
+            ("MQTT", self._mqtt.ready),
+            ("Bind", self._bind.ready),
+            ("SSDP", self._ssdp.ready),
+        ]
+        try:
+            await asyncio.wait_for(
+                asyncio.gather(*(e.wait() for _, e in ready_targets)),
+                timeout=5.0,
+            )
+        except TimeoutError:
+            not_ready = [name for name, e in ready_targets if not e.is_set()]
+            logger.warning(
+                "[VP %s] Sub-service(s) didn't bind within 5s: %s — continuing anyway",
+                self.name,
+                ", ".join(not_ready) or "(none)",
+            )
+
         logger.info("[VP %s] Server-mode services started on %s", self.name, bind_addr)
         logger.info("[VP %s] Server-mode services started on %s", self.name, bind_addr)
 
 
     async def stop_server(self) -> None:
     async def stop_server(self) -> None:
@@ -850,6 +966,13 @@ class VirtualPrinterManager:
         self._session_factory: Callable | None = None
         self._session_factory: Callable | None = None
         self._printer_manager: PrinterManager | None = None
         self._printer_manager: PrinterManager | None = None
         self._instances: dict[int, VirtualPrinterInstance] = {}
         self._instances: dict[int, VirtualPrinterInstance] = {}
+        # Serialize sync_from_db so concurrent PUT /vp/{id} calls can't
+        # race the start/stop sequence and leave duplicate sub-services
+        # bound to the same port. The lock is fine-grained enough that
+        # a single VP update completes in well under a second; if the
+        # user holds the lock with a long-running start they intended
+        # to anyway.
+        self._sync_lock = asyncio.Lock()
 
 
         # Directories
         # Directories
         self._base_dir = app_settings.base_dir / "virtual_printer"
         self._base_dir = app_settings.base_dir / "virtual_printer"
@@ -895,11 +1018,22 @@ class VirtualPrinterManager:
         return len(self._instances) > 0
         return len(self._instances) > 0
 
 
     async def sync_from_db(self) -> None:
     async def sync_from_db(self) -> None:
-        """Load all VPs from DB, reconcile running state."""
+        """Load all VPs from DB, reconcile running state.
+
+        Serialised by ``self._sync_lock`` — concurrent PUT /vp/{id} routes
+        all call into this method; without the lock the start / stop
+        sequence races and can leave duplicate sub-services bound to the
+        same port or orphan still-running tasks.
+        """
         if not self._session_factory:
         if not self._session_factory:
             logger.warning("Cannot sync virtual printers: no session factory")
             logger.warning("Cannot sync virtual printers: no session factory")
             return
             return
 
 
+        async with self._sync_lock:
+            await self._sync_from_db_locked()
+
+    async def _sync_from_db_locked(self) -> None:
+        """Inner sync body — caller holds ``self._sync_lock``."""
         from sqlalchemy import select
         from sqlalchemy import select
 
 
         from backend.app.models.printer import Printer
         from backend.app.models.printer import Printer
@@ -935,6 +1069,22 @@ class VirtualPrinterManager:
             if not instance:
             if not instance:
                 continue
                 continue
 
 
+            # Proxy mode: detect target printer IP / serial changes from the
+            # DB lookup above. Without this branch a DHCP renewal that gives
+            # the target printer a new IP would leave the running proxy
+            # forwarding to the stale IP until the user manually toggles the
+            # VP. The same shape covers a target-side serial change.
+            proxy_target_changed = False
+            if vp.mode == "proxy":
+                fresh = proxy_ips.get(vp.id)
+                if fresh is not None:
+                    fresh_ip, fresh_serial = fresh
+                    if (
+                        getattr(instance, "target_printer_ip", None) != fresh_ip
+                        or getattr(instance, "target_printer_serial", None) != fresh_serial
+                    ):
+                        proxy_target_changed = True
+
             changed = (
             changed = (
                 instance.mode != vp.mode
                 instance.mode != vp.mode
                 or instance.model != (vp.model or DEFAULT_VIRTUAL_PRINTER_MODEL)
                 or instance.model != (vp.model or DEFAULT_VIRTUAL_PRINTER_MODEL)
@@ -943,6 +1093,11 @@ class VirtualPrinterManager:
                 or instance.remote_interface_ip != (vp.remote_interface_ip or "")
                 or instance.remote_interface_ip != (vp.remote_interface_ip or "")
                 or instance.target_printer_id != vp.target_printer_id
                 or instance.target_printer_id != vp.target_printer_id
                 or instance.auto_dispatch != vp.auto_dispatch
                 or instance.auto_dispatch != vp.auto_dispatch
+                # Queue-mode behaviour toggle — without it the running
+                # instance silently keeps the old value until process
+                # restart (#1552 follow-up family).
+                or instance.queue_force_color_match != vp.queue_force_color_match
+                or proxy_target_changed
             )
             )
 
 
             if changed:
             if changed:

+ 27 - 1
backend/app/services/virtual_printer/mqtt_bridge.py

@@ -65,6 +65,20 @@ _SLICER_VISIBLE_STICKY_KEYS: tuple[str, ...] = (
     "net",
     "net",
     "ipcam",
     "ipcam",
     "lights_report",
     "lights_report",
+    # Pre-flight / Prepare-tab fields that BambuStudio reads off cached
+    # push_status. Bambu firmware emits them in full pushall but typically
+    # OMITS them from 1 Hz incremental updates, so without sticky-preservation
+    # the cache drops them after the very next tick and the slicer's
+    # "block Send while busy / unknown firmware" branch kicks in. Same shape
+    # as #1228 (storage indicators) and #1558 (live-progress fields) —
+    # cached-branch field-shape parity, not a new mechanism.
+    "upgrade_state",  # Send pre-flight reads dis_state / force_upgrade
+    "xcam",  # Prepare-tab reads spaghetti / first-layer / halt sensitivity
+    "hw_switch_state",  # Hardware switch state (Prepare tab)
+    "nozzle_diameter",
+    "nozzle_type",
+    "online",  # Module online map (ahb / rfid / version)
+    "ams_status",  # AMS overall status; can be ams_status-only incremental
 )
 )
 
 
 
 
@@ -241,6 +255,11 @@ class MQTTBridge:
         BambuMQTTClient is destroyed and recreated on PrinterManager.connect_printer
         BambuMQTTClient is destroyed and recreated on PrinterManager.connect_printer
         (e.g. printer config update). Without periodic refresh the bridge would lose
         (e.g. printer config update). Without periodic refresh the bridge would lose
         fan-out after such a churn until the VP itself restarts.
         fan-out after such a churn until the VP itself restarts.
+
+        On crash exit, the handler must be unbound — otherwise the registered
+        ``_on_printer_raw`` keeps firing on every real-printer message even
+        though the bridge is functionally dead (memory leak + behaviour leak
+        across VP restart).
         """
         """
         try:
         try:
             while not self._stopping:
             while not self._stopping:
@@ -250,6 +269,9 @@ class MQTTBridge:
             raise
             raise
         except Exception:
         except Exception:
             logger.exception("[%s] MQTT bridge refresh loop crashed", self.vp_name)
             logger.exception("[%s] MQTT bridge refresh loop crashed", self.vp_name)
+            # Crash exit — unbind so the orphaned handler stops firing.
+            # ``stop()`` won't be invoked because the task completes done-not-cancelled.
+            self._unbind_client()
 
 
     def _resolve_client(self) -> None:
     def _resolve_client(self) -> None:
         """Look up the current client for target_printer_id and rebind if it changed."""
         """Look up the current client for target_printer_id and rebind if it changed."""
@@ -402,7 +424,11 @@ class MQTTBridge:
                 for sticky_key in _SLICER_VISIBLE_STICKY_KEYS:
                 for sticky_key in _SLICER_VISIBLE_STICKY_KEYS:
                     if sticky_key not in new_state:
                     if sticky_key not in new_state:
                         if sticky_key in prev:
                         if sticky_key in prev:
-                            new_state[sticky_key] = prev[sticky_key]
+                            # Defensive deep copy — without this the carried-over
+                            # nested dicts/lists are shared between new_state and
+                            # the previous cache, so any in-place mutation later
+                            # (current or future code paths) would corrupt both.
+                            new_state[sticky_key] = copy.deepcopy(prev[sticky_key])
                         continue
                         continue
                     # Key IS in new_state — but firmware sends partial blobs
                     # Key IS in new_state — but firmware sends partial blobs
                     # (status-only / tray-targeted) under the same key on
                     # (status-only / tray-targeted) under the same key on

+ 173 - 5
backend/app/services/virtual_printer/mqtt_server.py

@@ -5,6 +5,8 @@ authenticates with the configured access code, and logs print commands.
 """
 """
 
 
 import asyncio
 import asyncio
+import copy
+import hmac
 import json
 import json
 import logging
 import logging
 import ssl
 import ssl
@@ -20,6 +22,22 @@ logger = logging.getLogger(__name__)
 # Default MQTT port for Bambu printers (MQTT over TLS)
 # Default MQTT port for Bambu printers (MQTT over TLS)
 MQTT_PORT = 8883
 MQTT_PORT = 8883
 
 
+# Per-IP MQTT auth rate-limit. 5 failures within 60 s blocks further attempts
+# for the remainder of the window. Bambu printers themselves don't rate-limit,
+# but they're not exposed past the LAN edge; Bambuddy's VPs sometimes are
+# (Tailscale, port-forwarded), so an 8-char access code without any
+# brute-force friction is too weak. The window auto-recovers — no manual
+# unblock — so a legitimate user who fat-fingered their access code 5 times
+# only waits up to 60 s.
+_AUTH_RATE_LIMIT_MAX_ATTEMPTS = 5
+_AUTH_RATE_LIMIT_WINDOW_SECONDS = 60.0
+
+# Pending-request map bound. Each entry maps a slicer command's
+# sequence_id to its originating client_id so the bridge response can be
+# routed back to just that client. Bounded so a slicer that issues
+# commands without ever consuming responses can't leak memory.
+_PENDING_REQUEST_MAX_ENTRIES = 256
+
 # Model code → product_name for version response (must match what slicer expects)
 # Model code → product_name for version response (must match what slicer expects)
 MODEL_PRODUCT_NAMES = {
 MODEL_PRODUCT_NAMES = {
     "BL-P001": "X1 Carbon",
     "BL-P001": "X1 Carbon",
@@ -204,6 +222,8 @@ class SimpleMQTTServer:
         self.vp_name = vp_name
         self.vp_name = vp_name
         self._log_prefix = f"[{vp_name}] " if vp_name else ""
         self._log_prefix = f"[{vp_name}] " if vp_name else ""
         self._running = False
         self._running = False
+        # Set after the socket is bound — see ftp_server.py for rationale.
+        self.ready = asyncio.Event()
         self._server = None
         self._server = None
         self._clients: dict[str, asyncio.StreamWriter] = {}
         self._clients: dict[str, asyncio.StreamWriter] = {}
         # Per-client "effective serial" — the serial the slicer actually uses in
         # Per-client "effective serial" — the serial the slicer actually uses in
@@ -228,6 +248,20 @@ class SimpleMQTTServer:
         # synthetic fallback resumes automatically.
         # synthetic fallback resumes automatically.
         self._bridge: MQTTBridge | None = None
         self._bridge: MQTTBridge | None = None
 
 
+        # Per-source-IP failed-auth tracker for rate-limiting / lockout.
+        # Maps IP → list[monotonic timestamp] of recent failures within the
+        # window. Pruned on every check so it doesn't grow unbounded.
+        self._auth_failures: dict[str, list[float]] = {}
+
+        # Maps sequence_id → originating client_id for slicer-initiated
+        # commands forwarded to the real printer. Used in
+        # ``push_raw_to_clients`` to route the printer's response only
+        # back to the requesting slicer instead of fanning out to all
+        # connected clients (which leaks slicer A's responses to slicer
+        # B in multi-slicer setups). FIFO-bounded; if a response never
+        # arrives the entry ages out instead of leaking.
+        self._pending_requests: dict[str, str] = {}
+
     async def start(self) -> None:
     async def start(self) -> None:
         """Start the MQTT server."""
         """Start the MQTT server."""
         if self._running:
         if self._running:
@@ -287,6 +321,7 @@ class SimpleMQTTServer:
                 self.port,
                 self.port,
                 ssl=ssl_context,
                 ssl=ssl_context,
             )
             )
+            self.ready.set()
 
 
             logger.info("Simple MQTT server listening on port %s", self.port)
             logger.info("Simple MQTT server listening on port %s", self.port)
 
 
@@ -312,6 +347,7 @@ class SimpleMQTTServer:
         """Stop the MQTT server."""
         """Stop the MQTT server."""
         logger.info("Stopping simple MQTT server")
         logger.info("Stopping simple MQTT server")
         self._running = False
         self._running = False
+        self.ready.clear()
 
 
         # Stop periodic status push
         # Stop periodic status push
         if self._status_push_task:
         if self._status_push_task:
@@ -392,10 +428,17 @@ class SimpleMQTTServer:
         logger.info("Periodic status push task stopped")
         logger.info("Periodic status push task stopped")
 
 
     async def push_raw_to_clients(self, topic: str, payload: bytes) -> None:
     async def push_raw_to_clients(self, topic: str, payload: bytes) -> None:
-        """Publish a pre-serialized MQTT payload on `topic` to every connected slicer.
+        """Publish a pre-serialized MQTT payload on `topic` to connected slicers.
 
 
         Called by MQTTBridge from the asyncio loop (scheduled via
         Called by MQTTBridge from the asyncio loop (scheduled via
         run_coroutine_threadsafe from paho's network thread).
         run_coroutine_threadsafe from paho's network thread).
+
+        Routes the response only back to the originating slicer if the
+        payload's sequence_id was previously recorded via
+        ``_record_pending_request``. Falls back to fan-out for
+        printer-initiated pushes (push_status etc.) and for sequence_ids
+        we never saw (covers a slicer that subscribes mid-flight to a
+        topic for which an earlier request is still in flight).
         """
         """
         topic_bytes = topic.encode("utf-8")
         topic_bytes = topic.encode("utf-8")
         # MQTT remaining-length: 2-byte topic length prefix + topic + message body.
         # MQTT remaining-length: 2-byte topic length prefix + topic + message body.
@@ -414,8 +457,12 @@ class SimpleMQTTServer:
         packet.extend(payload)
         packet.extend(payload)
         frame = bytes(packet)
         frame = bytes(packet)
 
 
+        target_client_id = self._lookup_pending_request_client(payload)
+
         disconnected = []
         disconnected = []
         for client_id, writer in list(self._clients.items()):
         for client_id, writer in list(self._clients.items()):
+            if target_client_id is not None and client_id != target_client_id:
+                continue
             try:
             try:
                 if writer.is_closing():
                 if writer.is_closing():
                     disconnected.append(client_id)
                     disconnected.append(client_id)
@@ -470,9 +517,23 @@ class SimpleMQTTServer:
 
 
                 # Handle packet types
                 # Handle packet types
                 if packet_type == 1:  # CONNECT
                 if packet_type == 1:  # CONNECT
+                    source_ip = addr[0] if addr else "unknown"
+                    if self._is_auth_rate_limited(source_ip):
+                        logger.warning(
+                            "%sMQTT auth rate-limited from %s (>=%d failures in %ds)",
+                            self._log_prefix,
+                            source_ip,
+                            _AUTH_RATE_LIMIT_MAX_ATTEMPTS,
+                            int(_AUTH_RATE_LIMIT_WINDOW_SECONDS),
+                        )
+                        writer.write(bytes([0x20, 0x02, 0x00, 0x05]))  # Not authorized
+                        await writer.drain()
+                        break
                     authenticated, keep_alive = await self._handle_connect(payload, writer)
                     authenticated, keep_alive = await self._handle_connect(payload, writer)
                     if not authenticated:
                     if not authenticated:
+                        self._record_auth_failure(source_ip)
                         break
                         break
+                    self._clear_auth_failures(source_ip)
                     # Honour the client's negotiated keepalive (#1548). Before
                     # Honour the client's negotiated keepalive (#1548). Before
                     # this fix, the hardcoded 60 s above would close
                     # this fix, the hardcoded 60 s above would close
                     # OrcaSlicer's idle connection at the keepalive boundary
                     # OrcaSlicer's idle connection at the keepalive boundary
@@ -501,7 +562,11 @@ class SimpleMQTTServer:
         except asyncio.CancelledError:
         except asyncio.CancelledError:
             pass  # Expected when server is shutting down and cancels client tasks
             pass  # Expected when server is shutting down and cancels client tasks
         except Exception as e:
         except Exception as e:
-            logger.debug("MQTT client error: %s", e)
+            # Outer handler — inner handlers already absorb expected parser
+            # / IO failures at debug. Anything reaching here is unexpected
+            # and would otherwise silently drop the slicer connection with
+            # no actionable signal in production logs (defaults are INFO+).
+            logger.warning("%sMQTT client session error from %s: %s", self._log_prefix, client_id, e)
         finally:
         finally:
             logger.debug("MQTT client disconnected: %s", client_id)
             logger.debug("MQTT client disconnected: %s", client_id)
             self._clients.pop(client_id, None)
             self._clients.pop(client_id, None)
@@ -532,6 +597,79 @@ class SimpleMQTTServer:
 
 
         return None
         return None
 
 
+    def _record_pending_request(self, data: dict, client_id: str) -> None:
+        """Stash sequence_id → client_id for any nested block with a sequence_id.
+
+        Slicer commands typically wrap their seq id in ``{"print": {...}}`` or
+        ``{"info": {...}}`` / ``{"system": {...}}`` etc. Walks top-level dict
+        values once to find the seq id; if absent (some commands omit it) we
+        skip — the response will fall through to broadcast which is fine for
+        unsolicited pushes.
+        """
+        for block in data.values():
+            if isinstance(block, dict):
+                seq = block.get("sequence_id")
+                if seq is not None:
+                    key = str(seq)
+                    # Evict oldest entry when over the cap. Python dicts
+                    # preserve insertion order so iter(self._pending_requests)
+                    # yields the oldest key first.
+                    while len(self._pending_requests) >= _PENDING_REQUEST_MAX_ENTRIES:
+                        oldest = next(iter(self._pending_requests))
+                        self._pending_requests.pop(oldest, None)
+                    self._pending_requests[key] = client_id
+                    return
+
+    def _lookup_pending_request_client(self, payload: bytes) -> str | None:
+        """Parse a bridge-forwarded MQTT payload and return the originating
+        client_id if its sequence_id was recorded.
+
+        Returns ``None`` for printer-initiated pushes (no recorded seq id) so
+        push_raw_to_clients falls back to broadcast — required for push_status
+        and the other unsolicited pushes that every connected slicer expects.
+        """
+        try:
+            parsed = json.loads(payload)
+        except (ValueError, TypeError):
+            return None
+        if not isinstance(parsed, dict):
+            return None
+        for block in parsed.values():
+            if isinstance(block, dict):
+                seq = block.get("sequence_id")
+                if seq is not None:
+                    return self._pending_requests.pop(str(seq), None)
+        return None
+
+    def _is_auth_rate_limited(self, source_ip: str) -> bool:
+        """Return True if ``source_ip`` has hit the per-IP failure cap.
+
+        Prunes timestamps older than the window so the dict doesn't grow
+        unbounded. Uses ``time.monotonic()`` for a wall-clock-jump-immune
+        clock that's safe to call from any context (sync or async).
+        """
+        import time as _time
+
+        now = _time.monotonic()
+        window_start = now - _AUTH_RATE_LIMIT_WINDOW_SECONDS
+        recent = [t for t in self._auth_failures.get(source_ip, []) if t >= window_start]
+        if recent:
+            self._auth_failures[source_ip] = recent
+        else:
+            self._auth_failures.pop(source_ip, None)
+        return len(recent) >= _AUTH_RATE_LIMIT_MAX_ATTEMPTS
+
+    def _record_auth_failure(self, source_ip: str) -> None:
+        """Append a timestamp for ``source_ip``'s failed auth attempt."""
+        import time as _time
+
+        now = _time.monotonic()
+        self._auth_failures.setdefault(source_ip, []).append(now)
+
+    def _clear_auth_failures(self, source_ip: str) -> None:
+        """Reset ``source_ip``'s failure history after a successful auth."""
+        self._auth_failures.pop(source_ip, None)
+
     async def _handle_connect(self, payload: bytes, writer: asyncio.StreamWriter) -> tuple[bool, int]:
     async def _handle_connect(self, payload: bytes, writer: asyncio.StreamWriter) -> tuple[bool, int]:
         """Handle MQTT CONNECT packet.
         """Handle MQTT CONNECT packet.
 
 
@@ -576,8 +714,11 @@ class SimpleMQTTServer:
             idx += 2
             idx += 2
             password = payload[idx : idx + password_len].decode("utf-8")
             password = payload[idx : idx + password_len].decode("utf-8")
 
 
-            # Authenticate
-            if username == "bblp" and password == self.access_code:
+            # Authenticate. ``hmac.compare_digest`` is constant-time to keep
+            # the auth check from leaking the access code via response timing
+            # under network jitter — LAN-only threat is marginal, but it's
+            # the standard fix and costs nothing.
+            if username == "bblp" and hmac.compare_digest(password, self.access_code):
                 # Send CONNACK with success
                 # Send CONNACK with success
                 writer.write(bytes([0x20, 0x02, 0x00, 0x00]))
                 writer.write(bytes([0x20, 0x02, 0x00, 0x00]))
                 await writer.drain()
                 await writer.drain()
@@ -668,7 +809,13 @@ class SimpleMQTTServer:
             if isinstance(cached, dict):
             if isinstance(cached, dict):
                 # Real-printer-shaped response. Copy the cache, then replace the
                 # Real-printer-shaped response. Copy the cache, then replace the
                 # protocol / upload-state fields with values under our control.
                 # protocol / upload-state fields with values under our control.
-                print_block = dict(cached)
+                # Deep copy — current mutations are top-level only, but a future
+                # override that writes into a nested dict (e.g. ``online``,
+                # ``upgrade_state``, ``ipcam``) would otherwise corrupt the
+                # bridge cache and be read by every subsequent subscriber until
+                # the next real-printer push lands. Cost is one allocation per
+                # status report; the cached dict is already short-lived.
+                print_block = copy.deepcopy(cached)
                 print_block["sequence_id"] = str(self._sequence_id)
                 print_block["sequence_id"] = str(self._sequence_id)
                 print_block["command"] = "push_status"
                 print_block["command"] = "push_status"
                 print_block["msg"] = 0
                 print_block["msg"] = 0
@@ -694,6 +841,23 @@ class SimpleMQTTServer:
                 print_block["home_flag"] = print_block.get("home_flag", 0) | 0x100  # bit 8 = HAS_SDCARD_NORMAL
                 print_block["home_flag"] = print_block.get("home_flag", 0) | 0x100  # bit 8 = HAS_SDCARD_NORMAL
                 print_block["sdcard"] = True
                 print_block["sdcard"] = True
                 print_block.setdefault("storage", {"free": 1_000_000_000, "total": 32_000_000_000})
                 print_block.setdefault("storage", {"free": 1_000_000_000, "total": 32_000_000_000})
+                # Live-progress fields the slicer's Send pre-flight reads
+                # (#1558). When the real target printer is mid-print, the
+                # cached push_status carries the real values for these
+                # fields and the slicer reads the VP as "busy" — refusing
+                # Send — even though gcode_state above is forced to IDLE.
+                # For VP usage the VP isn't actually running the print
+                # the printer is, so these need to mirror the synthetic
+                # stub's idle values. Same shape as #1228 (storage) — the
+                # cached-branch override set just needed extending.
+                print_block["mc_print_stage"] = ""
+                print_block["mc_percent"] = 0
+                print_block["mc_remaining_time"] = 0
+                print_block["stg"] = []
+                print_block["stg_cur"] = 0
+                print_block["layer_num"] = 0
+                print_block["total_layer_num"] = 0
+                print_block["print_error"] = 0
                 status = {"print": print_block}
                 status = {"print": print_block}
                 await self._publish_to_report(writer, status, serial or self.serial)
                 await self._publish_to_report(writer, status, serial or self.serial)
                 return
                 return
@@ -1047,6 +1211,10 @@ class SimpleMQTTServer:
             # Forward anything the synthetic flow didn't handle to the real
             # Forward anything the synthetic flow didn't handle to the real
             # printer. AMS load / dry / xcam / system / extrusion_cali_get etc.
             # printer. AMS load / dry / xcam / system / extrusion_cali_get etc.
             if not handled_locally and self._bridge is not None and self._bridge.is_active:
             if not handled_locally and self._bridge is not None and self._bridge.is_active:
+                # Remember which client originated this command so the
+                # printer's response goes back only to them (not fanned
+                # out to every connected slicer).
+                self._record_pending_request(data, client_id)
                 self._bridge.forward_to_printer(data)
                 self._bridge.forward_to_printer(data)
 
 
         except (IndexError, ValueError, OSError) as e:
         except (IndexError, ValueError, OSError) as e:

+ 5 - 0
backend/app/services/virtual_printer/ssdp_server.py

@@ -55,6 +55,9 @@ class VirtualPrinterSSDPServer:
         self.model = model
         self.model = model
         self._bind_ip = bind_ip
         self._bind_ip = bind_ip
         self._running = False
         self._running = False
+        # Set after the primary multicast socket is bound — see ftp_server.py
+        # for rationale.
+        self.ready = asyncio.Event()
         self._socket: socket.socket | None = None
         self._socket: socket.socket | None = None
         self._extra_sockets: list[socket.socket] = []
         self._extra_sockets: list[socket.socket] = []
         self._extra_interfaces = extra_interfaces or []
         self._extra_interfaces = extra_interfaces or []
@@ -169,6 +172,7 @@ class VirtualPrinterSSDPServer:
             local_ip = self._get_local_ip()
             local_ip = self._get_local_ip()
             logger.info("SSDP server listening on port %s, advertising IP: %s", SSDP_PORT, local_ip)
             logger.info("SSDP server listening on port %s, advertising IP: %s", SSDP_PORT, local_ip)
             logger.info("Virtual printer: %s (%s) model=%s", self.name, self.serial, self.model)
             logger.info("Virtual printer: %s (%s) model=%s", self.name, self.serial, self.model)
+            self.ready.set()
 
 
             # Create extra sockets for additional interfaces (VPN, etc.)
             # Create extra sockets for additional interfaces (VPN, etc.)
             # If no explicit extra interfaces given and we're bound to a
             # If no explicit extra interfaces given and we're bound to a
@@ -249,6 +253,7 @@ class VirtualPrinterSSDPServer:
         """Stop the SSDP server."""
         """Stop the SSDP server."""
         logger.info("Stopping SSDP server")
         logger.info("Stopping SSDP server")
         self._running = False
         self._running = False
+        self.ready.clear()
         await self._cleanup()
         await self._cleanup()
 
 
     async def _cleanup(self) -> None:
     async def _cleanup(self) -> None:

+ 7 - 2
backend/app/services/virtual_printer/tailscale.py

@@ -134,13 +134,18 @@ class TailscaleService:
 
 
         try:
         try:
             returncode, stdout, stderr = await self._run_tailscale("status", "--json", timeout=5.0)
             returncode, stdout, stderr = await self._run_tailscale("status", "--json", timeout=5.0)
-        except OSError as e:
+        except (OSError, asyncio.TimeoutError) as e:
+            # asyncio.TimeoutError covers the case where ``_run_tailscale``
+            # killed a stuck subprocess and re-raised. Without this branch
+            # the timeout escaped into the FastAPI route handler and could
+            # crash the VP management UI for users with a lagging
+            # tailscaled daemon.
             return TailscaleStatus(
             return TailscaleStatus(
                 available=False,
                 available=False,
                 hostname="",
                 hostname="",
                 tailnet_name="",
                 tailnet_name="",
                 fqdn="",
                 fqdn="",
-                error=str(e),
+                error=str(e) or "tailscale status timed out",
             )
             )
 
 
         if returncode is None or returncode != 0:
         if returncode is None or returncode != 0:

+ 64 - 6
backend/app/services/virtual_printer/tcp_proxy.py

@@ -814,7 +814,17 @@ class FTPTLSProxy(TLSProxy):
 
 
     async def stop(self) -> None:
     async def stop(self) -> None:
         """Stop proxy and clean up data connection servers."""
         """Stop proxy and clean up data connection servers."""
-        # Close all data servers first
+        # Cancel any pending auto_close timeouts so they don't outlive the
+        # proxy holding a dead server reference. Without this, up to 101
+        # tasks lingered for ~60 s after stop(), each gripping a server
+        # ref + an active_connections slot; under rapid mode-switch the
+        # ports stayed bound long enough to fail the next start().
+        for task in list(self._auto_close_tasks):
+            task.cancel()
+        if self._auto_close_tasks:
+            await asyncio.gather(*self._auto_close_tasks, return_exceptions=True)
+        self._auto_close_tasks.clear()
+        # Close all data servers
         for server in list(self._data_servers):
         for server in list(self._data_servers):
             try:
             try:
                 server.close()
                 server.close()
@@ -827,6 +837,7 @@ class FTPTLSProxy(TLSProxy):
     async def start(self) -> None:
     async def start(self) -> None:
         """Start the FTP TLS proxy."""
         """Start the FTP TLS proxy."""
         self._data_servers: list[asyncio.Server] = []
         self._data_servers: list[asyncio.Server] = []
+        self._auto_close_tasks: list[asyncio.Task] = []
         await super().start()
         await super().start()
 
 
     async def _handle_client(
     async def _handle_client(
@@ -1408,7 +1419,11 @@ class FTPTLSProxy(TLSProxy):
                 if server in self._data_servers:
                 if server in self._data_servers:
                     self._data_servers.remove(server)
                     self._data_servers.remove(server)
 
 
-        asyncio.create_task(auto_close(), name=f"ftp_data_timeout_{port}")
+        # Track the auto_close task so stop() can cancel it; otherwise the
+        # 60 s timeout would hold a server reference past proxy teardown.
+        ac_task = asyncio.create_task(auto_close(), name=f"ftp_data_timeout_{port}")
+        self._auto_close_tasks.append(ac_task)
+        ac_task.add_done_callback(lambda t, tasks=self._auto_close_tasks: tasks.remove(t) if t in tasks else None)
 
 
         logger.debug("FTP data proxy: port %s → %s:%s", port, printer_ip, printer_port)
         logger.debug("FTP data proxy: port %s → %s:%s", port, printer_ip, printer_port)
 
 
@@ -1469,6 +1484,17 @@ class SlicerProxyManager:
         self._bind_server = None
         self._bind_server = None
         self._probe_servers: list[asyncio.Server] = []
         self._probe_servers: list[asyncio.Server] = []
         self._tasks: list[asyncio.Task] = []
         self._tasks: list[asyncio.Task] = []
+        # Pre-bind the lifetime-coupled collections so ``stop()`` works if
+        # called before ``start()`` finishes (rapid mode-switch races).
+        # Previously ``_ftp_data_proxies`` was first assigned inside ``start()``
+        # at line ~1520, so an early stop hit AttributeError and left
+        # sockets stranded.
+        self._ftp_data_proxies: list[TCPProxy] = []
+        # Actual FTP listen port — class constant by default; ``start()``
+        # overwrites with the redirect target when iptables-redirect is
+        # active. ``get_status()`` reads this so diagnostics probe the
+        # port that actually has a listener, not the static LOCAL_FTP_PORT.
+        self._actual_ftp_port: int = self.LOCAL_FTP_PORT
 
 
     # FTP passive data port range — Bambu printers typically use ports in
     # FTP passive data port range — Bambu printers typically use ports in
     # this range for EPSV/PASV data connections. We pre-listen on all of
     # this range for EPSV/PASV data connections. We pre-listen on all of
@@ -1499,8 +1525,25 @@ class SlicerProxyManager:
                 redirect_target,
                 redirect_target,
             )
             )
             ftp_listen_port = redirect_target
             ftp_listen_port = redirect_target
-
-        # FTP control — raw TCP pass-through (end-to-end TLS with printer)
+        # Cache the actual listen port for get_status() / diagnostic so the
+        # port_ftps check probes the port that actually has a socket.
+        self._actual_ftp_port = ftp_listen_port
+
+        # FTP control — raw TCP pass-through (end-to-end TLS with printer).
+        # A TLS 1.3 → 1.2 ClientHello-rewrite was attempted to work around
+        # BambuStudio's libcurl bug on the X1C FTPS data channel (PSK
+        # session-resumption + CURLE_PARTIAL_FILE). Reverted because the
+        # rewrite broke the control-channel TLS handshake itself: replacing
+        # 0x0304 with a duplicate 0x0303 in supported_versions while leaving
+        # the TLS-1.3-only extensions (key_share, psk_key_exchange_modes,
+        # signature_algorithms_cert) in place produced a malformed ClientHello
+        # that the printer or slicer rejected, and the connection closed
+        # before any data channel was opened. A proper fix needs full TLS
+        # bumping (terminate + re-establish) with packet-capture work
+        # that's out of scope for now. X1C proxy-mode FTP uploads remain
+        # broken — users with X1C should use the non-proxy modes (immediate
+        # / review / print_queue) which work end-to-end via the VP's own
+        # FTP server on TLS 1.2.
         self._ftp_proxy = TCPProxy(
         self._ftp_proxy = TCPProxy(
             name="FTP",
             name="FTP",
             listen_port=ftp_listen_port,
             listen_port=ftp_listen_port,
@@ -1745,8 +1788,16 @@ class SlicerProxyManager:
             await dp.stop()
             await dp.stop()
         self._ftp_data_proxies = []
         self._ftp_data_proxies = []
 
 
+        # Probe servers need wait_closed — without it the OS releases the
+        # bind socket asynchronously and a rapid stop+start cycle (e.g.
+        # config-change-driven mode switch) can race "address already in
+        # use" on the probe ports.
         for srv in self._probe_servers:
         for srv in self._probe_servers:
-            srv.close()
+            try:
+                srv.close()
+                await srv.wait_closed()
+            except OSError:
+                pass  # Best-effort — port may already be released
         self._probe_servers = []
         self._probe_servers = []
 
 
         # Cancel tasks
         # Cancel tasks
@@ -1798,7 +1849,14 @@ class SlicerProxyManager:
         return {
         return {
             "running": self.is_running,
             "running": self.is_running,
             "target_host": self.target_host,
             "target_host": self.target_host,
-            "ftp_port": self.LOCAL_FTP_PORT,
+            # ``_actual_ftp_port`` reflects the iptables-redirected listen
+            # port when the docker-host deployment uses
+            # ``iptables -t nat -A PREROUTING ... REDIRECT --to-port`` to
+            # let non-root containers serve on the printer's 990. Returning
+            # the class constant here made the diagnostic probe a port
+            # nothing was listening on and report a false fail on every
+            # working redirect deployment.
+            "ftp_port": self._actual_ftp_port,
             "mqtt_port": self.LOCAL_MQTT_PORT,
             "mqtt_port": self.LOCAL_MQTT_PORT,
             "bind_ports": self.PRINTER_BIND_PORTS,
             "bind_ports": self.PRINTER_BIND_PORTS,
             "ftp_connections": (len(self._ftp_proxy._active_connections) if self._ftp_proxy else 0),
             "ftp_connections": (len(self._ftp_proxy._active_connections) if self._ftp_proxy else 0),

+ 242 - 4
backend/tests/unit/services/test_virtual_printer.py

@@ -1030,6 +1030,220 @@ class TestVirtualPrinterInstance:
         kwargs = archive_print_mock.await_args.kwargs
         kwargs = archive_print_mock.await_args.kwargs
         assert kwargs.get("prefer_filename_for_name") is expected_prefer_filename
         assert kwargs.get("prefer_filename_for_name") is expected_prefer_filename
 
 
+    # ========================================================================
+    # Tests for failure-path cleanup (#audit-R2-1)
+    # ========================================================================
+    #
+    # All three file handlers (_archive_file, _queue_file, _add_to_print_queue)
+    # previously only popped _pending_files and unlinked the temp file on the
+    # success branch. Failure paths leaked the marker (blocking same-name
+    # retries via the FTP layer) and the temp file on disk. The cleanup must
+    # ALWAYS run, even when archival / queue insert raises.
+
+    @pytest.mark.asyncio
+    async def test_archive_file_failure_path_pops_pending_and_unlinks(self, tmp_path):
+        """When the archive layer raises, `_pending_files[filename]` must still
+        be popped and the temp file must be unlinked. Otherwise the FTP layer's
+        same-name retry guard would silently reject the slicer's next attempt
+        and the upload_dir would accumulate ghost files."""
+        from backend.app.services.virtual_printer.manager import VirtualPrinterInstance
+
+        mock_db = AsyncMock()
+        mock_session_factory = MagicMock()
+        mock_session_ctx = AsyncMock()
+        mock_session_ctx.__aenter__ = AsyncMock(return_value=mock_db)
+        mock_session_ctx.__aexit__ = AsyncMock(return_value=False)
+        mock_session_factory.return_value = mock_session_ctx
+
+        inst = VirtualPrinterInstance(
+            vp_id=40,
+            name="ArchiveFailCleanup",
+            mode="immediate",
+            model="C12",
+            access_code="12345678",
+            serial_suffix="391800040",
+            base_dir=tmp_path,
+            session_factory=mock_session_factory,
+        )
+        file_path = tmp_path / "cleanup-archive.3mf"
+        file_path.write_bytes(b"fake3mf")
+        inst._pending_files[file_path.name] = file_path
+
+        with (
+            patch(
+                "backend.app.api.routes.settings.get_setting",
+                new_callable=AsyncMock,
+                return_value=None,
+            ),
+            patch(
+                "backend.app.services.archive.ArchiveService.archive_print",
+                new_callable=AsyncMock,
+                side_effect=RuntimeError("archive blew up"),
+            ),
+        ):
+            await inst._archive_file(file_path, "192.168.1.100")
+
+        assert file_path.name not in inst._pending_files
+        assert not file_path.exists()
+
+    @pytest.mark.asyncio
+    async def test_queue_file_failure_path_pops_pending_and_unlinks(self, tmp_path):
+        """Same invariant for _queue_file: a DB error during PendingUpload
+        insert must not leak the in-flight marker or the temp file."""
+        from backend.app.services.virtual_printer.manager import VirtualPrinterInstance
+
+        mock_db = AsyncMock()
+        # Commit raises — emulating a DB connectivity error.
+        mock_db.add = MagicMock()
+        mock_db.commit = AsyncMock(side_effect=RuntimeError("db unreachable"))
+        mock_session_factory = MagicMock()
+        mock_session_ctx = AsyncMock()
+        mock_session_ctx.__aenter__ = AsyncMock(return_value=mock_db)
+        mock_session_ctx.__aexit__ = AsyncMock(return_value=False)
+        mock_session_factory.return_value = mock_session_ctx
+
+        inst = VirtualPrinterInstance(
+            vp_id=41,
+            name="QueueFailCleanup",
+            mode="review",
+            model="C12",
+            access_code="12345678",
+            serial_suffix="391800041",
+            base_dir=tmp_path,
+            session_factory=mock_session_factory,
+        )
+        file_path = tmp_path / "cleanup-queue.3mf"
+        file_path.write_bytes(b"fake3mf")
+        inst._pending_files[file_path.name] = file_path
+
+        await inst._queue_file(file_path, "192.168.1.100")
+
+        assert file_path.name not in inst._pending_files
+        assert not file_path.exists()
+
+    @pytest.mark.asyncio
+    async def test_add_to_print_queue_failure_path_pops_pending_and_unlinks(self, tmp_path):
+        """Same invariant for _add_to_print_queue: a DB error or archive
+        failure must not leak the in-flight marker or the temp file."""
+        from backend.app.services.virtual_printer.manager import VirtualPrinterInstance
+
+        mock_db = AsyncMock()
+        mock_db.add = MagicMock()
+        mock_db.commit = AsyncMock()
+        mock_db.execute = AsyncMock(side_effect=RuntimeError("queue insert blew up"))
+        mock_session_factory = MagicMock()
+        mock_session_ctx = AsyncMock()
+        mock_session_ctx.__aenter__ = AsyncMock(return_value=mock_db)
+        mock_session_ctx.__aexit__ = AsyncMock(return_value=False)
+        mock_session_factory.return_value = mock_session_ctx
+
+        inst = VirtualPrinterInstance(
+            vp_id=42,
+            name="DispatchFailCleanup",
+            mode="print_queue",
+            model="C12",
+            access_code="12345678",
+            serial_suffix="391800042",
+            auto_dispatch=True,
+            base_dir=tmp_path,
+            session_factory=mock_session_factory,
+        )
+        file_path = tmp_path / "cleanup-dispatch.3mf"
+        file_path.write_bytes(b"fake3mf")
+        inst._pending_files[file_path.name] = file_path
+
+        with patch(
+            "backend.app.api.routes.settings.get_setting",
+            new_callable=AsyncMock,
+            return_value=None,
+        ):
+            await inst._add_to_print_queue(file_path, "192.168.1.100")
+
+        assert file_path.name not in inst._pending_files
+        assert not file_path.exists()
+
+    # ========================================================================
+    # Test for position=MAX+1 (audit-R2)
+    # ========================================================================
+
+    @pytest.mark.asyncio
+    async def test_add_to_print_queue_position_picks_max_plus_one(self, tmp_path):
+        """VP-queue items previously got hardcoded `position=1`, colliding
+        with existing items at position 1 and producing non-deterministic
+        execution order. Now the position is chosen by `MAX(position)+1`
+        against the target queue, matching the canonical `POST /print-queue/`
+        path."""
+        from backend.app.services.virtual_printer.manager import VirtualPrinterInstance
+
+        # Capture the inserted PrintQueueItem so we can assert on .position.
+        added_items: list = []
+
+        class _RecordingDb:
+            def __init__(self):
+                self.add = lambda item: added_items.append(item)
+                self.commit = AsyncMock()
+
+            async def execute(self, query):  # noqa: ARG002
+                """Return a stub result whose `.scalar()` reports the existing
+                MAX(position) for the target. Returning 7 means the new item
+                should land at 8."""
+                result = MagicMock()
+                result.scalar = MagicMock(return_value=7)
+                return result
+
+        mock_db = _RecordingDb()
+        mock_session_factory = MagicMock()
+        mock_session_ctx = AsyncMock()
+        mock_session_ctx.__aenter__ = AsyncMock(return_value=mock_db)
+        mock_session_ctx.__aexit__ = AsyncMock(return_value=False)
+        mock_session_factory.return_value = mock_session_ctx
+
+        inst = VirtualPrinterInstance(
+            vp_id=43,
+            name="PositionMaxPlusOne",
+            mode="print_queue",
+            model="C12",
+            access_code="12345678",
+            serial_suffix="391800043",
+            target_printer_id=99,
+            auto_dispatch=True,
+            base_dir=tmp_path,
+            session_factory=mock_session_factory,
+        )
+        file_path = tmp_path / "next-position.3mf"
+        file_path.write_bytes(b"fake3mf")
+
+        mock_archive = MagicMock()
+        mock_archive.id = 555
+        mock_archive.printer_id = None
+        mock_archive.filename = "next-position.3mf"
+        mock_archive.print_name = "next-position"
+        mock_archive.status = "archived"
+
+        with (
+            patch(
+                "backend.app.api.routes.settings.get_setting",
+                new_callable=AsyncMock,
+                return_value=None,
+            ),
+            patch(
+                "backend.app.services.archive.ArchiveService.archive_print",
+                new_callable=AsyncMock,
+                return_value=mock_archive,
+            ),
+            patch(
+                "backend.app.core.websocket.ws_manager.send_archive_created",
+                new_callable=AsyncMock,
+            ),
+        ):
+            await inst._add_to_print_queue(file_path, "192.168.1.100")
+
+        # One queue item was added.
+        assert len(added_items) == 1
+        queue_item = added_items[0]
+        # Position = max(7) + 1 = 8 — NOT the legacy hardcoded 1.
+        assert queue_item.position == 8
+
 
 
 class TestVirtualPrinterManager:
 class TestVirtualPrinterManager:
     """Tests for VirtualPrinterManager orchestrator."""
     """Tests for VirtualPrinterManager orchestrator."""
@@ -1203,6 +1417,7 @@ class TestVirtualPrinterManager:
             "target_printer_id": None,
             "target_printer_id": None,
             "auto_dispatch": True,
             "auto_dispatch": True,
             "tailscale_disabled": True,  # Opt-in default (#1070 UX fix)
             "tailscale_disabled": True,  # Opt-in default (#1070 UX fix)
+            "queue_force_color_match": False,  # default — must be explicit so MagicMock truthiness doesn't trip the change detector
             "position": 0,
             "position": 0,
         }
         }
         defaults.update(overrides)
         defaults.update(overrides)
@@ -2391,11 +2606,34 @@ class TestBindServer:
             base_dir=tmp_path,
             base_dir=tmp_path,
         )
         )
 
 
+        # Each mocked child service exposes a real asyncio.Event for the
+        # readiness barrier added in start_server (set on instantiation so
+        # the barrier returns immediately in tests).
+        ready_event = asyncio.Event()
+        ready_event.set()
+
+        def with_ready(*_args, **_kwargs):
+            child = MagicMock()
+            child.ready = ready_event
+            return child
+
         with (
         with (
-            patch("backend.app.services.virtual_printer.manager.VirtualPrinterSSDPServer"),
-            patch("backend.app.services.virtual_printer.manager.VirtualPrinterFTPServer"),
-            patch("backend.app.services.virtual_printer.manager.SimpleMQTTServer"),
-            patch("backend.app.services.virtual_printer.manager.BindServer") as mock_bind_cls,
+            patch(
+                "backend.app.services.virtual_printer.manager.VirtualPrinterSSDPServer",
+                side_effect=with_ready,
+            ),
+            patch(
+                "backend.app.services.virtual_printer.manager.VirtualPrinterFTPServer",
+                side_effect=with_ready,
+            ),
+            patch(
+                "backend.app.services.virtual_printer.manager.SimpleMQTTServer",
+                side_effect=with_ready,
+            ),
+            patch(
+                "backend.app.services.virtual_printer.manager.BindServer",
+                side_effect=with_ready,
+            ) as mock_bind_cls,
             patch.object(inst._cert_service, "delete_printer_certificate"),
             patch.object(inst._cert_service, "delete_printer_certificate"),
             patch.object(
             patch.object(
                 inst._cert_service,
                 inst._cert_service,

+ 76 - 0
backend/tests/unit/test_vp_certificate_rotation.py

@@ -0,0 +1,76 @@
+"""Tests for CertificateService.ensure_certificates' CA-rotation guard.
+
+When the shared CA is regenerated (e.g. its expiry crossed
+``CA_EXPIRY_THRESHOLD_DAYS``), any per-VP printer certificate that was
+signed by the OLD CA becomes orphaned: it still exists on disk and the
+old fallback ``cert_path.exists()`` check would happily reuse it. A
+slicer that imported the NEW CA then fails the TLS handshake because
+the printer cert's issuer doesn't match anything in its trust store.
+
+``_cert_matches_current_ca`` is the guard. It compares the on-disk
+printer cert's issuer against the on-disk CA cert's subject; on
+mismatch ``ensure_certificates`` regenerates the per-VP cert under the
+current CA.
+"""
+
+from backend.app.services.virtual_printer.certificate import CertificateService
+
+
+def test_ensure_certificates_reuses_cert_when_issuer_matches_ca(tmp_path):
+    """Happy path: a freshly-generated CA + per-VP cert pair shares
+    issuer/subject. ``ensure_certificates`` reads them back without
+    regenerating."""
+    svc = CertificateService(cert_dir=tmp_path, serial="01P00A391800001")
+
+    # First call: generates the CA + per-VP cert from scratch.
+    first_cert, first_key = svc.ensure_certificates()
+    first_cert_bytes = first_cert.read_bytes()
+
+    # Second call: cert + CA exist and the issuer matches. Should reuse.
+    second_cert, _ = svc.ensure_certificates()
+    assert second_cert.read_bytes() == first_cert_bytes
+
+
+def test_ensure_certificates_regenerates_when_ca_rotated(tmp_path):
+    """CA rotation scenario: the CA file is replaced with a different one
+    (e.g. previous expired and was regenerated). The per-VP cert on disk
+    was signed by the old CA, so its issuer no longer matches the new CA's
+    subject. ``ensure_certificates`` must regenerate the per-VP cert."""
+    # Build the first pair.
+    svc1 = CertificateService(cert_dir=tmp_path, serial="01P00A391800001")
+    orig_cert_bytes = svc1.ensure_certificates()[0].read_bytes()
+    orig_ca_bytes = svc1.ca_cert_path.read_bytes()
+
+    # Simulate CA rotation: build a SECOND CA in a different dir, then
+    # swap that CA's files into the original CA path. The per-VP cert
+    # still on disk was signed by the original CA — issuer mismatch now.
+    rotated_dir = tmp_path / "rotated"
+    rotated_dir.mkdir()
+    svc_rotated = CertificateService(cert_dir=rotated_dir, serial="01P00A391800002")
+    svc_rotated.ensure_certificates()
+
+    # Overwrite the original CA on disk with the rotated one.
+    svc1.ca_cert_path.write_bytes(svc_rotated.ca_cert_path.read_bytes())
+    svc1.ca_key_path.write_bytes(svc_rotated.ca_key_path.read_bytes())
+    assert svc1.ca_cert_path.read_bytes() != orig_ca_bytes  # confirm rotation
+
+    # Build a fresh service against the rotated CA, then ensure_certificates
+    # should detect the mismatch and regenerate the per-VP cert.
+    svc2 = CertificateService(cert_dir=tmp_path, serial="01P00A391800001")
+    new_cert, _ = svc2.ensure_certificates()
+    new_cert_bytes = new_cert.read_bytes()
+
+    # New per-VP cert must differ from the old (signed by a different CA now).
+    assert new_cert_bytes != orig_cert_bytes
+
+
+def test_cert_matches_current_ca_returns_false_when_no_ca(tmp_path):
+    """If the CA file is missing entirely, the match check must return
+    False so ``ensure_certificates`` falls through to ``generate_certificates``
+    instead of returning a per-VP cert that nothing can validate."""
+    svc = CertificateService(cert_dir=tmp_path, serial="01P00A391800001")
+    # Write a per-VP cert without a CA.
+    svc.cert_path.write_bytes(b"fake cert content")
+    svc.key_path.write_bytes(b"fake key content")
+    # No bbl_ca.crt on disk → match fails safely.
+    assert svc._cert_matches_current_ca() is False

+ 137 - 0
backend/tests/unit/test_vp_delete_cleanup.py

@@ -0,0 +1,137 @@
+"""Tests for DELETE /virtual-printers/{vp_id} orphan cleanup.
+
+Before the fix, deleting a VP only stopped the running instance and
+removed the row. The on-disk ``base_dir/uploads/<vp_id>/`` directory
+lingered, and any ``PendingUpload`` rows that pointed into it remained
+in ``pending`` status — showing up as phantom entries in
+``/pending-uploads/``. The route now (a) marks those rows as
+``discarded`` and (b) ``shutil.rmtree``s the upload_dir after the DB
+commit succeeds.
+"""
+
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+
+from backend.app.api.routes.virtual_printers import delete_virtual_printer
+
+
+@pytest.mark.asyncio
+async def test_delete_vp_marks_orphan_pending_uploads_discarded(tmp_path):
+    """A VP with PendingUpload rows pointing at its upload_dir: after
+    DELETE, those rows must be flipped to ``discarded`` and the on-disk
+    directory must be gone."""
+    vp_id = 77
+    upload_dir = tmp_path / "uploads" / str(vp_id)
+    upload_dir.mkdir(parents=True)
+    (upload_dir / "stale.3mf").write_bytes(b"orphaned content")
+
+    # Build PendingUpload-like mocks. The route mutates `.status`.
+    pending_a = MagicMock()
+    pending_a.file_path = str(upload_dir / "stale.3mf")
+    pending_a.status = "pending"
+    pending_b = MagicMock()
+    pending_b.file_path = str(upload_dir / "another.3mf")
+    pending_b.status = "pending"
+
+    # Unrelated PendingUpload that does NOT belong to this VP — must
+    # be left alone.
+    other_pending = MagicMock()
+    other_pending.file_path = str(tmp_path / "uploads" / "99" / "not-mine.3mf")
+    other_pending.status = "pending"
+
+    # Mock VP row.
+    vp_row = MagicMock()
+    vp_row.id = vp_id
+    vp_row.name = "DeleteMe"
+
+    # Mock DB session with the route's two .execute() calls + flush + commit.
+    select_calls = {"i": 0}
+
+    async def fake_execute(query):  # noqa: ARG001
+        """Return the VP row on the first call (vp lookup) and the
+        in-range PendingUpload rows on the second call (orphan query).
+        Third call is the DELETE which doesn't need a result."""
+        select_calls["i"] += 1
+        result = MagicMock()
+        if select_calls["i"] == 1:
+            result.scalar_one_or_none = MagicMock(return_value=vp_row)
+        elif select_calls["i"] == 2:
+            scalars = MagicMock()
+            scalars.all = MagicMock(return_value=[pending_a, pending_b])
+            result.scalars = MagicMock(return_value=scalars)
+        return result
+
+    db = AsyncMock()
+    db.execute = fake_execute
+    db.flush = AsyncMock()
+    db.commit = AsyncMock()
+
+    # Mock the manager: remove_instance, _base_dir, sync_from_db.
+    fake_manager = MagicMock()
+    fake_manager.remove_instance = AsyncMock()
+    fake_manager.sync_from_db = AsyncMock()
+    fake_manager._base_dir = tmp_path
+
+    with patch(
+        "backend.app.services.virtual_printer.virtual_printer_manager",
+        fake_manager,
+    ):
+        await delete_virtual_printer(vp_id=vp_id, db=db, _=None)
+
+    # Both in-range PendingUpload rows must be flipped to "discarded".
+    assert pending_a.status == "discarded"
+    assert pending_b.status == "discarded"
+    # The unrelated row was never returned from the query — left alone.
+    assert other_pending.status == "pending"
+
+    # The on-disk upload_dir is gone.
+    assert not upload_dir.exists()
+
+    # The running instance was stopped before the row was removed.
+    fake_manager.remove_instance.assert_awaited_once_with(vp_id)
+
+
+@pytest.mark.asyncio
+async def test_delete_vp_with_no_orphan_uploads_still_succeeds(tmp_path):
+    """A VP with no PendingUpload rows and no upload_dir on disk: the
+    cleanup path must be a clean no-op, not raise."""
+    vp_id = 88
+
+    vp_row = MagicMock()
+    vp_row.id = vp_id
+    vp_row.name = "EmptyDelete"
+
+    select_calls = {"i": 0}
+
+    async def fake_execute(query):  # noqa: ARG001
+        select_calls["i"] += 1
+        result = MagicMock()
+        if select_calls["i"] == 1:
+            result.scalar_one_or_none = MagicMock(return_value=vp_row)
+        elif select_calls["i"] == 2:
+            # No PendingUpload rows match.
+            scalars = MagicMock()
+            scalars.all = MagicMock(return_value=[])
+            result.scalars = MagicMock(return_value=scalars)
+        return result
+
+    db = AsyncMock()
+    db.execute = fake_execute
+    db.flush = AsyncMock()
+    db.commit = AsyncMock()
+
+    fake_manager = MagicMock()
+    fake_manager.remove_instance = AsyncMock()
+    fake_manager.sync_from_db = AsyncMock()
+    fake_manager._base_dir = tmp_path  # no uploads/<vp_id> exists
+
+    with patch(
+        "backend.app.services.virtual_printer.virtual_printer_manager",
+        fake_manager,
+    ):
+        await delete_virtual_printer(vp_id=vp_id, db=db, _=None)
+
+    fake_manager.remove_instance.assert_awaited_once_with(vp_id)
+    # No directory to remove — and we didn't crash trying to.
+    assert not (tmp_path / "uploads" / str(vp_id)).exists()

+ 143 - 0
backend/tests/unit/test_vp_ftp_stor.py

@@ -0,0 +1,143 @@
+"""Tests for the FTPSession.cmd_STOR streaming + size-cap behaviour.
+
+The original cmd_STOR buffered the entire upload in a ``list[bytes]`` and
+called ``write_bytes`` at the end. For multi-GB ``.gcode.3mf`` files this
+peaked at ~2× the file size in RSS (chunks held + the ``b''.join`` of
+them) and could OOM low-memory hosts. The streaming rewrite writes each
+chunk to disk inline (memory bounded at one chunk) and enforces
+``MAX_UPLOAD_BYTES``. These tests pin both behaviours without standing
+up a real TLS/FTP server.
+"""
+
+import asyncio
+import ssl
+from unittest.mock import AsyncMock, MagicMock
+
+import pytest
+
+from backend.app.services.virtual_printer.ftp_server import MAX_UPLOAD_BYTES, FTPSession
+
+
+def _make_session(tmp_path, *, data_chunks: list[bytes]) -> FTPSession:
+    """Build an FTPSession primed with a pre-fed StreamReader so cmd_STOR
+    can iterate through the chunks without a real TCP connection.
+    """
+    control_writer = MagicMock()
+    control_writer.write = MagicMock()
+    control_writer.drain = AsyncMock()
+    control_writer.get_extra_info = MagicMock(return_value=("192.168.1.99", 12345))
+
+    upload_dir = tmp_path / "uploads"
+    upload_dir.mkdir(parents=True, exist_ok=True)
+
+    session = FTPSession(
+        reader=asyncio.StreamReader(),
+        writer=control_writer,
+        upload_dir=upload_dir,
+        access_code="deadbeef",
+        ssl_context=ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER),
+        on_file_received=None,
+        bind_address="127.0.0.1",
+        vp_name="stor-test",
+    )
+    session.authenticated = True
+
+    data_reader = asyncio.StreamReader()
+    for chunk in data_chunks:
+        data_reader.feed_data(chunk)
+    data_reader.feed_eof()
+    session._data_reader = data_reader
+
+    data_writer = MagicMock()
+    data_writer.close = MagicMock()
+    data_writer.wait_closed = AsyncMock()
+    session._data_writer = data_writer
+
+    session._data_connected.set()
+    session.data_server = None
+
+    return session
+
+
+@pytest.mark.asyncio
+async def test_stor_writes_payload_to_disk(tmp_path):
+    """Happy path: chunks fed to the data reader land in the upload_dir
+    with the right content + the slicer gets 226."""
+    payload = b"X" * (3 * 64 * 1024 + 123)  # 3 chunks + a partial one
+    chunks = [payload[i : i + 65536] for i in range(0, len(payload), 65536)]
+    session = _make_session(tmp_path, data_chunks=chunks)
+    session.send = AsyncMock()
+
+    await session.cmd_STOR("Untitled.gcode.3mf")
+
+    saved = session.upload_dir / "Untitled.gcode.3mf"
+    assert saved.exists()
+    assert saved.stat().st_size == len(payload)
+    assert saved.read_bytes() == payload
+
+    sent_codes = [args[0][0] for args in session.send.call_args_list]
+    assert 150 in sent_codes  # "Opening data connection"
+    assert 226 in sent_codes  # "Transfer complete"
+
+
+@pytest.mark.asyncio
+async def test_stor_rejects_upload_over_max_upload_bytes(tmp_path, monkeypatch):
+    """A single chunk taking us over the cap must abort with 426 and
+    drop the partially-written file so it doesn't masquerade as a
+    successful upload."""
+    # Lower the cap to 100 KiB so the test doesn't need to allocate
+    # 4 GiB to trigger it. The same logic governs the production cap.
+    monkeypatch.setattr(
+        "backend.app.services.virtual_printer.ftp_server.MAX_UPLOAD_BYTES",
+        100 * 1024,
+    )
+
+    over_cap = b"X" * (200 * 1024)  # 200 KiB > 100 KiB cap
+    session = _make_session(tmp_path, data_chunks=[over_cap])
+    session.send = AsyncMock()
+
+    await session.cmd_STOR("toobig.gcode.3mf")
+
+    # Partial file must be unlinked.
+    assert not (session.upload_dir / "toobig.gcode.3mf").exists()
+    # 426 (transfer failed) sent — not 226.
+    sent_codes = [args[0][0] for args in session.send.call_args_list]
+    assert 426 in sent_codes
+    assert 226 not in sent_codes
+
+
+@pytest.mark.asyncio
+async def test_stor_cleans_up_partial_file_on_read_error(tmp_path):
+    """If the data channel raises mid-transfer (slicer RST, TLS error,
+    timeout, …), the partial file on disk must be removed so the next
+    upload of the same name starts clean and the user doesn't see a
+    truncated file in the upload_dir."""
+    payload = b"X" * 65536  # one full chunk
+    session = _make_session(tmp_path, data_chunks=[payload])
+    session.send = AsyncMock()
+
+    # Inject an OSError on the NEXT read after the first chunk.
+    orig_read = session._data_reader.read
+    state = {"calls": 0}
+
+    async def read_then_error(n):
+        state["calls"] += 1
+        if state["calls"] == 1:
+            return await orig_read(n)
+        raise OSError("simulated connection reset")
+
+    session._data_reader.read = read_then_error  # type: ignore[assignment]
+
+    await session.cmd_STOR("aborted.gcode.3mf")
+
+    # Partial file removed.
+    assert not (session.upload_dir / "aborted.gcode.3mf").exists()
+    sent_codes = [args[0][0] for args in session.send.call_args_list]
+    assert 426 in sent_codes
+
+
+def test_max_upload_bytes_is_at_least_4_gib():
+    """The cap exists to prevent OOM, but should be high enough that
+    legitimate multi-plate .gcode.3mf uploads (~hundreds of MB) succeed
+    without bumping up against it. 4 GiB is the documented floor."""
+    assert MAX_UPLOAD_BYTES >= 4 * 1024 * 1024 * 1024

+ 42 - 0
backend/tests/unit/test_vp_mqtt_bridge.py

@@ -774,6 +774,48 @@ class TestStatusReportCachedAsBase:
         assert payload["print"]["gcode_state"] == "PREPARE"
         assert payload["print"]["gcode_state"] == "PREPARE"
         assert payload["print"]["gcode_file"] == "foo.3mf"
         assert payload["print"]["gcode_file"] == "foo.3mf"
 
 
+    @pytest.mark.asyncio
+    async def test_live_progress_fields_zeroed_in_cached_branch(self):
+        """#1558: when the real target printer is mid-print, the cached
+        push_status carries live values for mc_percent / stg_cur / layer_num /
+        etc. BambuStudio's Send pre-flight reads any of these as "VP busy"
+        even when gcode_state above is forced to IDLE — blocking Send while
+        the target prints. The cached branch must override these to the same
+        idle values the synthetic stub uses.
+        """
+        server = _make_server()
+        bridge = MagicMock()
+        # Real printer mid-print state: gcode_state may be RUNNING upstream,
+        # but the VP's own _gcode_state is IDLE (Send is requesting a
+        # new upload, the VP isn't running anything).
+        bridge.get_latest_print_state.return_value = {
+            "command": "push_status",
+            "msg": 0,
+            "gcode_state": "RUNNING",
+            "mc_print_stage": "2",
+            "mc_percent": 47,
+            "mc_remaining_time": 3600,
+            "stg": [1, 2, 3],
+            "stg_cur": 14,
+            "layer_num": 120,
+            "total_layer_num": 250,
+            "print_error": 0,
+        }
+        server.set_bridge(bridge)
+        published = self._capture_published(server)
+
+        await server._send_status_report(MagicMock())
+        _serial, payload = published[0]
+        # Every live-progress field must reflect "idle / VP isn't busy".
+        assert payload["print"]["mc_print_stage"] == ""
+        assert payload["print"]["mc_percent"] == 0
+        assert payload["print"]["mc_remaining_time"] == 0
+        assert payload["print"]["stg"] == []
+        assert payload["print"]["stg_cur"] == 0
+        assert payload["print"]["layer_num"] == 0
+        assert payload["print"]["total_layer_num"] == 0
+        assert payload["print"]["print_error"] == 0
+
 
 
 # ---------------------------------------------------------------------------
 # ---------------------------------------------------------------------------
 # Wire format
 # Wire format

+ 159 - 0
backend/tests/unit/test_vp_mqtt_server.py

@@ -399,3 +399,162 @@ class TestHandleClientHonoursKeepalive:
         # Exit was via DISCONNECT at ~2.5s, NOT a 3s keepalive timeout.
         # Exit was via DISCONNECT at ~2.5s, NOT a 3s keepalive timeout.
         # Allow generous slop.
         # Allow generous slop.
         assert 2.0 < elapsed < 3.0, f"expected exit on DISCONNECT near 2.5s, got {elapsed:.2f}s"
         assert 2.0 < elapsed < 3.0, f"expected exit on DISCONNECT near 2.5s, got {elapsed:.2f}s"
+
+
+class TestAuthRateLimit:
+    """Per-IP rate-limiting of MQTT CONNECT auth attempts.
+
+    Bambuddy's VP exposes an 8-char access code via the slicer-facing MQTT
+    server. Without a rate-limit the code is brute-forceable by anyone who
+    can reach the VP's bind IP (LAN or VPN). The limiter records each
+    failed auth attempt per source IP and rejects further CONNECTs from
+    that IP once the per-window threshold is crossed, then auto-recovers
+    when the window expires. Verified here against the production
+    constants imported from the module.
+    """
+
+    @pytest.fixture
+    def server(self):
+        from backend.app.services.virtual_printer.mqtt_server import SimpleMQTTServer
+
+        return _make_server(serial="01P00A391800002")
+
+    def test_under_limit_attempts_are_allowed(self, server):
+        from backend.app.services.virtual_printer.mqtt_server import _AUTH_RATE_LIMIT_MAX_ATTEMPTS
+
+        ip = "192.168.1.50"
+        # Record (max-1) failures and verify the next attempt is still allowed.
+        for _ in range(_AUTH_RATE_LIMIT_MAX_ATTEMPTS - 1):
+            server._record_auth_failure(ip)
+        assert server._is_auth_rate_limited(ip) is False
+
+    def test_exactly_max_attempts_triggers_rate_limit(self, server):
+        from backend.app.services.virtual_printer.mqtt_server import _AUTH_RATE_LIMIT_MAX_ATTEMPTS
+
+        ip = "192.168.1.50"
+        for _ in range(_AUTH_RATE_LIMIT_MAX_ATTEMPTS):
+            server._record_auth_failure(ip)
+        # At exactly the cap, further attempts must be rejected.
+        assert server._is_auth_rate_limited(ip) is True
+
+    def test_window_recovery_clears_old_failures(self, server):
+        """A burst of failures older than the window must NOT count
+        against the IP — the limiter is sliding, not cumulative."""
+        import time as _time
+
+        from backend.app.services.virtual_printer.mqtt_server import (
+            _AUTH_RATE_LIMIT_MAX_ATTEMPTS,
+            _AUTH_RATE_LIMIT_WINDOW_SECONDS,
+        )
+
+        ip = "192.168.1.50"
+        # Inject stale timestamps directly — older than the window means the
+        # limiter should drop them on the next probe.
+        stale = _time.monotonic() - _AUTH_RATE_LIMIT_WINDOW_SECONDS - 1.0
+        server._auth_failures[ip] = [stale] * _AUTH_RATE_LIMIT_MAX_ATTEMPTS
+        # All recorded failures are outside the window — IP is no longer rate-limited.
+        assert server._is_auth_rate_limited(ip) is False
+        # And the dict entry was pruned (empty) instead of leaking forever.
+        assert ip not in server._auth_failures
+
+    def test_multiple_ips_tracked_independently(self, server):
+        from backend.app.services.virtual_printer.mqtt_server import _AUTH_RATE_LIMIT_MAX_ATTEMPTS
+
+        # One IP exhausts the budget; another IP must still be allowed.
+        for _ in range(_AUTH_RATE_LIMIT_MAX_ATTEMPTS):
+            server._record_auth_failure("10.0.0.1")
+        assert server._is_auth_rate_limited("10.0.0.1") is True
+        assert server._is_auth_rate_limited("10.0.0.2") is False
+
+    def test_successful_auth_clears_failure_history(self, server):
+        """A successful auth must wipe the IP's prior-failures stash so the
+        user isn't penalised for typos that they ultimately corrected."""
+        from backend.app.services.virtual_printer.mqtt_server import _AUTH_RATE_LIMIT_MAX_ATTEMPTS
+
+        ip = "192.168.1.50"
+        # Build up failures one short of the cap.
+        for _ in range(_AUTH_RATE_LIMIT_MAX_ATTEMPTS - 1):
+            server._record_auth_failure(ip)
+        # Successful auth must clear them.
+        server._clear_auth_failures(ip)
+        # Now a subsequent failure starts the count over at 1 (well under cap).
+        server._record_auth_failure(ip)
+        assert server._is_auth_rate_limited(ip) is False
+
+
+class TestPendingRequestRouting:
+    """`push_raw_to_clients` routes the printer's response back only to the
+    slicer that originated the request, not to every connected slicer.
+
+    The bridge calls `push_raw_to_clients(topic, payload)` for every
+    response it sees from the real printer. Before the fix, this fanned
+    out to every connected slicer — leaking slicer A's
+    `extrusion_cali_get` response into slicer B's command stream. The
+    fix records `sequence_id → client_id` on the way out and looks it
+    back up on the way in.
+    """
+
+    @pytest.fixture
+    def server(self):
+        return _make_server(serial="01P00A391800003")
+
+    def test_single_slicer_routes_to_that_slicer(self, server):
+        """Sanity check: when one slicer is connected, the response goes
+        to it regardless of whether the seq_id was recorded."""
+        # No recorded request, no slicer seen → returns None (broadcast).
+        assert server._lookup_pending_request_client(b'{"print": {"sequence_id": "999"}}') is None
+
+    def test_record_pending_request_walks_nested_blocks(self, server):
+        """The slicer wraps its sequence_id under whichever subsystem the
+        command targets (`print`, `info`, `system`, …). The helper must
+        find it regardless of which key it's nested under."""
+        server._record_pending_request(
+            {"print": {"command": "extrusion_cali_get", "sequence_id": "42"}},
+            "clientA",
+        )
+        assert server._pending_requests.get("42") == "clientA"
+
+        server._record_pending_request(
+            {"info": {"command": "get_version", "sequence_id": "43"}},
+            "clientB",
+        )
+        assert server._pending_requests.get("43") == "clientB"
+
+    def test_lookup_pops_entry_so_each_response_routes_once(self, server):
+        """Once a response is matched, the pending entry is consumed so
+        a later coincidental sequence_id from a printer-initiated push
+        doesn't mis-route to the original client."""
+        server._record_pending_request({"print": {"sequence_id": "100"}}, "clientA")
+        # First lookup finds it…
+        assert server._lookup_pending_request_client(b'{"print": {"sequence_id": "100"}}') == "clientA"
+        # …and removes it. Second lookup with the same seq returns None
+        # (treated as printer-initiated → broadcast fallback).
+        assert server._lookup_pending_request_client(b'{"print": {"sequence_id": "100"}}') is None
+
+    def test_fifo_eviction_when_cache_fills(self, server):
+        """If a slicer sends many commands without responses (or the
+        responses never arrive), the oldest entries age out so the dict
+        can't grow unbounded."""
+        from backend.app.services.virtual_printer.mqtt_server import _PENDING_REQUEST_MAX_ENTRIES
+
+        # Fill the dict to one over the cap.
+        for i in range(_PENDING_REQUEST_MAX_ENTRIES + 1):
+            server._record_pending_request({"print": {"sequence_id": str(i)}}, "clientA")
+        # The dict is capped — the oldest entry ("0") is gone, the newest is in.
+        assert len(server._pending_requests) <= _PENDING_REQUEST_MAX_ENTRIES
+        assert "0" not in server._pending_requests
+        assert str(_PENDING_REQUEST_MAX_ENTRIES) in server._pending_requests
+
+    def test_response_without_recorded_seq_returns_none_for_broadcast(self, server):
+        """Printer-initiated pushes (push_status etc.) have a sequence_id
+        the bridge never saw recorded. ``_lookup_pending_request_client``
+        must return None so ``push_raw_to_clients`` falls back to fan-out
+        — every slicer expects to receive these unsolicited messages."""
+        # No record for this seq id.
+        assert server._lookup_pending_request_client(b'{"print": {"sequence_id": "777"}}') is None
+
+    def test_malformed_payload_falls_through_to_broadcast(self, server):
+        """A non-JSON / non-dict payload must NOT crash the routing path —
+        return None so the response broadcasts."""
+        assert server._lookup_pending_request_client(b"not valid json") is None
+        assert server._lookup_pending_request_client(b'"a string, not a dict"') is None

+ 1 - 1
docker-compose.yml

@@ -34,7 +34,7 @@ services:
     #  - "6000:6000"                  # Virtual printer file transfer tunnel
     #  - "6000:6000"                  # Virtual printer file transfer tunnel
     #  - "322:322"                    # Virtual printer RTSP camera (X1/H2/P2; proxy mode + non-proxy modes with a target printer)
     #  - "322:322"                    # Virtual printer RTSP camera (X1/H2/P2; proxy mode + non-proxy modes with a target printer)
     #  - "2024-2026:2024-2026"        # Virtual printer proprietary ports (A1/P1S)
     #  - "2024-2026:2024-2026"        # Virtual printer proprietary ports (A1/P1S)
-    #  - "50000-50100:50000-50100"    # Virtual printer FTP passive data
+    #  - "50000-51000:50000-51000"    # Virtual printer FTP passive data (widened from 50000-50100 for multi-VP headroom)
     volumes:
     volumes:
       - bambuddy_data:/app/data
       - bambuddy_data:/app/data
       - bambuddy_logs:/app/logs
       - bambuddy_logs:/app/logs

+ 1 - 1
install/docker-install.ps1

@@ -240,7 +240,7 @@ function Update-ComposeForDockerDesktop {
 
 
     [System.IO.File]::WriteAllText($path, $content)
     [System.IO.File]::WriteAllText($path, $content)
     Warn 'Printer auto-discovery (SSDP) does NOT work on Docker Desktop. Add printers manually by IP.'
     Warn 'Printer auto-discovery (SSDP) does NOT work on Docker Desktop. Add printers manually by IP.'
-    Warn 'Virtual Printer ports (322, 990, 2024-2026, 3000/3002, 6000, 8883, 50000-50100) stay commented out.'
+    Warn 'Virtual Printer ports (322, 990, 2024-2026, 3000/3002, 6000, 8883, 50000-51000) stay commented out.'
     Warn 'If you plan to use a Virtual Printer, edit docker-compose.yml and uncomment the relevant `- "PORT:PORT"` lines under `ports:`.'
     Warn 'If you plan to use a Virtual Printer, edit docker-compose.yml and uncomment the relevant `- "PORT:PORT"` lines under `ports:`.'
 }
 }
 
 

Niektóre pliki nie zostały wyświetlone z powodu dużej ilości zmienionych plików