Browse Source

fix(#1134): propagate background-dispatch watchdog timeout as job failure

  Follow-up to #1042. The post-dispatch watchdog _verify_print_response was
  fire-and-forget — it correctly detected when the printer never transitioned
  (HMS error pending, half-broken MQTT session, plate-clear gate, SD card
  fault) and force-reconnected the MQTT session, but the dispatch job had
  already been marked successful on the optimistic MQTT-publish-acknowledged
  path. The UI carried on showing "Print started successfully" while the
  printer sat idle.

  The watchdog now returns bool and is awaited inline by both call sites in
  _run_reprint_archive and _run_print_library_file. On False the call sites
  raise a RuntimeError carrying a user-actionable message ("Printer did not
  acknowledge print command — state still {pre_state}. Check the printer for
  a pending error...") which routes through the existing _run_active_job →
  _mark_job_finished(failed=True) → background_dispatch WS broadcast path.
  Library-file flow rolls back the freshly-created archive on timeout so no
  phantom row is left behind for a print that never started.

  The watchdog now also accepts subtask_id advancing past pre_subtask_id as a
  definitive "command landed" signal — same as the queue-side watchdog at
  print_scheduler.py:1992 — so slow H2D FINISH→PREPARE transitions (~50 s
  observed) don't false-fail when the printer has clearly accepted the
  project_file but is still in FINISH. Default timeout raised from 15 s to
  90 s to match the queue-side watchdog and give the same headroom on both
  dispatch paths. Brief mid-window MQTT disconnects keep polling instead of
  immediately failing — matches what the queue watchdog already does and
  avoids false-failing on transient telemetry gaps.

  11 new tests in test_background_dispatch_watchdog.py: state-change pickup,
  subtask_id-change pickup with state still FINISH, neither-changed timeout
  plus force_reconnect_stale_session call, pre_subtask_id=None backwards-
  compat, post-dispatch subtask_id=None not counting as a change, brief
  disconnect not short-circuiting the window, persistent disconnect for the
  full window returning False, default-timeout=90s contract, _run_reprint_archive
  raises RuntimeError with the captured pre-state args on watchdog False,
  _run_reprint_archive happy path doesn't rollback, _run_active_job marks
  the job failed with the message when _process_job raises RuntimeError.
maziggy 1 month ago
parent
commit
9d0418688c

File diff suppressed because it is too large
+ 0 - 0
CHANGELOG.md


+ 78 - 15
backend/app/services/background_dispatch.py

@@ -684,9 +684,34 @@ class BackgroundDispatchService:
                     )
                     raise RuntimeError("Failed to start print")
 
-                pre_state = getattr(printer_manager.get_status(job.printer_id), "state", None)
+                # Wait for the printer to actually pick up the command before
+                # marking the dispatch job complete (#1042). MQTT-publish success
+                # only proves the command queued locally; the printer can still
+                # reject it (HMS error pending, half-broken session, SD card
+                # missing) and never transition. Until #1042 this watchdog was
+                # fire-and-forget — the job was reported successful and the
+                # user had no signal that the print never started. The uploaded
+                # file is intentionally left on the printer's SD card on
+                # timeout: the next dispatch will overwrite it via the existing
+                # delete-then-upload step, and the printer may still be in the
+                # middle of reading it if it picked up just past the timeout.
+                pre_status = printer_manager.get_status(job.printer_id)
+                pre_state = getattr(pre_status, "state", None) if pre_status else None
+                pre_subtask_id = getattr(pre_status, "subtask_id", None) if pre_status else None
                 if pre_state:
-                    asyncio.create_task(self._verify_print_response(job.printer_id, printer_name, pre_state))
+                    await self._set_active_message(job, f"Waiting for {printer_name} to acknowledge print...")
+                    transitioned = await self._verify_print_response(
+                        job.printer_id,
+                        printer_name,
+                        pre_state,
+                        pre_subtask_id=pre_subtask_id,
+                    )
+                    if not transitioned:
+                        raise RuntimeError(
+                            f"Printer did not acknowledge print command — state still {pre_state}. "
+                            f"Check the printer for a pending error (HMS code, plate-clear prompt, "
+                            f"SD card) and try again."
+                        )
 
                 if job.requested_by_user_id and job.requested_by_username:
                     printer_manager.set_current_print_user(
@@ -857,9 +882,28 @@ class BackgroundDispatchService:
                     await db.rollback()
                     raise RuntimeError("Failed to start print")
 
-                pre_state = getattr(printer_manager.get_status(job.printer_id), "state", None)
+                # See _run_reprint_archive for rationale (#1042). On timeout
+                # also rolls back the freshly-created archive so the library
+                # flow doesn't leave behind a phantom row for a print that
+                # never started.
+                pre_status = printer_manager.get_status(job.printer_id)
+                pre_state = getattr(pre_status, "state", None) if pre_status else None
+                pre_subtask_id = getattr(pre_status, "subtask_id", None) if pre_status else None
                 if pre_state:
-                    asyncio.create_task(self._verify_print_response(job.printer_id, printer_name, pre_state))
+                    await self._set_active_message(job, f"Waiting for {printer_name} to acknowledge print...")
+                    transitioned = await self._verify_print_response(
+                        job.printer_id,
+                        printer_name,
+                        pre_state,
+                        pre_subtask_id=pre_subtask_id,
+                    )
+                    if not transitioned:
+                        await db.rollback()
+                        raise RuntimeError(
+                            f"Printer did not acknowledge print command — state still {pre_state}. "
+                            f"Check the printer for a pending error (HMS code, plate-clear prompt, "
+                            f"SD card) and try again."
+                        )
 
                 if job.requested_by_user_id and job.requested_by_username:
                     printer_manager.set_current_print_user(
@@ -899,38 +943,57 @@ class BackgroundDispatchService:
         printer_id: int,
         printer_name: str,
         pre_state: str,
-        timeout: float = 15.0,
+        pre_subtask_id: str | None = None,
+        timeout: float = 90.0,
         poll_interval: float = 3.0,
-    ):
-        """Check if the printer responded to a print command.
-
-        Runs as a fire-and-forget background task after start_print() succeeds.
-        If the printer's gcode_state hasn't changed within the timeout, logs a
-        warning for diagnostics (visible in support packages).
+    ) -> bool:
+        """Wait for the printer to acknowledge a print command.
+
+        Returns True if the printer transitioned (state advanced past pre_state
+        or subtask_id advanced past pre_subtask_id). Returns False on timeout —
+        in that case logs a warning and forces an MQTT reconnect, mirroring the
+        queue-side watchdog (`_watchdog_print_start`). Caller is responsible
+        for surfacing the False result to the user (typically by raising so the
+        dispatch job is marked failed).
+
+        Both transition signals are checked because H2D can sit at FINISH for
+        ~50 s after accepting `project_file` before flipping to PREPARE; the
+        printer echoes our per-dispatch identity back as `subtask_id` on
+        `push_status` first, so a subtask_id change is a definitive "command
+        landed" signal even while state is still FINISH (#1078).
         """
         deadline = time.monotonic() + timeout
         while time.monotonic() < deadline:
             await asyncio.sleep(poll_interval)
             state = printer_manager.get_status(printer_id)
             if not state:
-                return  # Printer disconnected
+                # Printer momentarily not reporting — could be a brief MQTT
+                # disconnect mid-window. Keep polling rather than declaring
+                # failure on the first missed tick; the printer may reconnect
+                # within the remaining timeout and still surface a transition.
+                continue
             if state.state != pre_state:
-                return  # Printer responded
+                return True
+            if pre_subtask_id is not None and state.subtask_id is not None and state.subtask_id != pre_subtask_id:
+                return True
         logger.warning(
-            "Printer %s (%d) did not respond to print command within %.0fs (state still %s) — printer may need restart",
+            "Printer %s (%d) did not respond to print command within %.0fs "
+            "(state still %s, subtask_id still %s) — printer may need restart",
             printer_name,
             printer_id,
             timeout,
             pre_state,
+            pre_subtask_id,
         )
         # Strong signal the MQTT session is half-broken (#887, #936): telemetry
         # still arrives but our publishes don't reach the printer. Force a fresh
         # session so the next dispatch can land without a power cycle.
         client = printer_manager.get_client(printer_id)
-        if client:
+        if client and hasattr(client, "force_reconnect_stale_session"):
             client.force_reconnect_stale_session(
                 f"print command unacknowledged after {timeout:.0f}s (state still {pre_state})"
             )
+        return False
 
     @staticmethod
     async def _cleanup_sd_card_file(

+ 521 - 0
backend/tests/unit/services/test_background_dispatch_watchdog.py

@@ -0,0 +1,521 @@
+"""Regression tests for ``BackgroundDispatchService._verify_print_response``.
+
+The background-dispatch watchdog used to be fire-and-forget — it logged a
+warning and force-reconnected MQTT, but the dispatch job had already been
+marked successful. The user therefore saw "Print started successfully" while
+the printer never actually transitioned (#1042 follow-up). The watchdog now
+returns a bool so the caller can fail the dispatch job when the printer
+doesn't acknowledge the command, mirroring what `_watchdog_print_start` does
+on the queue side.
+
+Both transition signals are accepted: ``state`` advancing past ``pre_state``
+*or* ``subtask_id`` advancing past ``pre_subtask_id`` — H2D firmware can sit
+at FINISH for ~50 s after accepting ``project_file`` while echoing the new
+subtask_id back almost immediately (#1078).
+"""
+
+from types import SimpleNamespace
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from backend.app.services.background_dispatch import BackgroundDispatchService
+
+
+def _status(state: str, subtask_id: str | None = None):
+    """Minimal stand-in for PrinterState — only the two fields the watchdog reads."""
+    return SimpleNamespace(state=state, subtask_id=subtask_id)
+
+
+class TestReturnsTrueOnPickup:
+    @pytest.mark.asyncio
+    async def test_returns_true_on_state_change(self):
+        get_status = MagicMock(return_value=_status("RUNNING", "OLD_SUBTASK"))
+        with patch(
+            "backend.app.services.background_dispatch.printer_manager.get_status",
+            get_status,
+        ):
+            result = await BackgroundDispatchService._verify_print_response(
+                printer_id=42,
+                printer_name="P1S",
+                pre_state="FINISH",
+                pre_subtask_id="OLD_SUBTASK",
+                timeout=0.3,
+                poll_interval=0.05,
+            )
+
+        assert result is True
+
+    @pytest.mark.asyncio
+    async def test_returns_true_on_subtask_id_change_even_if_state_still_finish(self):
+        """#1078: H2D keeps state=FINISH for ~50 s after accepting project_file
+        but flips subtask_id immediately. Must be accepted as a pickup signal."""
+        get_status = MagicMock(return_value=_status("FINISH", "NEW_SUBTASK_12345"))
+        with patch(
+            "backend.app.services.background_dispatch.printer_manager.get_status",
+            get_status,
+        ):
+            result = await BackgroundDispatchService._verify_print_response(
+                printer_id=42,
+                printer_name="H2D",
+                pre_state="FINISH",
+                pre_subtask_id="OLD_SUBTASK_99999",
+                timeout=0.3,
+                poll_interval=0.05,
+            )
+
+        assert result is True
+
+
+class TestReturnsFalseOnTimeout:
+    @pytest.mark.asyncio
+    async def test_returns_false_when_neither_state_nor_subtask_id_changes(self):
+        """The exact #1042 scenario: P1S sits in FAILED with HMS pending,
+        accepts the MQTT publish, never transitions. Watchdog must report
+        failure so the caller fails the dispatch job."""
+        get_status = MagicMock(return_value=_status("FINISH", "OLD_SUBTASK"))
+        client = MagicMock()
+        get_client = MagicMock(return_value=client)
+
+        with (
+            patch(
+                "backend.app.services.background_dispatch.printer_manager.get_status",
+                get_status,
+            ),
+            patch(
+                "backend.app.services.background_dispatch.printer_manager.get_client",
+                get_client,
+            ),
+        ):
+            result = await BackgroundDispatchService._verify_print_response(
+                printer_id=42,
+                printer_name="P1S",
+                pre_state="FINISH",
+                pre_subtask_id="OLD_SUBTASK",
+                timeout=0.2,
+                poll_interval=0.05,
+            )
+
+        assert result is False
+        client.force_reconnect_stale_session.assert_called_once()
+
+    @pytest.mark.asyncio
+    async def test_returns_false_when_pre_subtask_id_none_and_state_unchanged(self):
+        """Backward-compat: callers without a captured pre_subtask_id (e.g. the
+        printer never reported one) must still get the timeout failure path
+        based on state alone."""
+        get_status = MagicMock(return_value=_status("FINISH", "ANYTHING"))
+        get_client = MagicMock(return_value=None)
+
+        with (
+            patch(
+                "backend.app.services.background_dispatch.printer_manager.get_status",
+                get_status,
+            ),
+            patch(
+                "backend.app.services.background_dispatch.printer_manager.get_client",
+                get_client,
+            ),
+        ):
+            result = await BackgroundDispatchService._verify_print_response(
+                printer_id=42,
+                printer_name="P1S",
+                pre_state="FINISH",
+                pre_subtask_id=None,
+                timeout=0.2,
+                poll_interval=0.05,
+            )
+
+        assert result is False
+
+    @pytest.mark.asyncio
+    async def test_subtask_id_none_post_dispatch_does_not_count_as_change(self):
+        """If the printer transiently reports subtask_id=None during the
+        watchdog window (e.g. mid-reconnect), that must not be treated as
+        "advanced past pre_subtask_id" — otherwise we'd false-pass and mark
+        a never-started print as successful."""
+        get_status = MagicMock(return_value=_status("FINISH", None))
+        get_client = MagicMock(return_value=None)
+
+        with (
+            patch(
+                "backend.app.services.background_dispatch.printer_manager.get_status",
+                get_status,
+            ),
+            patch(
+                "backend.app.services.background_dispatch.printer_manager.get_client",
+                get_client,
+            ),
+        ):
+            result = await BackgroundDispatchService._verify_print_response(
+                printer_id=42,
+                printer_name="P1S",
+                pre_state="FINISH",
+                pre_subtask_id="OLD_SUBTASK",
+                timeout=0.2,
+                poll_interval=0.05,
+            )
+
+        assert result is False
+
+
+class TestDisconnectHandling:
+    @pytest.mark.asyncio
+    async def test_disconnect_does_not_short_circuit_window(self):
+        """A momentary ``get_status() is None`` (brief MQTT disconnect mid-window)
+        must not immediately fail the dispatch — the printer may reconnect and
+        still produce a valid transition before timeout. Falsely failing on the
+        first missed tick is the previous bug class we're moving away from."""
+        # First call: disconnected. Second call onward: reconnected and transitioned.
+        get_status = MagicMock(side_effect=[None, _status("RUNNING")])
+        with patch(
+            "backend.app.services.background_dispatch.printer_manager.get_status",
+            get_status,
+        ):
+            result = await BackgroundDispatchService._verify_print_response(
+                printer_id=42,
+                printer_name="P1S",
+                pre_state="FINISH",
+                pre_subtask_id="OLD_SUBTASK",
+                timeout=0.3,
+                poll_interval=0.05,
+            )
+
+        assert result is True
+        assert get_status.call_count >= 2
+
+    @pytest.mark.asyncio
+    async def test_disconnect_for_full_window_returns_false(self):
+        """Persistent disconnect for the full window is treated as failure.
+        Better to false-fail and let the user retry than to false-succeed and
+        leave them watching an idle printer (#1042)."""
+        get_status = MagicMock(return_value=None)
+        get_client = MagicMock(return_value=None)
+
+        with (
+            patch(
+                "backend.app.services.background_dispatch.printer_manager.get_status",
+                get_status,
+            ),
+            patch(
+                "backend.app.services.background_dispatch.printer_manager.get_client",
+                get_client,
+            ),
+        ):
+            result = await BackgroundDispatchService._verify_print_response(
+                printer_id=42,
+                printer_name="P1S",
+                pre_state="FINISH",
+                pre_subtask_id="OLD_SUBTASK",
+                timeout=0.2,
+                poll_interval=0.05,
+            )
+
+        assert result is False
+
+
+class TestDefaults:
+    def test_default_timeout_matches_queue_watchdog(self):
+        """Queue and background watchdogs need the same 90 s default to give
+        slow H2D FINISH→PREPARE transitions the same headroom on both paths."""
+        import inspect
+
+        sig = inspect.signature(BackgroundDispatchService._verify_print_response)
+        assert sig.parameters["timeout"].default == 90.0
+
+
+# ---------------------------------------------------------------------------
+# Integration tests: the call sites in _run_reprint_archive and
+# _run_print_library_file must (a) await the watchdog instead of fire-and-
+# forget, (b) raise RuntimeError on watchdog False so _run_active_job marks
+# the job failed, (c) rollback the library-file flow's freshly-created
+# archive on timeout. Heavy mocking — the goal is to verify the new wiring,
+# not to re-test the dependencies.
+# ---------------------------------------------------------------------------
+
+from contextlib import asynccontextmanager  # noqa: E402
+from unittest.mock import AsyncMock  # noqa: E402
+
+from backend.app.services.background_dispatch import (  # noqa: E402
+    ActiveDispatchState,
+    PrintDispatchJob,
+)
+
+
+def _make_session_factory(db_mock):
+    """Build an async-session factory whose context manager yields ``db_mock``.
+
+    Mirrors the ``async with async_session() as db`` shape used by both
+    ``_run_*`` methods so the test can intercept ``db.rollback`` / ``db.scalar``.
+    """
+
+    @asynccontextmanager
+    async def _factory():
+        yield db_mock
+
+    return _factory
+
+
+def _printer_namespace():
+    return SimpleNamespace(
+        id=10,
+        name="P1S",
+        ip_address="1.2.3.4",
+        access_code="abc",
+        model="P1S",
+    )
+
+
+def _make_dispatch_job(kind: str = "reprint_archive") -> PrintDispatchJob:
+    return PrintDispatchJob(
+        id=1,
+        kind=kind,
+        source_id=99,
+        source_name="Test.gcode.3mf",
+        printer_id=10,
+        printer_name="P1S",
+        options={},
+        requested_by_user_id=None,
+        requested_by_username=None,
+    )
+
+
+@pytest.fixture
+def reprint_archive_mocks(tmp_path):
+    """Mock harness for ``_run_reprint_archive`` covering every external
+    dependency up to (and including) ``start_print``. The watchdog is left
+    real so the caller can patch ``_verify_print_response`` per-test."""
+    archive_file = tmp_path / "test.3mf"
+    archive_file.write_bytes(b"fake-3mf-content")
+
+    archive = SimpleNamespace(
+        id=99,
+        filename="Test.gcode.3mf",
+        file_path=str(archive_file),
+    )
+
+    db = MagicMock()
+    db.scalar = AsyncMock(return_value=_printer_namespace())
+    db.rollback = AsyncMock()
+
+    archive_service = MagicMock()
+    archive_service.get_archive = AsyncMock(return_value=archive)
+
+    return {
+        "archive": archive,
+        "archive_file": archive_file,
+        "db": db,
+        "archive_service": archive_service,
+        "session_factory": _make_session_factory(db),
+    }
+
+
+@pytest.fixture
+def library_file_mocks(tmp_path):
+    """Mock harness for ``_run_print_library_file`` — separate from the
+    reprint fixture because the library flow creates its archive via
+    ``archive_service.archive_print(...)`` rather than fetching one."""
+    src_file = tmp_path / "lib_src.3mf"
+    src_file.write_bytes(b"fake-3mf-content")
+
+    lib_file = SimpleNamespace(
+        id=22,
+        filename="cube.gcode.3mf",
+        file_path=str(src_file.relative_to(tmp_path)),
+    )
+    lib_file.active = staticmethod(lambda: lib_file)  # mimic LibraryFile.active() chainable
+
+    new_archive = SimpleNamespace(id=500, filename="cube.gcode.3mf", file_path=str(src_file))
+
+    db = MagicMock()
+    db.scalar = AsyncMock()  # configured per-test
+    db.flush = AsyncMock()
+    db.commit = AsyncMock()
+    db.rollback = AsyncMock()
+
+    archive_service = MagicMock()
+    archive_service.archive_print = AsyncMock(return_value=new_archive)
+
+    return {
+        "lib_file": lib_file,
+        "src_file": src_file,
+        "new_archive": new_archive,
+        "db": db,
+        "archive_service": archive_service,
+        "session_factory": _make_session_factory(db),
+    }
+
+
+class TestReprintArchiveDispatchWiring:
+    """Verify ``_run_reprint_archive`` (a) awaits the watchdog inline and
+    (b) raises RuntimeError on False so the dispatch job is marked failed."""
+
+    @pytest.mark.asyncio
+    async def test_raises_runtime_error_when_watchdog_returns_false(self, reprint_archive_mocks):
+        """The exact #1042 propagation gap: watchdog detects non-transition,
+        _run_reprint_archive must surface it as a RuntimeError so the surrounding
+        _run_active_job marks the job failed (instead of silently completing)."""
+        from backend.app.services.background_dispatch import BackgroundDispatchService
+
+        m = reprint_archive_mocks
+        service = BackgroundDispatchService()
+        job = _make_dispatch_job(kind="reprint_archive")
+
+        watchdog = AsyncMock(return_value=False)
+
+        with (
+            patch("backend.app.services.background_dispatch.async_session", m["session_factory"]),
+            patch(
+                "backend.app.services.background_dispatch.ArchiveService",
+                return_value=m["archive_service"],
+            ),
+            patch.object(BackgroundDispatchService, "_verify_print_response", watchdog),
+            patch(
+                "backend.app.services.background_dispatch.printer_manager.is_connected",
+                return_value=True,
+            ),
+            patch(
+                "backend.app.services.background_dispatch.printer_manager.get_status",
+                return_value=SimpleNamespace(state="FINISH", subtask_id="OLD_SUBTASK"),
+            ),
+            patch(
+                "backend.app.services.background_dispatch.printer_manager.start_print",
+                return_value=True,
+            ),
+            patch(
+                "backend.app.services.background_dispatch.delete_file_async",
+                new_callable=AsyncMock,
+            ),
+            patch(
+                "backend.app.services.background_dispatch.with_ftp_retry",
+                new_callable=AsyncMock,
+                return_value=True,
+            ),
+            patch(
+                "backend.app.services.background_dispatch.get_ftp_retry_settings",
+                new_callable=AsyncMock,
+                return_value=(False, 0, 0, 30.0),
+            ),
+            patch(
+                "backend.app.services.background_dispatch.upload_file_async",
+                new_callable=AsyncMock,
+                return_value=True,
+            ),
+            patch(
+                "backend.app.services.background_dispatch.ws_manager.broadcast",
+                new_callable=AsyncMock,
+            ),
+            patch("backend.app.main.register_expected_print"),
+            pytest.raises(RuntimeError, match="did not acknowledge print command"),
+        ):
+            await service._run_reprint_archive(job)
+
+        # Watchdog received the captured pre-state and pre_subtask_id.
+        watchdog.assert_awaited_once()
+        kwargs = watchdog.await_args.kwargs
+        args = watchdog.await_args.args
+        assert "FINISH" in args  # pre_state
+        assert kwargs["pre_subtask_id"] == "OLD_SUBTASK"
+
+    @pytest.mark.asyncio
+    async def test_succeeds_when_watchdog_returns_true(self, reprint_archive_mocks):
+        """Happy path: watchdog confirms pickup; _run_reprint_archive returns
+        without raising. Guards against the wiring accidentally raising on True."""
+        from backend.app.services.background_dispatch import BackgroundDispatchService
+
+        m = reprint_archive_mocks
+        service = BackgroundDispatchService()
+        job = _make_dispatch_job(kind="reprint_archive")
+
+        with (
+            patch("backend.app.services.background_dispatch.async_session", m["session_factory"]),
+            patch(
+                "backend.app.services.background_dispatch.ArchiveService",
+                return_value=m["archive_service"],
+            ),
+            patch.object(
+                BackgroundDispatchService,
+                "_verify_print_response",
+                AsyncMock(return_value=True),
+            ),
+            patch(
+                "backend.app.services.background_dispatch.printer_manager.is_connected",
+                return_value=True,
+            ),
+            patch(
+                "backend.app.services.background_dispatch.printer_manager.get_status",
+                return_value=SimpleNamespace(state="FINISH", subtask_id="OLD_SUBTASK"),
+            ),
+            patch(
+                "backend.app.services.background_dispatch.printer_manager.start_print",
+                return_value=True,
+            ),
+            patch(
+                "backend.app.services.background_dispatch.delete_file_async",
+                new_callable=AsyncMock,
+            ),
+            patch(
+                "backend.app.services.background_dispatch.with_ftp_retry",
+                new_callable=AsyncMock,
+                return_value=True,
+            ),
+            patch(
+                "backend.app.services.background_dispatch.get_ftp_retry_settings",
+                new_callable=AsyncMock,
+                return_value=(False, 0, 0, 30.0),
+            ),
+            patch(
+                "backend.app.services.background_dispatch.upload_file_async",
+                new_callable=AsyncMock,
+                return_value=True,
+            ),
+            patch(
+                "backend.app.services.background_dispatch.ws_manager.broadcast",
+                new_callable=AsyncMock,
+            ),
+            patch("backend.app.main.register_expected_print"),
+        ):
+            await service._run_reprint_archive(job)  # must not raise
+
+        # Reprint flow does not touch the existing archive — no rollback expected.
+        m["db"].rollback.assert_not_called()
+
+
+class TestRunActiveJobMarksFailedOnRuntimeError:
+    """End-to-end: a watchdog-driven RuntimeError must reach
+    `_mark_job_finished(failed=True)` via the existing ``_run_active_job``
+    catch-all, so the dispatch UI shows a real failure (not "Done")."""
+
+    @pytest.mark.asyncio
+    async def test_runtime_error_from_process_job_marks_failed_with_message(self):
+        from backend.app.services.background_dispatch import BackgroundDispatchService
+
+        service = BackgroundDispatchService()
+        job = _make_dispatch_job()
+        # Place the job into _active_jobs so _set_active_message has a target.
+        service._active_jobs[job.id] = ActiveDispatchState(job=job, message="")
+
+        failure_message = (
+            "Printer did not acknowledge print command — state still FINISH. "
+            "Check the printer for a pending error (HMS code, plate-clear prompt, "
+            "SD card) and try again."
+        )
+
+        with (
+            patch.object(
+                BackgroundDispatchService,
+                "_process_job",
+                AsyncMock(side_effect=RuntimeError(failure_message)),
+            ),
+            patch.object(
+                BackgroundDispatchService,
+                "_mark_job_finished",
+                new_callable=AsyncMock,
+            ) as mark_finished,
+        ):
+            await service._run_active_job(job)
+
+        mark_finished.assert_awaited_once()
+        kwargs = mark_finished.await_args.kwargs
+        assert kwargs["failed"] is True
+        assert "did not acknowledge print command" in kwargs["message"]

Some files were not shown because too many files changed in this diff