test_background_dispatch.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376
  1. """Unit tests for background dispatch service."""
  2. from types import SimpleNamespace
  3. from unittest.mock import AsyncMock, patch
  4. import pytest
  5. from backend.app.services.background_dispatch import (
  6. ActiveDispatchState,
  7. BackgroundDispatchService,
  8. DispatchEnqueueRejected,
  9. PrintDispatchJob,
  10. )
  11. @pytest.mark.asyncio
  12. async def test_dispatch_rejects_when_printer_busy_printing():
  13. """Reject enqueue when target printer is already printing."""
  14. service = BackgroundDispatchService()
  15. with (
  16. patch(
  17. "backend.app.services.background_dispatch.printer_manager.get_status",
  18. return_value=SimpleNamespace(state="RUNNING", gcode_file="active.gcode.3mf"),
  19. ),
  20. pytest.raises(DispatchEnqueueRejected, match="currently busy printing"),
  21. ):
  22. await service.dispatch_reprint_archive(
  23. archive_id=1,
  24. archive_name="Test Archive",
  25. printer_id=10,
  26. printer_name="Printer A",
  27. options={},
  28. requested_by_user_id=None,
  29. requested_by_username=None,
  30. )
  31. @pytest.mark.asyncio
  32. async def test_dispatch_enqueues_job_and_broadcasts_state():
  33. """Enqueue succeeds and emits websocket queue update."""
  34. service = BackgroundDispatchService()
  35. with (
  36. patch("backend.app.services.background_dispatch.printer_manager.get_status", return_value=None),
  37. patch(
  38. "backend.app.services.background_dispatch.ws_manager.broadcast", new_callable=AsyncMock
  39. ) as mock_broadcast,
  40. ):
  41. result = await service.dispatch_print_library_file(
  42. file_id=22,
  43. filename="cube.gcode.3mf",
  44. printer_id=7,
  45. printer_name="Printer B",
  46. options={"plate_id": 2},
  47. requested_by_user_id=5,
  48. requested_by_username="tester",
  49. )
  50. assert result["status"] == "dispatched"
  51. assert result["dispatch_job_id"] == 1
  52. assert result["dispatch_position"] == 1
  53. assert len(service._queued_jobs) == 1
  54. mock_broadcast.assert_awaited_once()
  55. payload = mock_broadcast.await_args.args[0]
  56. assert payload["type"] == "background_dispatch"
  57. assert payload["data"]["recent_event"]["status"] == "dispatched"
  58. @pytest.mark.asyncio
  59. async def test_dispatch_library_file_defaults_cleanup_flag_false():
  60. """cleanup_library_after_dispatch defaults to False when not passed — protects
  61. File Manager / Project Detail / queued-library-file paths from surprise deletion."""
  62. service = BackgroundDispatchService()
  63. with (
  64. patch("backend.app.services.background_dispatch.printer_manager.get_status", return_value=None),
  65. patch("backend.app.services.background_dispatch.ws_manager.broadcast", new_callable=AsyncMock),
  66. ):
  67. await service.dispatch_print_library_file(
  68. file_id=1,
  69. filename="cube.gcode.3mf",
  70. printer_id=1,
  71. printer_name="Printer A",
  72. options={},
  73. requested_by_user_id=None,
  74. requested_by_username=None,
  75. )
  76. assert len(service._queued_jobs) == 1
  77. assert service._queued_jobs[0].cleanup_library_after_dispatch is False
  78. @pytest.mark.asyncio
  79. async def test_dispatch_library_file_propagates_cleanup_flag_true():
  80. """cleanup_library_after_dispatch=True arrives on the queued job so the runner
  81. can delete the transient LibraryFile after the print is accepted by the printer."""
  82. service = BackgroundDispatchService()
  83. with (
  84. patch("backend.app.services.background_dispatch.printer_manager.get_status", return_value=None),
  85. patch("backend.app.services.background_dispatch.ws_manager.broadcast", new_callable=AsyncMock),
  86. ):
  87. await service.dispatch_print_library_file(
  88. file_id=1,
  89. filename="cube.gcode.3mf",
  90. printer_id=1,
  91. printer_name="Printer A",
  92. options={},
  93. requested_by_user_id=42,
  94. requested_by_username="alice",
  95. cleanup_library_after_dispatch=True,
  96. )
  97. assert len(service._queued_jobs) == 1
  98. job = service._queued_jobs[0]
  99. assert job.cleanup_library_after_dispatch is True
  100. # Sanity: other fields still wired correctly
  101. assert job.requested_by_user_id == 42
  102. assert job.requested_by_username == "alice"
  103. assert job.kind == "print_library_file"
  104. @pytest.mark.asyncio
  105. async def test_cancel_queued_job_removes_it_and_broadcasts():
  106. """Cancelling queued job removes it immediately."""
  107. service = BackgroundDispatchService()
  108. with (
  109. patch("backend.app.services.background_dispatch.printer_manager.get_status", return_value=None),
  110. patch(
  111. "backend.app.services.background_dispatch.ws_manager.broadcast", new_callable=AsyncMock
  112. ) as mock_broadcast,
  113. ):
  114. result = await service.dispatch_reprint_archive(
  115. archive_id=1,
  116. archive_name="benchy.gcode.3mf",
  117. printer_id=1,
  118. printer_name="Printer 1",
  119. options={},
  120. requested_by_user_id=None,
  121. requested_by_username=None,
  122. )
  123. mock_broadcast.reset_mock()
  124. cancel_result = await service.cancel_job(result["dispatch_job_id"])
  125. assert cancel_result["cancelled"] is True
  126. assert cancel_result["pending"] is False
  127. assert len(service._queued_jobs) == 0
  128. assert service._batch_total == 0
  129. mock_broadcast.assert_awaited_once()
  130. payload = mock_broadcast.await_args.args[0]
  131. assert payload["data"]["recent_event"]["status"] == "cancelled"
  132. @pytest.mark.asyncio
  133. async def test_cancel_active_job_marks_pending_and_sets_cancel_flag():
  134. """Cancelling active job marks it as pending cancellation."""
  135. service = BackgroundDispatchService()
  136. job = PrintDispatchJob(
  137. id=42,
  138. kind="reprint_archive",
  139. source_id=100,
  140. source_name="gearbox.gcode.3mf",
  141. printer_id=3,
  142. printer_name="Printer C",
  143. )
  144. service._active_jobs[job.id] = ActiveDispatchState(job=job, message="Uploading...")
  145. with patch(
  146. "backend.app.services.background_dispatch.ws_manager.broadcast", new_callable=AsyncMock
  147. ) as mock_broadcast:
  148. result = await service.cancel_job(job.id)
  149. assert result["cancelled"] is True
  150. assert result["pending"] is True
  151. assert job.id in service._cancel_requested_job_ids
  152. mock_broadcast.assert_awaited_once()
  153. payload = mock_broadcast.await_args.args[0]
  154. assert payload["data"]["recent_event"]["status"] == "cancelling"
  155. def test_resolve_plate_id_uses_request_value_when_provided(tmp_path):
  156. """Explicit plate_id wins over auto-detection."""
  157. file_path = tmp_path / "dummy.3mf"
  158. file_path.write_text("not-a-zip")
  159. plate_id = BackgroundDispatchService._resolve_plate_id(file_path, requested_plate_id=9)
  160. assert plate_id == 9
  161. def test_resolve_plate_id_auto_detects_from_3mf(tmp_path):
  162. """Auto-detect plate from Metadata/plate_X.gcode entry."""
  163. import zipfile
  164. file_path = tmp_path / "multi.3mf"
  165. with zipfile.ZipFile(file_path, "w") as zf:
  166. zf.writestr("Metadata/plate_7.gcode", b"G1 X0 Y0")
  167. plate_id = BackgroundDispatchService._resolve_plate_id(file_path, requested_plate_id=None)
  168. assert plate_id == 7
  169. def test_is_sliced_file_recognizes_supported_extensions():
  170. """Only .gcode and .gcode.3mf should be accepted."""
  171. assert BackgroundDispatchService._is_sliced_file("part.gcode") is True
  172. assert BackgroundDispatchService._is_sliced_file("part.gcode.3mf") is True
  173. assert BackgroundDispatchService._is_sliced_file("part.3mf") is False
  174. @pytest.mark.asyncio
  175. async def test_cancel_job_not_found_returns_false():
  176. """Cancelling a nonexistent job returns not_found."""
  177. service = BackgroundDispatchService()
  178. with patch("backend.app.services.background_dispatch.ws_manager.broadcast", new_callable=AsyncMock):
  179. result = await service.cancel_job(999)
  180. assert result["cancelled"] is False
  181. assert result["reason"] == "not_found"
  182. @pytest.mark.asyncio
  183. async def test_cancel_job_single_lock_covers_both_active_and_queued():
  184. """cancel_job checks both active and queued jobs under a single lock acquisition.
  185. Regression test for TOCTOU race: previously two separate lock acquisitions allowed
  186. the dispatcher loop to move a job from queue to active between them, causing cancel
  187. to find it in neither place.
  188. """
  189. service = BackgroundDispatchService()
  190. # Set up a job in the queue AND an active job for a different printer
  191. active_job = PrintDispatchJob(
  192. id=1,
  193. kind="reprint_archive",
  194. source_id=10,
  195. source_name="active.3mf",
  196. printer_id=1,
  197. printer_name="Printer 1",
  198. )
  199. service._active_jobs[active_job.id] = ActiveDispatchState(job=active_job, message="Uploading...")
  200. queued_job = PrintDispatchJob(
  201. id=2,
  202. kind="reprint_archive",
  203. source_id=20,
  204. source_name="queued.3mf",
  205. printer_id=2,
  206. printer_name="Printer 2",
  207. )
  208. service._queued_jobs.append(queued_job)
  209. service._batch_total = 2
  210. with patch(
  211. "backend.app.services.background_dispatch.ws_manager.broadcast", new_callable=AsyncMock
  212. ) as mock_broadcast:
  213. # Cancel the queued job — should find it in single lock acquisition
  214. result = await service.cancel_job(2)
  215. assert result["cancelled"] is True
  216. assert result["pending"] is False
  217. assert len(service._queued_jobs) == 0
  218. # Active job should be untouched
  219. assert 1 in service._active_jobs
  220. mock_broadcast.assert_awaited_once()
  221. payload = mock_broadcast.await_args.args[0]
  222. assert payload["data"]["recent_event"]["status"] == "cancelled"
  223. @pytest.mark.asyncio
  224. async def test_mark_job_finished_resets_batch_when_all_done():
  225. """Batch counters reset after last job completes."""
  226. service = BackgroundDispatchService()
  227. job = PrintDispatchJob(
  228. id=1,
  229. kind="reprint_archive",
  230. source_id=10,
  231. source_name="test.3mf",
  232. printer_id=1,
  233. printer_name="Printer 1",
  234. )
  235. service._active_jobs[job.id] = ActiveDispatchState(job=job, message="Done")
  236. service._batch_total = 1
  237. with patch("backend.app.services.background_dispatch.ws_manager.broadcast", new_callable=AsyncMock):
  238. await service._mark_job_finished(job, failed=False, message="Complete")
  239. assert service._batch_total == 0
  240. assert service._batch_completed == 0
  241. assert service._batch_failed == 0
  242. @pytest.mark.asyncio
  243. async def test_mark_job_finished_no_reset_when_jobs_remain():
  244. """Batch counters NOT reset when queued jobs remain."""
  245. service = BackgroundDispatchService()
  246. job = PrintDispatchJob(
  247. id=1,
  248. kind="reprint_archive",
  249. source_id=10,
  250. source_name="test.3mf",
  251. printer_id=1,
  252. printer_name="Printer 1",
  253. )
  254. remaining_job = PrintDispatchJob(
  255. id=2,
  256. kind="reprint_archive",
  257. source_id=20,
  258. source_name="next.3mf",
  259. printer_id=2,
  260. printer_name="Printer 2",
  261. )
  262. service._active_jobs[job.id] = ActiveDispatchState(job=job, message="Done")
  263. service._queued_jobs.append(remaining_job)
  264. service._batch_total = 2
  265. with patch("backend.app.services.background_dispatch.ws_manager.broadcast", new_callable=AsyncMock):
  266. await service._mark_job_finished(job, failed=False, message="Complete")
  267. # Batch counters should NOT be reset — remaining job still queued
  268. assert service._batch_total == 2
  269. assert service._batch_completed == 1
  270. @pytest.mark.asyncio
  271. async def test_mark_job_finished_batch_reset_rechecks_under_lock():
  272. """Batch reset re-checks condition inside second lock acquisition.
  273. Regression test for TOCTOU: a new dispatch between the two lock acquisitions
  274. could get its counters zeroed if the re-check is missing.
  275. """
  276. service = BackgroundDispatchService()
  277. job = PrintDispatchJob(
  278. id=1,
  279. kind="reprint_archive",
  280. source_id=10,
  281. source_name="test.3mf",
  282. printer_id=1,
  283. printer_name="Printer 1",
  284. )
  285. service._active_jobs[job.id] = ActiveDispatchState(job=job, message="Done")
  286. service._batch_total = 1
  287. original_broadcast = AsyncMock()
  288. async def inject_new_job_during_broadcast(msg):
  289. """Simulate a new dispatch arriving between the two lock acquisitions."""
  290. await original_broadcast(msg)
  291. # After broadcast (lock released), inject a new job before reset re-check
  292. if not service._queued_jobs:
  293. new_job = PrintDispatchJob(
  294. id=99,
  295. kind="reprint_archive",
  296. source_id=99,
  297. source_name="injected.3mf",
  298. printer_id=5,
  299. printer_name="Printer 5",
  300. )
  301. service._queued_jobs.append(new_job)
  302. service._batch_total = 1
  303. with patch(
  304. "backend.app.services.background_dispatch.ws_manager.broadcast",
  305. side_effect=inject_new_job_during_broadcast,
  306. ):
  307. await service._mark_job_finished(job, failed=False, message="Complete")
  308. # Re-check should prevent reset since a new job appeared
  309. assert service._batch_total == 1
  310. assert len(service._queued_jobs) == 1