test_background_dispatch.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. """Unit tests for background dispatch service."""
import zipfile
from types import SimpleNamespace
from unittest.mock import AsyncMock, patch

import pytest

from backend.app.services.background_dispatch import (
    ActiveDispatchState,
    BackgroundDispatchService,
    DispatchEnqueueRejected,
    PrintDispatchJob,
)
  11. @pytest.mark.asyncio
  12. async def test_dispatch_rejects_when_printer_busy_printing():
  13. """Reject enqueue when target printer is already printing."""
  14. service = BackgroundDispatchService()
  15. with (
  16. patch(
  17. "backend.app.services.background_dispatch.printer_manager.get_status",
  18. return_value=SimpleNamespace(state="RUNNING", gcode_file="active.gcode.3mf"),
  19. ),
  20. pytest.raises(DispatchEnqueueRejected, match="currently busy printing"),
  21. ):
  22. await service.dispatch_reprint_archive(
  23. archive_id=1,
  24. archive_name="Test Archive",
  25. printer_id=10,
  26. printer_name="Printer A",
  27. options={},
  28. requested_by_user_id=None,
  29. requested_by_username=None,
  30. )
  31. @pytest.mark.asyncio
  32. async def test_dispatch_enqueues_job_and_broadcasts_state():
  33. """Enqueue succeeds and emits websocket queue update."""
  34. service = BackgroundDispatchService()
  35. with (
  36. patch("backend.app.services.background_dispatch.printer_manager.get_status", return_value=None),
  37. patch(
  38. "backend.app.services.background_dispatch.ws_manager.broadcast", new_callable=AsyncMock
  39. ) as mock_broadcast,
  40. ):
  41. result = await service.dispatch_print_library_file(
  42. file_id=22,
  43. filename="cube.gcode.3mf",
  44. printer_id=7,
  45. printer_name="Printer B",
  46. options={"plate_id": 2},
  47. requested_by_user_id=5,
  48. requested_by_username="tester",
  49. )
  50. assert result["status"] == "dispatched"
  51. assert result["dispatch_job_id"] == 1
  52. assert result["dispatch_position"] == 1
  53. assert len(service._queued_jobs) == 1
  54. mock_broadcast.assert_awaited_once()
  55. payload = mock_broadcast.await_args.args[0]
  56. assert payload["type"] == "background_dispatch"
  57. assert payload["data"]["recent_event"]["status"] == "dispatched"
  58. @pytest.mark.asyncio
  59. async def test_cancel_queued_job_removes_it_and_broadcasts():
  60. """Cancelling queued job removes it immediately."""
  61. service = BackgroundDispatchService()
  62. with (
  63. patch("backend.app.services.background_dispatch.printer_manager.get_status", return_value=None),
  64. patch(
  65. "backend.app.services.background_dispatch.ws_manager.broadcast", new_callable=AsyncMock
  66. ) as mock_broadcast,
  67. ):
  68. result = await service.dispatch_reprint_archive(
  69. archive_id=1,
  70. archive_name="benchy.gcode.3mf",
  71. printer_id=1,
  72. printer_name="Printer 1",
  73. options={},
  74. requested_by_user_id=None,
  75. requested_by_username=None,
  76. )
  77. mock_broadcast.reset_mock()
  78. cancel_result = await service.cancel_job(result["dispatch_job_id"])
  79. assert cancel_result["cancelled"] is True
  80. assert cancel_result["pending"] is False
  81. assert len(service._queued_jobs) == 0
  82. assert service._batch_total == 0
  83. mock_broadcast.assert_awaited_once()
  84. payload = mock_broadcast.await_args.args[0]
  85. assert payload["data"]["recent_event"]["status"] == "cancelled"
  86. @pytest.mark.asyncio
  87. async def test_cancel_active_job_marks_pending_and_sets_cancel_flag():
  88. """Cancelling active job marks it as pending cancellation."""
  89. service = BackgroundDispatchService()
  90. job = PrintDispatchJob(
  91. id=42,
  92. kind="reprint_archive",
  93. source_id=100,
  94. source_name="gearbox.gcode.3mf",
  95. printer_id=3,
  96. printer_name="Printer C",
  97. )
  98. service._active_jobs[job.id] = ActiveDispatchState(job=job, message="Uploading...")
  99. with patch(
  100. "backend.app.services.background_dispatch.ws_manager.broadcast", new_callable=AsyncMock
  101. ) as mock_broadcast:
  102. result = await service.cancel_job(job.id)
  103. assert result["cancelled"] is True
  104. assert result["pending"] is True
  105. assert job.id in service._cancel_requested_job_ids
  106. mock_broadcast.assert_awaited_once()
  107. payload = mock_broadcast.await_args.args[0]
  108. assert payload["data"]["recent_event"]["status"] == "cancelling"
  109. def test_resolve_plate_id_uses_request_value_when_provided(tmp_path):
  110. """Explicit plate_id wins over auto-detection."""
  111. file_path = tmp_path / "dummy.3mf"
  112. file_path.write_text("not-a-zip")
  113. plate_id = BackgroundDispatchService._resolve_plate_id(file_path, requested_plate_id=9)
  114. assert plate_id == 9
  115. def test_resolve_plate_id_auto_detects_from_3mf(tmp_path):
  116. """Auto-detect plate from Metadata/plate_X.gcode entry."""
  117. import zipfile
  118. file_path = tmp_path / "multi.3mf"
  119. with zipfile.ZipFile(file_path, "w") as zf:
  120. zf.writestr("Metadata/plate_7.gcode", b"G1 X0 Y0")
  121. plate_id = BackgroundDispatchService._resolve_plate_id(file_path, requested_plate_id=None)
  122. assert plate_id == 7
  123. def test_is_sliced_file_recognizes_supported_extensions():
  124. """Only .gcode and .gcode.3mf should be accepted."""
  125. assert BackgroundDispatchService._is_sliced_file("part.gcode") is True
  126. assert BackgroundDispatchService._is_sliced_file("part.gcode.3mf") is True
  127. assert BackgroundDispatchService._is_sliced_file("part.3mf") is False
  128. @pytest.mark.asyncio
  129. async def test_cancel_job_not_found_returns_false():
  130. """Cancelling a nonexistent job returns not_found."""
  131. service = BackgroundDispatchService()
  132. with patch("backend.app.services.background_dispatch.ws_manager.broadcast", new_callable=AsyncMock):
  133. result = await service.cancel_job(999)
  134. assert result["cancelled"] is False
  135. assert result["reason"] == "not_found"
  136. @pytest.mark.asyncio
  137. async def test_cancel_job_single_lock_covers_both_active_and_queued():
  138. """cancel_job checks both active and queued jobs under a single lock acquisition.
  139. Regression test for TOCTOU race: previously two separate lock acquisitions allowed
  140. the dispatcher loop to move a job from queue to active between them, causing cancel
  141. to find it in neither place.
  142. """
  143. service = BackgroundDispatchService()
  144. # Set up a job in the queue AND an active job for a different printer
  145. active_job = PrintDispatchJob(
  146. id=1,
  147. kind="reprint_archive",
  148. source_id=10,
  149. source_name="active.3mf",
  150. printer_id=1,
  151. printer_name="Printer 1",
  152. )
  153. service._active_jobs[active_job.id] = ActiveDispatchState(job=active_job, message="Uploading...")
  154. queued_job = PrintDispatchJob(
  155. id=2,
  156. kind="reprint_archive",
  157. source_id=20,
  158. source_name="queued.3mf",
  159. printer_id=2,
  160. printer_name="Printer 2",
  161. )
  162. service._queued_jobs.append(queued_job)
  163. service._batch_total = 2
  164. with patch(
  165. "backend.app.services.background_dispatch.ws_manager.broadcast", new_callable=AsyncMock
  166. ) as mock_broadcast:
  167. # Cancel the queued job — should find it in single lock acquisition
  168. result = await service.cancel_job(2)
  169. assert result["cancelled"] is True
  170. assert result["pending"] is False
  171. assert len(service._queued_jobs) == 0
  172. # Active job should be untouched
  173. assert 1 in service._active_jobs
  174. mock_broadcast.assert_awaited_once()
  175. payload = mock_broadcast.await_args.args[0]
  176. assert payload["data"]["recent_event"]["status"] == "cancelled"
  177. @pytest.mark.asyncio
  178. async def test_mark_job_finished_resets_batch_when_all_done():
  179. """Batch counters reset after last job completes."""
  180. service = BackgroundDispatchService()
  181. job = PrintDispatchJob(
  182. id=1,
  183. kind="reprint_archive",
  184. source_id=10,
  185. source_name="test.3mf",
  186. printer_id=1,
  187. printer_name="Printer 1",
  188. )
  189. service._active_jobs[job.id] = ActiveDispatchState(job=job, message="Done")
  190. service._batch_total = 1
  191. with patch("backend.app.services.background_dispatch.ws_manager.broadcast", new_callable=AsyncMock):
  192. await service._mark_job_finished(job, failed=False, message="Complete")
  193. assert service._batch_total == 0
  194. assert service._batch_completed == 0
  195. assert service._batch_failed == 0
  196. @pytest.mark.asyncio
  197. async def test_mark_job_finished_no_reset_when_jobs_remain():
  198. """Batch counters NOT reset when queued jobs remain."""
  199. service = BackgroundDispatchService()
  200. job = PrintDispatchJob(
  201. id=1,
  202. kind="reprint_archive",
  203. source_id=10,
  204. source_name="test.3mf",
  205. printer_id=1,
  206. printer_name="Printer 1",
  207. )
  208. remaining_job = PrintDispatchJob(
  209. id=2,
  210. kind="reprint_archive",
  211. source_id=20,
  212. source_name="next.3mf",
  213. printer_id=2,
  214. printer_name="Printer 2",
  215. )
  216. service._active_jobs[job.id] = ActiveDispatchState(job=job, message="Done")
  217. service._queued_jobs.append(remaining_job)
  218. service._batch_total = 2
  219. with patch("backend.app.services.background_dispatch.ws_manager.broadcast", new_callable=AsyncMock):
  220. await service._mark_job_finished(job, failed=False, message="Complete")
  221. # Batch counters should NOT be reset — remaining job still queued
  222. assert service._batch_total == 2
  223. assert service._batch_completed == 1
  224. @pytest.mark.asyncio
  225. async def test_mark_job_finished_batch_reset_rechecks_under_lock():
  226. """Batch reset re-checks condition inside second lock acquisition.
  227. Regression test for TOCTOU: a new dispatch between the two lock acquisitions
  228. could get its counters zeroed if the re-check is missing.
  229. """
  230. service = BackgroundDispatchService()
  231. job = PrintDispatchJob(
  232. id=1,
  233. kind="reprint_archive",
  234. source_id=10,
  235. source_name="test.3mf",
  236. printer_id=1,
  237. printer_name="Printer 1",
  238. )
  239. service._active_jobs[job.id] = ActiveDispatchState(job=job, message="Done")
  240. service._batch_total = 1
  241. original_broadcast = AsyncMock()
  242. async def inject_new_job_during_broadcast(msg):
  243. """Simulate a new dispatch arriving between the two lock acquisitions."""
  244. await original_broadcast(msg)
  245. # After broadcast (lock released), inject a new job before reset re-check
  246. if not service._queued_jobs:
  247. new_job = PrintDispatchJob(
  248. id=99,
  249. kind="reprint_archive",
  250. source_id=99,
  251. source_name="injected.3mf",
  252. printer_id=5,
  253. printer_name="Printer 5",
  254. )
  255. service._queued_jobs.append(new_job)
  256. service._batch_total = 1
  257. with patch(
  258. "backend.app.services.background_dispatch.ws_manager.broadcast",
  259. side_effect=inject_new_job_during_broadcast,
  260. ):
  261. await service._mark_job_finished(job, failed=False, message="Complete")
  262. # Re-check should prevent reset since a new job appeared
  263. assert service._batch_total == 1
  264. assert len(service._queued_jobs) == 1