"""Regression tests for ``BackgroundDispatchService._verify_print_response``. The background-dispatch watchdog used to be fire-and-forget — it logged a warning and force-reconnected MQTT, but the dispatch job had already been marked successful. The user therefore saw "Print started successfully" while the printer never actually transitioned (#1042 follow-up). The watchdog now returns a bool so the caller can fail the dispatch job when the printer doesn't acknowledge the command, mirroring what `_watchdog_print_start` does on the queue side. Both transition signals are accepted: ``state`` advancing past ``pre_state`` *or* ``subtask_id`` advancing past ``pre_subtask_id`` — H2D firmware can sit at FINISH for ~50 s after accepting ``project_file`` while echoing the new subtask_id back almost immediately (#1078). """ from types import SimpleNamespace from unittest.mock import MagicMock, patch import pytest from backend.app.services.background_dispatch import BackgroundDispatchService def _status(state: str, subtask_id: str | None = None, gcode_file: str | None = None): """Minimal stand-in for PrinterState — only the fields the watchdog reads.""" return SimpleNamespace(state=state, subtask_id=subtask_id, gcode_file=gcode_file) class TestReturnsTrueOnPickup: @pytest.mark.asyncio async def test_returns_true_on_state_change(self): get_status = MagicMock(return_value=_status("RUNNING", "OLD_SUBTASK")) with patch( "backend.app.services.background_dispatch.printer_manager.get_status", get_status, ): result = await BackgroundDispatchService._verify_print_response( printer_id=42, printer_name="P1S", pre_state="FINISH", pre_subtask_id="OLD_SUBTASK", timeout=0.3, poll_interval=0.05, ) assert result is True @pytest.mark.asyncio async def test_returns_true_on_subtask_id_change_even_if_state_still_finish(self): """#1078: H2D keeps state=FINISH for ~50 s after accepting project_file but flips subtask_id immediately. Must be accepted as a pickup signal.""" get_status = MagicMock(return_value=_status("FINISH", "NEW_SUBTASK_12345")) with patch( "backend.app.services.background_dispatch.printer_manager.get_status", get_status, ): result = await BackgroundDispatchService._verify_print_response( printer_id=42, printer_name="H2D", pre_state="FINISH", pre_subtask_id="OLD_SUBTASK_99999", timeout=0.3, poll_interval=0.05, ) assert result is True class TestReturnsFalseOnTimeout: @pytest.mark.asyncio async def test_returns_false_when_neither_state_nor_subtask_id_changes(self): """The exact #1042 scenario: P1S sits in FAILED with HMS pending, accepts the MQTT publish, never transitions. Watchdog must report failure so the caller fails the dispatch job.""" get_status = MagicMock(return_value=_status("FINISH", "OLD_SUBTASK")) client = MagicMock() get_client = MagicMock(return_value=client) with ( patch( "backend.app.services.background_dispatch.printer_manager.get_status", get_status, ), patch( "backend.app.services.background_dispatch.printer_manager.get_client", get_client, ), ): result = await BackgroundDispatchService._verify_print_response( printer_id=42, printer_name="P1S", pre_state="FINISH", pre_subtask_id="OLD_SUBTASK", timeout=0.2, poll_interval=0.05, ) assert result is False client.force_reconnect_stale_session.assert_called_once() @pytest.mark.asyncio async def test_returns_false_on_finish_to_idle_user_dismissed_prompt(self): """Regression for #1370 in the direct-dispatch path: when pre_state is FINISH and the printer transitions to IDLE during the verifier window, that's the user dismissing a post-print prompt — NOT acceptance of our project_file. The original ``state != pre_state`` check incorrectly returned True on this transition, so the dispatch job was marked successful even though no print was running. Must now report failure so the caller raises RuntimeError and the user sees the actual error. """ get_status = MagicMock(return_value=_status("IDLE", "OLD_SUBTASK")) client = MagicMock() get_client = MagicMock(return_value=client) with ( patch( "backend.app.services.background_dispatch.printer_manager.get_status", get_status, ), patch( "backend.app.services.background_dispatch.printer_manager.get_client", get_client, ), ): result = await BackgroundDispatchService._verify_print_response( printer_id=42, printer_name="P1S", pre_state="FINISH", pre_subtask_id="OLD_SUBTASK", timeout=0.2, poll_interval=0.05, ) assert result is False, ( "FINISH -> IDLE is the user dismissing a screen prompt, not the " "printer accepting project_file — verifier must report failure (#1370)" ) @pytest.mark.asyncio async def test_returns_true_on_each_active_print_state(self): """Counterpart to the #1370 fix: transitions into the active-print state set ARE valid "command landed" signals. PREPARE / SLICING / RUNNING / PAUSE all return True. """ for active_state in ("PREPARE", "SLICING", "RUNNING", "PAUSE"): get_status = MagicMock(return_value=_status(active_state, "OLD_SUBTASK")) with patch( "backend.app.services.background_dispatch.printer_manager.get_status", get_status, ): result = await BackgroundDispatchService._verify_print_response( printer_id=42, printer_name="P1S", pre_state="IDLE", pre_subtask_id="OLD_SUBTASK", timeout=0.2, poll_interval=0.05, ) assert result is True, ( f"transition IDLE -> {active_state} must be treated as a valid 'command landed' signal" ) @pytest.mark.asyncio async def test_returns_false_when_pre_subtask_id_none_and_state_unchanged(self): """Backward-compat: callers without a captured pre_subtask_id (e.g. the printer never reported one) must still get the timeout failure path based on state alone.""" get_status = MagicMock(return_value=_status("FINISH", "ANYTHING")) get_client = MagicMock(return_value=None) with ( patch( "backend.app.services.background_dispatch.printer_manager.get_status", get_status, ), patch( "backend.app.services.background_dispatch.printer_manager.get_client", get_client, ), ): result = await BackgroundDispatchService._verify_print_response( printer_id=42, printer_name="P1S", pre_state="FINISH", pre_subtask_id=None, timeout=0.2, poll_interval=0.05, ) assert result is False @pytest.mark.asyncio async def test_subtask_id_none_post_dispatch_does_not_count_as_change(self): """If the printer transiently reports subtask_id=None during the watchdog window (e.g. mid-reconnect), that must not be treated as "advanced past pre_subtask_id" — otherwise we'd false-pass and mark a never-started print as successful.""" get_status = MagicMock(return_value=_status("FINISH", None)) get_client = MagicMock(return_value=None) with ( patch( "backend.app.services.background_dispatch.printer_manager.get_status", get_status, ), patch( "backend.app.services.background_dispatch.printer_manager.get_client", get_client, ), ): result = await BackgroundDispatchService._verify_print_response( printer_id=42, printer_name="P1S", pre_state="FINISH", pre_subtask_id="OLD_SUBTASK", timeout=0.2, poll_interval=0.05, ) assert result is False class TestDisconnectHandling: @pytest.mark.asyncio async def test_disconnect_does_not_short_circuit_window(self): """A momentary ``get_status() is None`` (brief MQTT disconnect mid-window) must not immediately fail the dispatch — the printer may reconnect and still produce a valid transition before timeout. Falsely failing on the first missed tick is the previous bug class we're moving away from.""" # First call: disconnected. Second call onward: reconnected and transitioned. get_status = MagicMock(side_effect=[None, _status("RUNNING")]) with patch( "backend.app.services.background_dispatch.printer_manager.get_status", get_status, ): result = await BackgroundDispatchService._verify_print_response( printer_id=42, printer_name="P1S", pre_state="FINISH", pre_subtask_id="OLD_SUBTASK", timeout=0.3, poll_interval=0.05, ) assert result is True assert get_status.call_count >= 2 @pytest.mark.asyncio async def test_disconnect_for_full_window_returns_false(self): """Persistent disconnect for the full window is treated as failure. Better to false-fail and let the user retry than to false-succeed and leave them watching an idle printer (#1042).""" get_status = MagicMock(return_value=None) get_client = MagicMock(return_value=None) with ( patch( "backend.app.services.background_dispatch.printer_manager.get_status", get_status, ), patch( "backend.app.services.background_dispatch.printer_manager.get_client", get_client, ), ): result = await BackgroundDispatchService._verify_print_response( printer_id=42, printer_name="P1S", pre_state="FINISH", pre_subtask_id="OLD_SUBTASK", timeout=0.2, poll_interval=0.05, ) assert result is False class TestDefaults: def test_default_timeout_matches_queue_watchdog(self): """Queue and background watchdogs need the same 90 s default to give slow H2D FINISH→PREPARE transitions the same headroom on both paths.""" import inspect sig = inspect.signature(BackgroundDispatchService._verify_print_response) assert sig.parameters["timeout"].default == 90.0 class TestGcodeFileDiscriminator: """#1150 vs #887/#936 discriminator: skip the forced reconnect when the printer's gcode_file changed since pre-dispatch (project_file landed, printer is parsing slowly — reconnecting mid-parse causes 0500_4003). Reconnect when gcode_file is unchanged (publish was silently swallowed — half-broken session needs the original recovery).""" @pytest.mark.asyncio async def test_skips_reconnect_when_gcode_file_changed(self): get_status = MagicMock( return_value=_status("FINISH", "OLD_SUBTASK", gcode_file="/new.3mf"), ) client = MagicMock() get_client = MagicMock(return_value=client) with ( patch( "backend.app.services.background_dispatch.printer_manager.get_status", get_status, ), patch( "backend.app.services.background_dispatch.printer_manager.get_client", get_client, ), ): result = await BackgroundDispatchService._verify_print_response( printer_id=42, printer_name="P1P", pre_state="FINISH", pre_subtask_id="OLD_SUBTASK", pre_gcode_file="/old.3mf", timeout=0.2, poll_interval=0.05, ) assert result is False client.force_reconnect_stale_session.assert_not_called() @pytest.mark.asyncio async def test_reconnects_when_gcode_file_unchanged(self): # The half-broken-session case (#887/#936): publish was dropped, so # the printer is still showing the previous file. Reconnect to clear # the broken paho QoS-1 queue. get_status = MagicMock( return_value=_status("FINISH", "OLD_SUBTASK", gcode_file="/old.3mf"), ) client = MagicMock() get_client = MagicMock(return_value=client) with ( patch( "backend.app.services.background_dispatch.printer_manager.get_status", get_status, ), patch( "backend.app.services.background_dispatch.printer_manager.get_client", get_client, ), ): await BackgroundDispatchService._verify_print_response( printer_id=42, printer_name="P1P", pre_state="FINISH", pre_subtask_id="OLD_SUBTASK", pre_gcode_file="/old.3mf", timeout=0.2, poll_interval=0.05, ) client.force_reconnect_stale_session.assert_called_once() @pytest.mark.asyncio async def test_skips_reconnect_when_pre_gcode_file_was_none(self): # Printer just connected (pre_gcode_file=None) and now reports a # file — that's a clear "command landed" signal too. get_status = MagicMock( return_value=_status("FINISH", "OLD_SUBTASK", gcode_file="/new.3mf"), ) client = MagicMock() get_client = MagicMock(return_value=client) with ( patch( "backend.app.services.background_dispatch.printer_manager.get_status", get_status, ), patch( "backend.app.services.background_dispatch.printer_manager.get_client", get_client, ), ): await BackgroundDispatchService._verify_print_response( printer_id=42, printer_name="P1P", pre_state="FINISH", pre_subtask_id="OLD_SUBTASK", pre_gcode_file=None, timeout=0.2, poll_interval=0.05, ) client.force_reconnect_stale_session.assert_not_called() @pytest.mark.asyncio async def test_reconnects_when_no_pre_gcode_file_arg_supplied(self): # Backward-compat: callers that don't pass pre_gcode_file at all # (everything but our updated dispatch sites) must still get the # original reconnect-on-timeout behaviour. Here pre_gcode_file # defaults to None and the printer's current gcode_file is also # None → publish_landed=False → reconnect. get_status = MagicMock( return_value=_status("FINISH", "OLD_SUBTASK", gcode_file=None), ) client = MagicMock() get_client = MagicMock(return_value=client) with ( patch( "backend.app.services.background_dispatch.printer_manager.get_status", get_status, ), patch( "backend.app.services.background_dispatch.printer_manager.get_client", get_client, ), ): await BackgroundDispatchService._verify_print_response( printer_id=42, printer_name="P1P", pre_state="FINISH", pre_subtask_id="OLD_SUBTASK", timeout=0.2, poll_interval=0.05, ) client.force_reconnect_stale_session.assert_called_once() # --------------------------------------------------------------------------- # Integration tests: the call sites in _run_reprint_archive and # _run_print_library_file must (a) await the watchdog instead of fire-and- # forget, (b) raise RuntimeError on watchdog False so _run_active_job marks # the job failed, (c) rollback the library-file flow's freshly-created # archive on timeout. Heavy mocking — the goal is to verify the new wiring, # not to re-test the dependencies. # --------------------------------------------------------------------------- from contextlib import asynccontextmanager # noqa: E402 from unittest.mock import AsyncMock # noqa: E402 from backend.app.services.background_dispatch import ( # noqa: E402 ActiveDispatchState, PrintDispatchJob, ) def _make_session_factory(db_mock): """Build an async-session factory whose context manager yields ``db_mock``. Mirrors the ``async with async_session() as db`` shape used by both ``_run_*`` methods so the test can intercept ``db.rollback`` / ``db.scalar``. """ @asynccontextmanager async def _factory(): yield db_mock return _factory def _printer_namespace(): return SimpleNamespace( id=10, name="P1S", ip_address="1.2.3.4", access_code="abc", model="P1S", ) def _make_dispatch_job(kind: str = "reprint_archive") -> PrintDispatchJob: return PrintDispatchJob( id=1, kind=kind, source_id=99, source_name="Test.gcode.3mf", printer_id=10, printer_name="P1S", options={}, requested_by_user_id=None, requested_by_username=None, ) @pytest.fixture def reprint_archive_mocks(tmp_path): """Mock harness for ``_run_reprint_archive`` covering every external dependency up to (and including) ``start_print``. The watchdog is left real so the caller can patch ``_verify_print_response`` per-test.""" archive_file = tmp_path / "test.3mf" archive_file.write_bytes(b"fake-3mf-content") archive = SimpleNamespace( id=99, filename="Test.gcode.3mf", file_path=str(archive_file), ) db = MagicMock() db.scalar = AsyncMock(return_value=_printer_namespace()) db.rollback = AsyncMock() archive_service = MagicMock() archive_service.get_archive = AsyncMock(return_value=archive) return { "archive": archive, "archive_file": archive_file, "db": db, "archive_service": archive_service, "session_factory": _make_session_factory(db), } @pytest.fixture def library_file_mocks(tmp_path): """Mock harness for ``_run_print_library_file`` — separate from the reprint fixture because the library flow creates its archive via ``archive_service.archive_print(...)`` rather than fetching one.""" src_file = tmp_path / "lib_src.3mf" src_file.write_bytes(b"fake-3mf-content") lib_file = SimpleNamespace( id=22, filename="cube.gcode.3mf", file_path=str(src_file.relative_to(tmp_path)), ) lib_file.active = staticmethod(lambda: lib_file) # mimic LibraryFile.active() chainable new_archive = SimpleNamespace(id=500, filename="cube.gcode.3mf", file_path=str(src_file)) db = MagicMock() db.scalar = AsyncMock() # configured per-test db.flush = AsyncMock() db.commit = AsyncMock() db.rollback = AsyncMock() archive_service = MagicMock() archive_service.archive_print = AsyncMock(return_value=new_archive) return { "lib_file": lib_file, "src_file": src_file, "new_archive": new_archive, "db": db, "archive_service": archive_service, "session_factory": _make_session_factory(db), } class TestReprintArchiveDispatchWiring: """Verify ``_run_reprint_archive`` (a) awaits the watchdog inline and (b) raises RuntimeError on False so the dispatch job is marked failed.""" @pytest.mark.asyncio async def test_raises_runtime_error_when_watchdog_returns_false(self, reprint_archive_mocks): """The exact #1042 propagation gap: watchdog detects non-transition, _run_reprint_archive must surface it as a RuntimeError so the surrounding _run_active_job marks the job failed (instead of silently completing).""" from backend.app.services.background_dispatch import BackgroundDispatchService m = reprint_archive_mocks service = BackgroundDispatchService() job = _make_dispatch_job(kind="reprint_archive") watchdog = AsyncMock(return_value=False) with ( patch("backend.app.services.background_dispatch.async_session", m["session_factory"]), patch( "backend.app.services.background_dispatch.ArchiveService", return_value=m["archive_service"], ), patch.object(BackgroundDispatchService, "_verify_print_response", watchdog), patch( "backend.app.services.background_dispatch.printer_manager.is_connected", return_value=True, ), patch( "backend.app.services.background_dispatch.printer_manager.get_status", return_value=SimpleNamespace(state="FINISH", subtask_id="OLD_SUBTASK"), ), patch( "backend.app.services.background_dispatch.printer_manager.start_print", return_value=True, ), patch( "backend.app.services.background_dispatch.delete_file_async", new_callable=AsyncMock, ), patch( "backend.app.services.background_dispatch.with_ftp_retry", new_callable=AsyncMock, return_value=True, ), patch( "backend.app.services.background_dispatch.get_ftp_retry_settings", new_callable=AsyncMock, return_value=(False, 0, 0, 30.0), ), patch( "backend.app.services.background_dispatch.upload_file_async", new_callable=AsyncMock, return_value=True, ), patch( "backend.app.services.background_dispatch.ws_manager.broadcast", new_callable=AsyncMock, ), patch("backend.app.main.register_expected_print"), pytest.raises(RuntimeError, match="did not acknowledge print command"), ): await service._run_reprint_archive(job) # Watchdog received the captured pre-state and pre_subtask_id. watchdog.assert_awaited_once() kwargs = watchdog.await_args.kwargs args = watchdog.await_args.args assert "FINISH" in args # pre_state assert kwargs["pre_subtask_id"] == "OLD_SUBTASK" @pytest.mark.asyncio async def test_succeeds_when_watchdog_returns_true(self, reprint_archive_mocks): """Happy path: watchdog confirms pickup; _run_reprint_archive returns without raising. Guards against the wiring accidentally raising on True.""" from backend.app.services.background_dispatch import BackgroundDispatchService m = reprint_archive_mocks service = BackgroundDispatchService() job = _make_dispatch_job(kind="reprint_archive") with ( patch("backend.app.services.background_dispatch.async_session", m["session_factory"]), patch( "backend.app.services.background_dispatch.ArchiveService", return_value=m["archive_service"], ), patch.object( BackgroundDispatchService, "_verify_print_response", AsyncMock(return_value=True), ), patch( "backend.app.services.background_dispatch.printer_manager.is_connected", return_value=True, ), patch( "backend.app.services.background_dispatch.printer_manager.get_status", return_value=SimpleNamespace(state="FINISH", subtask_id="OLD_SUBTASK"), ), patch( "backend.app.services.background_dispatch.printer_manager.start_print", return_value=True, ), patch( "backend.app.services.background_dispatch.delete_file_async", new_callable=AsyncMock, ), patch( "backend.app.services.background_dispatch.with_ftp_retry", new_callable=AsyncMock, return_value=True, ), patch( "backend.app.services.background_dispatch.get_ftp_retry_settings", new_callable=AsyncMock, return_value=(False, 0, 0, 30.0), ), patch( "backend.app.services.background_dispatch.upload_file_async", new_callable=AsyncMock, return_value=True, ), patch( "backend.app.services.background_dispatch.ws_manager.broadcast", new_callable=AsyncMock, ), patch("backend.app.main.register_expected_print"), ): await service._run_reprint_archive(job) # must not raise # Reprint flow does not touch the existing archive — no rollback expected. m["db"].rollback.assert_not_called() class TestRunActiveJobMarksFailedOnRuntimeError: """End-to-end: a watchdog-driven RuntimeError must reach `_mark_job_finished(failed=True)` via the existing ``_run_active_job`` catch-all, so the dispatch UI shows a real failure (not "Done").""" @pytest.mark.asyncio async def test_runtime_error_from_process_job_marks_failed_with_message(self): from backend.app.services.background_dispatch import BackgroundDispatchService service = BackgroundDispatchService() job = _make_dispatch_job() # Place the job into _active_jobs so _set_active_message has a target. service._active_jobs[job.id] = ActiveDispatchState(job=job, message="") failure_message = ( "Printer did not acknowledge print command — state still FINISH. " "Check the printer for a pending error (HMS code, plate-clear prompt, " "SD card) and try again." ) with ( patch.object( BackgroundDispatchService, "_process_job", AsyncMock(side_effect=RuntimeError(failure_message)), ), patch.object( BackgroundDispatchService, "_mark_job_finished", new_callable=AsyncMock, ) as mark_finished, ): await service._run_active_job(job) mark_finished.assert_awaited_once() kwargs = mark_finished.await_args.kwargs assert kwargs["failed"] is True assert "did not acknowledge print command" in kwargs["message"]