| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415 |
- """Unit tests for background dispatch service."""
- from types import SimpleNamespace
- from unittest.mock import AsyncMock, patch
- import pytest
- from backend.app.services.background_dispatch import (
- ActiveDispatchState,
- BackgroundDispatchService,
- DispatchEnqueueRejected,
- PrintDispatchJob,
- )
@pytest.mark.asyncio
async def test_dispatch_rejects_when_printer_busy_printing():
    """Enqueueing a reprint raises DispatchEnqueueRejected when the printer status is RUNNING."""
    dispatcher = BackgroundDispatchService()
    status_target = "backend.app.services.background_dispatch.printer_manager.get_status"
    # Stubbed status object: state RUNNING with a gcode file already attached.
    busy_status = SimpleNamespace(state="RUNNING", gcode_file="active.gcode.3mf")
    reprint_request = {
        "archive_id": 1,
        "archive_name": "Test Archive",
        "printer_id": 10,
        "printer_name": "Printer A",
        "options": {},
        "requested_by_user_id": None,
        "requested_by_username": None,
    }

    with patch(status_target, return_value=busy_status):
        with pytest.raises(DispatchEnqueueRejected, match="currently busy printing"):
            await dispatcher.dispatch_reprint_archive(**reprint_request)
@pytest.mark.asyncio
async def test_dispatch_enqueues_job_and_broadcasts_state():
    """Dispatching a library file queues one job and broadcasts a single "dispatched" event."""
    dispatcher = BackgroundDispatchService()
    status_target = "backend.app.services.background_dispatch.printer_manager.get_status"
    broadcast_target = "backend.app.services.background_dispatch.ws_manager.broadcast"

    with (
        patch(status_target, return_value=None),
        patch(broadcast_target, new_callable=AsyncMock) as broadcast_spy,
    ):
        outcome = await dispatcher.dispatch_print_library_file(
            file_id=22,
            filename="cube.gcode.3mf",
            printer_id=7,
            printer_name="Printer B",
            options={"plate_id": 2},
            requested_by_user_id=5,
            requested_by_username="tester",
        )

    # First job on a freshly created service: expected id 1 at queue position 1.
    reported = {key: outcome[key] for key in ("status", "dispatch_job_id", "dispatch_position")}
    assert reported == {"status": "dispatched", "dispatch_job_id": 1, "dispatch_position": 1}
    assert len(dispatcher._queued_jobs) == 1

    # Exactly one websocket message, carrying the dispatch event.
    broadcast_spy.assert_awaited_once()
    message = broadcast_spy.await_args.args[0]
    assert message["type"] == "background_dispatch"
    assert message["data"]["recent_event"]["status"] == "dispatched"
@pytest.mark.asyncio
async def test_dispatch_library_file_defaults_cleanup_flag_false():
    """Omitting cleanup_library_after_dispatch leaves the queued job's flag at False.

    Rationale carried by this test: callers that never pass the flag (File Manager,
    Project Detail, queued-library-file paths) must not get a surprise deletion.
    """
    dispatcher = BackgroundDispatchService()
    status_target = "backend.app.services.background_dispatch.printer_manager.get_status"
    broadcast_target = "backend.app.services.background_dispatch.ws_manager.broadcast"
    request_without_flag = {
        "file_id": 1,
        "filename": "cube.gcode.3mf",
        "printer_id": 1,
        "printer_name": "Printer A",
        "options": {},
        "requested_by_user_id": None,
        "requested_by_username": None,
    }

    with (
        patch(status_target, return_value=None),
        patch(broadcast_target, new_callable=AsyncMock),
    ):
        await dispatcher.dispatch_print_library_file(**request_without_flag)

    queued = dispatcher._queued_jobs
    assert len(queued) == 1
    assert queued[0].cleanup_library_after_dispatch is False
@pytest.mark.asyncio
async def test_dispatch_library_file_propagates_cleanup_flag_true():
    """Passing cleanup_library_after_dispatch=True is recorded on the queued job.

    The flag has to reach the job so that whoever runs it can remove the transient
    LibraryFile afterwards; the requester fields and job kind are checked as well.
    """
    dispatcher = BackgroundDispatchService()
    status_target = "backend.app.services.background_dispatch.printer_manager.get_status"
    broadcast_target = "backend.app.services.background_dispatch.ws_manager.broadcast"

    with (
        patch(status_target, return_value=None),
        patch(broadcast_target, new_callable=AsyncMock),
    ):
        await dispatcher.dispatch_print_library_file(
            file_id=1,
            filename="cube.gcode.3mf",
            printer_id=1,
            printer_name="Printer A",
            options={},
            requested_by_user_id=42,
            requested_by_username="alice",
            cleanup_library_after_dispatch=True,
        )

    queued = dispatcher._queued_jobs
    assert len(queued) == 1
    queued_job = queued[0]
    assert queued_job.cleanup_library_after_dispatch is True

    # The remaining request fields must still land on the job unchanged.
    assert queued_job.requested_by_user_id == 42
    assert queued_job.requested_by_username == "alice"
    assert queued_job.kind == "print_library_file"
@pytest.mark.asyncio
async def test_cancel_queued_job_removes_it_and_broadcasts():
    """Cancelling a still-queued job drops it at once and broadcasts a "cancelled" event."""
    dispatcher = BackgroundDispatchService()
    status_target = "backend.app.services.background_dispatch.printer_manager.get_status"
    broadcast_target = "backend.app.services.background_dispatch.ws_manager.broadcast"

    with (
        patch(status_target, return_value=None),
        patch(broadcast_target, new_callable=AsyncMock) as broadcast_spy,
    ):
        enqueue_outcome = await dispatcher.dispatch_reprint_archive(
            archive_id=1,
            archive_name="benchy.gcode.3mf",
            printer_id=1,
            printer_name="Printer 1",
            options={},
            requested_by_user_id=None,
            requested_by_username=None,
        )
        # Forget the enqueue broadcast so only the cancel broadcast is counted below.
        broadcast_spy.reset_mock()
        cancel_outcome = await dispatcher.cancel_job(enqueue_outcome["dispatch_job_id"])

    assert cancel_outcome["cancelled"] is True
    assert cancel_outcome["pending"] is False
    assert len(dispatcher._queued_jobs) == 0
    assert dispatcher._batch_total == 0

    broadcast_spy.assert_awaited_once()
    message = broadcast_spy.await_args.args[0]
    assert message["data"]["recent_event"]["status"] == "cancelled"
@pytest.mark.asyncio
async def test_cancel_active_job_marks_pending_and_sets_cancel_flag():
    """Cancelling an active job reports pending=True and records the cancel request."""
    dispatcher = BackgroundDispatchService()
    broadcast_target = "backend.app.services.background_dispatch.ws_manager.broadcast"

    # Register the job directly as active, bypassing the queue.
    running = PrintDispatchJob(
        id=42,
        kind="reprint_archive",
        source_id=100,
        source_name="gearbox.gcode.3mf",
        printer_id=3,
        printer_name="Printer C",
    )
    dispatcher._active_jobs[running.id] = ActiveDispatchState(job=running, message="Uploading...")

    with patch(broadcast_target, new_callable=AsyncMock) as broadcast_spy:
        outcome = await dispatcher.cancel_job(running.id)

    assert outcome["cancelled"] is True
    assert outcome["pending"] is True
    assert running.id in dispatcher._cancel_requested_job_ids

    broadcast_spy.assert_awaited_once()
    message = broadcast_spy.await_args.args[0]
    assert message["data"]["recent_event"]["status"] == "cancelling"
def test_resolve_plate_id_uses_request_value_when_provided(tmp_path):
    """A caller-supplied plate id is returned even when the file is not a readable zip."""
    bogus_archive = tmp_path / "dummy.3mf"
    bogus_archive.write_text("not-a-zip")

    assert BackgroundDispatchService._resolve_plate_id(bogus_archive, requested_plate_id=9) == 9
def test_resolve_plate_id_auto_detects_from_3mf(tmp_path):
    """With no requested plate, the id is taken from the Metadata/plate_X.gcode archive entry."""
    import zipfile

    archive_path = tmp_path / "multi.3mf"
    # Build a minimal archive whose only member names plate 7; close it before resolving.
    with zipfile.ZipFile(archive_path, "w") as archive:
        archive.writestr("Metadata/plate_7.gcode", b"G1 X0 Y0")

    detected = BackgroundDispatchService._resolve_plate_id(archive_path, requested_plate_id=None)
    assert detected == 7
def test_is_sliced_file_recognizes_supported_extensions():
    """`.gcode` and `.gcode.3mf` names count as sliced files; a bare `.3mf` does not."""
    expectations = {
        "part.gcode": True,
        "part.gcode.3mf": True,
        "part.3mf": False,
    }
    for filename, accepted in expectations.items():
        # Identity check: the helper must return a real bool, not just a truthy value.
        assert BackgroundDispatchService._is_sliced_file(filename) is accepted
def test_dispatch_option_defaults_align_with_request_schema_defaults():
    """Keep the dispatch loop's `.get()` fallbacks in step with the request schemas.

    For each print option, the default produced by a fresh `ReprintRequest()` /
    `FilePrintRequest()` must be the literal used in
    `job.options.get("<field>", <default>)` inside the dispatch module. The check
    is textual: it scans the module source and expects exactly two matches per
    field. A caller that bypasses the schema and omits a field would otherwise
    get a different print command than one that went through the schema.

    Background noted with this test: `vibration_cali` once fell back to False in
    the dispatch loop while the schema default was True; it went unnoticed only
    because every caller always supplied the field.
    """
    import inspect

    from backend.app.schemas.archive import ReprintRequest
    from backend.app.schemas.library import FilePrintRequest
    from backend.app.services import background_dispatch as bd

    fields = ("bed_levelling", "flow_cali", "vibration_cali", "layer_inspect", "timelapse", "use_ams")

    # Collect schema defaults, reading each field from a freshly built request.
    reprint_defaults = {}
    for name in fields:
        reprint_defaults[name] = getattr(ReprintRequest(), name)
    libprint_defaults = {}
    for name in fields:
        libprint_defaults[name] = getattr(FilePrintRequest(), name)

    assert reprint_defaults == libprint_defaults, (
        "ReprintRequest and FilePrintRequest must share the same defaults for these fields"
    )

    src = inspect.getsource(bd)
    for field, expected_default in reprint_defaults.items():
        # Render the default the way it would be spelled in source code.
        if expected_default:
            literal = "True"
        else:
            literal = "False"
        needle = f'{field}=job.options.get("{field}", {literal})'
        count = src.count(needle)
        assert count == 2, (
            f"Expected exactly 2 occurrences of `{needle}` in background_dispatch (one per "
            f"`_process_job` branch). Found {count}. A drift between the request schema's "
            f"default for `{field}` and the dispatch loop's `.get()` default means callers "
            f"that bypass the schema will get inconsistent behaviour."
        )
@pytest.mark.asyncio
async def test_cancel_job_not_found_returns_false():
    """Cancelling an unknown job id yields cancelled=False with reason "not_found"."""
    dispatcher = BackgroundDispatchService()
    broadcast_target = "backend.app.services.background_dispatch.ws_manager.broadcast"
    unknown_job_id = 999

    with patch(broadcast_target, new_callable=AsyncMock):
        outcome = await dispatcher.cancel_job(unknown_job_id)

    assert outcome["cancelled"] is False
    assert outcome["reason"] == "not_found"
@pytest.mark.asyncio
async def test_cancel_job_single_lock_covers_both_active_and_queued():
    """Cancel a queued job while another job is active on a different printer.

    Written as a regression guard for a TOCTOU race in cancel_job: with two
    separate lock acquisitions, the dispatcher loop could move a job from the
    queue to the active set in between, so the cancel found it in neither. Here
    the queued job must be removed and the active job left alone.
    """
    dispatcher = BackgroundDispatchService()
    broadcast_target = "backend.app.services.background_dispatch.ws_manager.broadcast"

    # One job already running on printer 1 ...
    running = PrintDispatchJob(
        id=1,
        kind="reprint_archive",
        source_id=10,
        source_name="active.3mf",
        printer_id=1,
        printer_name="Printer 1",
    )
    dispatcher._active_jobs[running.id] = ActiveDispatchState(job=running, message="Uploading...")

    # ... and one job waiting for printer 2.
    waiting = PrintDispatchJob(
        id=2,
        kind="reprint_archive",
        source_id=20,
        source_name="queued.3mf",
        printer_id=2,
        printer_name="Printer 2",
    )
    dispatcher._queued_jobs.append(waiting)
    dispatcher._batch_total = 2

    with patch(broadcast_target, new_callable=AsyncMock) as broadcast_spy:
        outcome = await dispatcher.cancel_job(2)

    # The queued job is gone immediately (not merely marked pending).
    assert outcome["cancelled"] is True
    assert outcome["pending"] is False
    assert len(dispatcher._queued_jobs) == 0
    # The running job was not affected by the cancel.
    assert 1 in dispatcher._active_jobs

    broadcast_spy.assert_awaited_once()
    message = broadcast_spy.await_args.args[0]
    assert message["data"]["recent_event"]["status"] == "cancelled"
@pytest.mark.asyncio
async def test_mark_job_finished_resets_batch_when_all_done():
    """Finishing the only active job, with nothing queued, zeroes all batch counters."""
    dispatcher = BackgroundDispatchService()
    broadcast_target = "backend.app.services.background_dispatch.ws_manager.broadcast"

    only_job = PrintDispatchJob(
        id=1,
        kind="reprint_archive",
        source_id=10,
        source_name="test.3mf",
        printer_id=1,
        printer_name="Printer 1",
    )
    dispatcher._active_jobs[only_job.id] = ActiveDispatchState(job=only_job, message="Done")
    dispatcher._batch_total = 1

    with patch(broadcast_target, new_callable=AsyncMock):
        await dispatcher._mark_job_finished(only_job, failed=False, message="Complete")

    counters = (dispatcher._batch_total, dispatcher._batch_completed, dispatcher._batch_failed)
    assert counters == (0, 0, 0)
@pytest.mark.asyncio
async def test_mark_job_finished_no_reset_when_jobs_remain():
    """Finishing a job while another is still queued keeps the batch counters running."""
    dispatcher = BackgroundDispatchService()
    broadcast_target = "backend.app.services.background_dispatch.ws_manager.broadcast"

    finishing = PrintDispatchJob(
        id=1,
        kind="reprint_archive",
        source_id=10,
        source_name="test.3mf",
        printer_id=1,
        printer_name="Printer 1",
    )
    still_waiting = PrintDispatchJob(
        id=2,
        kind="reprint_archive",
        source_id=20,
        source_name="next.3mf",
        printer_id=2,
        printer_name="Printer 2",
    )
    dispatcher._active_jobs[finishing.id] = ActiveDispatchState(job=finishing, message="Done")
    dispatcher._queued_jobs.append(still_waiting)
    dispatcher._batch_total = 2

    with patch(broadcast_target, new_callable=AsyncMock):
        await dispatcher._mark_job_finished(finishing, failed=False, message="Complete")

    # A job is still queued, so the total stays put and one completion is recorded.
    assert dispatcher._batch_total == 2
    assert dispatcher._batch_completed == 1
@pytest.mark.asyncio
async def test_mark_job_finished_batch_reset_rechecks_under_lock():
    """A job queued while the finish is being broadcast must keep its batch counters.

    Written as a regression guard for a TOCTOU in the batch reset: the reset is
    expected to re-check for outstanding work under its lock, so a dispatch that
    arrives between the two lock acquisitions does not get its counters zeroed.
    The arrival is simulated by a broadcast side effect that appends a job.
    """
    dispatcher = BackgroundDispatchService()
    broadcast_target = "backend.app.services.background_dispatch.ws_manager.broadcast"

    finishing = PrintDispatchJob(
        id=1,
        kind="reprint_archive",
        source_id=10,
        source_name="test.3mf",
        printer_id=1,
        printer_name="Printer 1",
    )
    dispatcher._active_jobs[finishing.id] = ActiveDispatchState(job=finishing, message="Done")
    dispatcher._batch_total = 1

    original_broadcast = AsyncMock()

    async def inject_new_job_during_broadcast(msg):
        """Forward the message to the stand-in mock, then queue a late job if the queue is empty."""
        await original_broadcast(msg)
        if service_queue_has_jobs():
            # Only inject once, even if broadcast is invoked again.
            return
        late_job = PrintDispatchJob(
            id=99,
            kind="reprint_archive",
            source_id=99,
            source_name="injected.3mf",
            printer_id=5,
            printer_name="Printer 5",
        )
        dispatcher._queued_jobs.append(late_job)
        dispatcher._batch_total = 1

    def service_queue_has_jobs():
        """Return True when the dispatcher currently holds at least one queued job."""
        return bool(dispatcher._queued_jobs)

    with patch(broadcast_target, side_effect=inject_new_job_during_broadcast):
        await dispatcher._mark_job_finished(finishing, failed=False, message="Complete")

    # The injected job must survive with its batch total intact.
    assert dispatcher._batch_total == 1
    assert len(dispatcher._queued_jobs) == 1
|