| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158 |
- """Unit tests for background dispatch service."""
- from types import SimpleNamespace
- from unittest.mock import AsyncMock, patch
- import pytest
- from backend.app.services.background_dispatch import (
- ActiveDispatchState,
- BackgroundDispatchService,
- DispatchEnqueueRejected,
- PrintDispatchJob,
- )
- @pytest.mark.asyncio
- async def test_dispatch_rejects_when_printer_busy_printing():
- """Reject enqueue when target printer is already printing."""
- service = BackgroundDispatchService()
- with (
- patch(
- "backend.app.services.background_dispatch.printer_manager.get_status",
- return_value=SimpleNamespace(state="RUNNING", gcode_file="active.gcode.3mf"),
- ),
- pytest.raises(DispatchEnqueueRejected, match="currently busy printing"),
- ):
- await service.dispatch_reprint_archive(
- archive_id=1,
- archive_name="Test Archive",
- printer_id=10,
- printer_name="Printer A",
- options={},
- requested_by_user_id=None,
- requested_by_username=None,
- )
- @pytest.mark.asyncio
- async def test_dispatch_enqueues_job_and_broadcasts_state():
- """Enqueue succeeds and emits websocket queue update."""
- service = BackgroundDispatchService()
- with (
- patch("backend.app.services.background_dispatch.printer_manager.get_status", return_value=None),
- patch(
- "backend.app.services.background_dispatch.ws_manager.broadcast", new_callable=AsyncMock
- ) as mock_broadcast,
- ):
- result = await service.dispatch_print_library_file(
- file_id=22,
- filename="cube.gcode.3mf",
- printer_id=7,
- printer_name="Printer B",
- options={"plate_id": 2},
- requested_by_user_id=5,
- requested_by_username="tester",
- )
- assert result["status"] == "dispatched"
- assert result["dispatch_job_id"] == 1
- assert result["dispatch_position"] == 1
- assert len(service._queued_jobs) == 1
- mock_broadcast.assert_awaited_once()
- payload = mock_broadcast.await_args.args[0]
- assert payload["type"] == "background_dispatch"
- assert payload["data"]["recent_event"]["status"] == "dispatched"
- @pytest.mark.asyncio
- async def test_cancel_queued_job_removes_it_and_broadcasts():
- """Cancelling queued job removes it immediately."""
- service = BackgroundDispatchService()
- with (
- patch("backend.app.services.background_dispatch.printer_manager.get_status", return_value=None),
- patch(
- "backend.app.services.background_dispatch.ws_manager.broadcast", new_callable=AsyncMock
- ) as mock_broadcast,
- ):
- result = await service.dispatch_reprint_archive(
- archive_id=1,
- archive_name="benchy.gcode.3mf",
- printer_id=1,
- printer_name="Printer 1",
- options={},
- requested_by_user_id=None,
- requested_by_username=None,
- )
- mock_broadcast.reset_mock()
- cancel_result = await service.cancel_job(result["dispatch_job_id"])
- assert cancel_result["cancelled"] is True
- assert cancel_result["pending"] is False
- assert len(service._queued_jobs) == 0
- assert service._batch_total == 0
- mock_broadcast.assert_awaited_once()
- payload = mock_broadcast.await_args.args[0]
- assert payload["data"]["recent_event"]["status"] == "cancelled"
- @pytest.mark.asyncio
- async def test_cancel_active_job_marks_pending_and_sets_cancel_flag():
- """Cancelling active job marks it as pending cancellation."""
- service = BackgroundDispatchService()
- job = PrintDispatchJob(
- id=42,
- kind="reprint_archive",
- source_id=100,
- source_name="gearbox.gcode.3mf",
- printer_id=3,
- printer_name="Printer C",
- )
- service._active_jobs[job.id] = ActiveDispatchState(job=job, message="Uploading...")
- with patch(
- "backend.app.services.background_dispatch.ws_manager.broadcast", new_callable=AsyncMock
- ) as mock_broadcast:
- result = await service.cancel_job(job.id)
- assert result["cancelled"] is True
- assert result["pending"] is True
- assert job.id in service._cancel_requested_job_ids
- mock_broadcast.assert_awaited_once()
- payload = mock_broadcast.await_args.args[0]
- assert payload["data"]["recent_event"]["status"] == "cancelling"
- def test_resolve_plate_id_uses_request_value_when_provided(tmp_path):
- """Explicit plate_id wins over auto-detection."""
- file_path = tmp_path / "dummy.3mf"
- file_path.write_text("not-a-zip")
- plate_id = BackgroundDispatchService._resolve_plate_id(file_path, requested_plate_id=9)
- assert plate_id == 9
- def test_resolve_plate_id_auto_detects_from_3mf(tmp_path):
- """Auto-detect plate from Metadata/plate_X.gcode entry."""
- import zipfile
- file_path = tmp_path / "multi.3mf"
- with zipfile.ZipFile(file_path, "w") as zf:
- zf.writestr("Metadata/plate_7.gcode", b"G1 X0 Y0")
- plate_id = BackgroundDispatchService._resolve_plate_id(file_path, requested_plate_id=None)
- assert plate_id == 7
- def test_is_sliced_file_recognizes_supported_extensions():
- """Only .gcode and .gcode.3mf should be accepted."""
- assert BackgroundDispatchService._is_sliced_file("part.gcode") is True
- assert BackgroundDispatchService._is_sliced_file("part.gcode.3mf") is True
- assert BackgroundDispatchService._is_sliced_file("part.3mf") is False
|