test_background_dispatch.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. """Unit tests for background dispatch service."""
  2. from types import SimpleNamespace
  3. from unittest.mock import AsyncMock, patch
  4. import pytest
  5. from backend.app.services.background_dispatch import (
  6. ActiveDispatchState,
  7. BackgroundDispatchService,
  8. DispatchEnqueueRejected,
  9. PrintDispatchJob,
  10. )
  11. @pytest.mark.asyncio
  12. async def test_dispatch_rejects_when_printer_busy_printing():
  13. """Reject enqueue when target printer is already printing."""
  14. service = BackgroundDispatchService()
  15. with (
  16. patch(
  17. "backend.app.services.background_dispatch.printer_manager.get_status",
  18. return_value=SimpleNamespace(state="RUNNING", gcode_file="active.gcode.3mf"),
  19. ),
  20. pytest.raises(DispatchEnqueueRejected, match="currently busy printing"),
  21. ):
  22. await service.dispatch_reprint_archive(
  23. archive_id=1,
  24. archive_name="Test Archive",
  25. printer_id=10,
  26. printer_name="Printer A",
  27. options={},
  28. requested_by_user_id=None,
  29. requested_by_username=None,
  30. )
  31. @pytest.mark.asyncio
  32. async def test_dispatch_enqueues_job_and_broadcasts_state():
  33. """Enqueue succeeds and emits websocket queue update."""
  34. service = BackgroundDispatchService()
  35. with (
  36. patch("backend.app.services.background_dispatch.printer_manager.get_status", return_value=None),
  37. patch(
  38. "backend.app.services.background_dispatch.ws_manager.broadcast", new_callable=AsyncMock
  39. ) as mock_broadcast,
  40. ):
  41. result = await service.dispatch_print_library_file(
  42. file_id=22,
  43. filename="cube.gcode.3mf",
  44. printer_id=7,
  45. printer_name="Printer B",
  46. options={"plate_id": 2},
  47. requested_by_user_id=5,
  48. requested_by_username="tester",
  49. )
  50. assert result["status"] == "dispatched"
  51. assert result["dispatch_job_id"] == 1
  52. assert result["dispatch_position"] == 1
  53. assert len(service._queued_jobs) == 1
  54. mock_broadcast.assert_awaited_once()
  55. payload = mock_broadcast.await_args.args[0]
  56. assert payload["type"] == "background_dispatch"
  57. assert payload["data"]["recent_event"]["status"] == "dispatched"
  58. @pytest.mark.asyncio
  59. async def test_cancel_queued_job_removes_it_and_broadcasts():
  60. """Cancelling queued job removes it immediately."""
  61. service = BackgroundDispatchService()
  62. with (
  63. patch("backend.app.services.background_dispatch.printer_manager.get_status", return_value=None),
  64. patch(
  65. "backend.app.services.background_dispatch.ws_manager.broadcast", new_callable=AsyncMock
  66. ) as mock_broadcast,
  67. ):
  68. result = await service.dispatch_reprint_archive(
  69. archive_id=1,
  70. archive_name="benchy.gcode.3mf",
  71. printer_id=1,
  72. printer_name="Printer 1",
  73. options={},
  74. requested_by_user_id=None,
  75. requested_by_username=None,
  76. )
  77. mock_broadcast.reset_mock()
  78. cancel_result = await service.cancel_job(result["dispatch_job_id"])
  79. assert cancel_result["cancelled"] is True
  80. assert cancel_result["pending"] is False
  81. assert len(service._queued_jobs) == 0
  82. assert service._batch_total == 0
  83. mock_broadcast.assert_awaited_once()
  84. payload = mock_broadcast.await_args.args[0]
  85. assert payload["data"]["recent_event"]["status"] == "cancelled"
  86. @pytest.mark.asyncio
  87. async def test_cancel_active_job_marks_pending_and_sets_cancel_flag():
  88. """Cancelling active job marks it as pending cancellation."""
  89. service = BackgroundDispatchService()
  90. job = PrintDispatchJob(
  91. id=42,
  92. kind="reprint_archive",
  93. source_id=100,
  94. source_name="gearbox.gcode.3mf",
  95. printer_id=3,
  96. printer_name="Printer C",
  97. )
  98. service._active_jobs[job.id] = ActiveDispatchState(job=job, message="Uploading...")
  99. with patch(
  100. "backend.app.services.background_dispatch.ws_manager.broadcast", new_callable=AsyncMock
  101. ) as mock_broadcast:
  102. result = await service.cancel_job(job.id)
  103. assert result["cancelled"] is True
  104. assert result["pending"] is True
  105. assert job.id in service._cancel_requested_job_ids
  106. mock_broadcast.assert_awaited_once()
  107. payload = mock_broadcast.await_args.args[0]
  108. assert payload["data"]["recent_event"]["status"] == "cancelling"
  109. def test_resolve_plate_id_uses_request_value_when_provided(tmp_path):
  110. """Explicit plate_id wins over auto-detection."""
  111. file_path = tmp_path / "dummy.3mf"
  112. file_path.write_text("not-a-zip")
  113. plate_id = BackgroundDispatchService._resolve_plate_id(file_path, requested_plate_id=9)
  114. assert plate_id == 9
  115. def test_resolve_plate_id_auto_detects_from_3mf(tmp_path):
  116. """Auto-detect plate from Metadata/plate_X.gcode entry."""
  117. import zipfile
  118. file_path = tmp_path / "multi.3mf"
  119. with zipfile.ZipFile(file_path, "w") as zf:
  120. zf.writestr("Metadata/plate_7.gcode", b"G1 X0 Y0")
  121. plate_id = BackgroundDispatchService._resolve_plate_id(file_path, requested_plate_id=None)
  122. assert plate_id == 7
  123. def test_is_sliced_file_recognizes_supported_extensions():
  124. """Only .gcode and .gcode.3mf should be accepted."""
  125. assert BackgroundDispatchService._is_sliced_file("part.gcode") is True
  126. assert BackgroundDispatchService._is_sliced_file("part.gcode.3mf") is True
  127. assert BackgroundDispatchService._is_sliced_file("part.3mf") is False