test_background_dispatch.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415
  1. """Unit tests for background dispatch service."""
  2. from types import SimpleNamespace
  3. from unittest.mock import AsyncMock, patch
  4. import pytest
  5. from backend.app.services.background_dispatch import (
  6. ActiveDispatchState,
  7. BackgroundDispatchService,
  8. DispatchEnqueueRejected,
  9. PrintDispatchJob,
  10. )
  11. @pytest.mark.asyncio
  12. async def test_dispatch_rejects_when_printer_busy_printing():
  13. """Reject enqueue when target printer is already printing."""
  14. service = BackgroundDispatchService()
  15. with (
  16. patch(
  17. "backend.app.services.background_dispatch.printer_manager.get_status",
  18. return_value=SimpleNamespace(state="RUNNING", gcode_file="active.gcode.3mf"),
  19. ),
  20. pytest.raises(DispatchEnqueueRejected, match="currently busy printing"),
  21. ):
  22. await service.dispatch_reprint_archive(
  23. archive_id=1,
  24. archive_name="Test Archive",
  25. printer_id=10,
  26. printer_name="Printer A",
  27. options={},
  28. requested_by_user_id=None,
  29. requested_by_username=None,
  30. )
  31. @pytest.mark.asyncio
  32. async def test_dispatch_enqueues_job_and_broadcasts_state():
  33. """Enqueue succeeds and emits websocket queue update."""
  34. service = BackgroundDispatchService()
  35. with (
  36. patch("backend.app.services.background_dispatch.printer_manager.get_status", return_value=None),
  37. patch(
  38. "backend.app.services.background_dispatch.ws_manager.broadcast", new_callable=AsyncMock
  39. ) as mock_broadcast,
  40. ):
  41. result = await service.dispatch_print_library_file(
  42. file_id=22,
  43. filename="cube.gcode.3mf",
  44. printer_id=7,
  45. printer_name="Printer B",
  46. options={"plate_id": 2},
  47. requested_by_user_id=5,
  48. requested_by_username="tester",
  49. )
  50. assert result["status"] == "dispatched"
  51. assert result["dispatch_job_id"] == 1
  52. assert result["dispatch_position"] == 1
  53. assert len(service._queued_jobs) == 1
  54. mock_broadcast.assert_awaited_once()
  55. payload = mock_broadcast.await_args.args[0]
  56. assert payload["type"] == "background_dispatch"
  57. assert payload["data"]["recent_event"]["status"] == "dispatched"
  58. @pytest.mark.asyncio
  59. async def test_dispatch_library_file_defaults_cleanup_flag_false():
  60. """cleanup_library_after_dispatch defaults to False when not passed — protects
  61. File Manager / Project Detail / queued-library-file paths from surprise deletion."""
  62. service = BackgroundDispatchService()
  63. with (
  64. patch("backend.app.services.background_dispatch.printer_manager.get_status", return_value=None),
  65. patch("backend.app.services.background_dispatch.ws_manager.broadcast", new_callable=AsyncMock),
  66. ):
  67. await service.dispatch_print_library_file(
  68. file_id=1,
  69. filename="cube.gcode.3mf",
  70. printer_id=1,
  71. printer_name="Printer A",
  72. options={},
  73. requested_by_user_id=None,
  74. requested_by_username=None,
  75. )
  76. assert len(service._queued_jobs) == 1
  77. assert service._queued_jobs[0].cleanup_library_after_dispatch is False
  78. @pytest.mark.asyncio
  79. async def test_dispatch_library_file_propagates_cleanup_flag_true():
  80. """cleanup_library_after_dispatch=True arrives on the queued job so the runner
  81. can delete the transient LibraryFile after the print is accepted by the printer."""
  82. service = BackgroundDispatchService()
  83. with (
  84. patch("backend.app.services.background_dispatch.printer_manager.get_status", return_value=None),
  85. patch("backend.app.services.background_dispatch.ws_manager.broadcast", new_callable=AsyncMock),
  86. ):
  87. await service.dispatch_print_library_file(
  88. file_id=1,
  89. filename="cube.gcode.3mf",
  90. printer_id=1,
  91. printer_name="Printer A",
  92. options={},
  93. requested_by_user_id=42,
  94. requested_by_username="alice",
  95. cleanup_library_after_dispatch=True,
  96. )
  97. assert len(service._queued_jobs) == 1
  98. job = service._queued_jobs[0]
  99. assert job.cleanup_library_after_dispatch is True
  100. # Sanity: other fields still wired correctly
  101. assert job.requested_by_user_id == 42
  102. assert job.requested_by_username == "alice"
  103. assert job.kind == "print_library_file"
  104. @pytest.mark.asyncio
  105. async def test_cancel_queued_job_removes_it_and_broadcasts():
  106. """Cancelling queued job removes it immediately."""
  107. service = BackgroundDispatchService()
  108. with (
  109. patch("backend.app.services.background_dispatch.printer_manager.get_status", return_value=None),
  110. patch(
  111. "backend.app.services.background_dispatch.ws_manager.broadcast", new_callable=AsyncMock
  112. ) as mock_broadcast,
  113. ):
  114. result = await service.dispatch_reprint_archive(
  115. archive_id=1,
  116. archive_name="benchy.gcode.3mf",
  117. printer_id=1,
  118. printer_name="Printer 1",
  119. options={},
  120. requested_by_user_id=None,
  121. requested_by_username=None,
  122. )
  123. mock_broadcast.reset_mock()
  124. cancel_result = await service.cancel_job(result["dispatch_job_id"])
  125. assert cancel_result["cancelled"] is True
  126. assert cancel_result["pending"] is False
  127. assert len(service._queued_jobs) == 0
  128. assert service._batch_total == 0
  129. mock_broadcast.assert_awaited_once()
  130. payload = mock_broadcast.await_args.args[0]
  131. assert payload["data"]["recent_event"]["status"] == "cancelled"
  132. @pytest.mark.asyncio
  133. async def test_cancel_active_job_marks_pending_and_sets_cancel_flag():
  134. """Cancelling active job marks it as pending cancellation."""
  135. service = BackgroundDispatchService()
  136. job = PrintDispatchJob(
  137. id=42,
  138. kind="reprint_archive",
  139. source_id=100,
  140. source_name="gearbox.gcode.3mf",
  141. printer_id=3,
  142. printer_name="Printer C",
  143. )
  144. service._active_jobs[job.id] = ActiveDispatchState(job=job, message="Uploading...")
  145. with patch(
  146. "backend.app.services.background_dispatch.ws_manager.broadcast", new_callable=AsyncMock
  147. ) as mock_broadcast:
  148. result = await service.cancel_job(job.id)
  149. assert result["cancelled"] is True
  150. assert result["pending"] is True
  151. assert job.id in service._cancel_requested_job_ids
  152. mock_broadcast.assert_awaited_once()
  153. payload = mock_broadcast.await_args.args[0]
  154. assert payload["data"]["recent_event"]["status"] == "cancelling"
  155. def test_resolve_plate_id_uses_request_value_when_provided(tmp_path):
  156. """Explicit plate_id wins over auto-detection."""
  157. file_path = tmp_path / "dummy.3mf"
  158. file_path.write_text("not-a-zip")
  159. plate_id = BackgroundDispatchService._resolve_plate_id(file_path, requested_plate_id=9)
  160. assert plate_id == 9
  161. def test_resolve_plate_id_auto_detects_from_3mf(tmp_path):
  162. """Auto-detect plate from Metadata/plate_X.gcode entry."""
  163. import zipfile
  164. file_path = tmp_path / "multi.3mf"
  165. with zipfile.ZipFile(file_path, "w") as zf:
  166. zf.writestr("Metadata/plate_7.gcode", b"G1 X0 Y0")
  167. plate_id = BackgroundDispatchService._resolve_plate_id(file_path, requested_plate_id=None)
  168. assert plate_id == 7
  169. def test_is_sliced_file_recognizes_supported_extensions():
  170. """Only .gcode and .gcode.3mf should be accepted."""
  171. assert BackgroundDispatchService._is_sliced_file("part.gcode") is True
  172. assert BackgroundDispatchService._is_sliced_file("part.gcode.3mf") is True
  173. assert BackgroundDispatchService._is_sliced_file("part.3mf") is False
  174. def test_dispatch_option_defaults_align_with_request_schema_defaults():
  175. """The `job.options.get("<field>", <default>)` calls in the dispatch
  176. loop must use the same default as the Pydantic request schema. If a
  177. field is missing from options (e.g. an internal caller bypassing the
  178. schema), the resulting print command must match what a fresh
  179. `ReprintRequest()` / `FilePrintRequest()` would produce — anything
  180. else means certain fields silently flip depending on which entry
  181. point queued the job.
  182. Earlier `vibration_cali` had a False default in the dispatch loop
  183. against a True schema default, latent only because every existing
  184. caller always sent the field.
  185. """
  186. import inspect
  187. from backend.app.schemas.archive import ReprintRequest
  188. from backend.app.schemas.library import FilePrintRequest
  189. from backend.app.services import background_dispatch as bd
  190. fields = ("bed_levelling", "flow_cali", "vibration_cali", "layer_inspect", "timelapse", "use_ams")
  191. reprint_defaults = {f: getattr(ReprintRequest(), f) for f in fields}
  192. libprint_defaults = {f: getattr(FilePrintRequest(), f) for f in fields}
  193. assert reprint_defaults == libprint_defaults, (
  194. "ReprintRequest and FilePrintRequest must share the same defaults for these fields"
  195. )
  196. src = inspect.getsource(bd)
  197. for field, expected_default in reprint_defaults.items():
  198. literal = "True" if expected_default else "False"
  199. needle = f'{field}=job.options.get("{field}", {literal})'
  200. count = src.count(needle)
  201. assert count == 2, (
  202. f"Expected exactly 2 occurrences of `{needle}` in background_dispatch (one per "
  203. f"`_process_job` branch). Found {count}. A drift between the request schema's "
  204. f"default for `{field}` and the dispatch loop's `.get()` default means callers "
  205. f"that bypass the schema will get inconsistent behaviour."
  206. )
  207. @pytest.mark.asyncio
  208. async def test_cancel_job_not_found_returns_false():
  209. """Cancelling a nonexistent job returns not_found."""
  210. service = BackgroundDispatchService()
  211. with patch("backend.app.services.background_dispatch.ws_manager.broadcast", new_callable=AsyncMock):
  212. result = await service.cancel_job(999)
  213. assert result["cancelled"] is False
  214. assert result["reason"] == "not_found"
  215. @pytest.mark.asyncio
  216. async def test_cancel_job_single_lock_covers_both_active_and_queued():
  217. """cancel_job checks both active and queued jobs under a single lock acquisition.
  218. Regression test for TOCTOU race: previously two separate lock acquisitions allowed
  219. the dispatcher loop to move a job from queue to active between them, causing cancel
  220. to find it in neither place.
  221. """
  222. service = BackgroundDispatchService()
  223. # Set up a job in the queue AND an active job for a different printer
  224. active_job = PrintDispatchJob(
  225. id=1,
  226. kind="reprint_archive",
  227. source_id=10,
  228. source_name="active.3mf",
  229. printer_id=1,
  230. printer_name="Printer 1",
  231. )
  232. service._active_jobs[active_job.id] = ActiveDispatchState(job=active_job, message="Uploading...")
  233. queued_job = PrintDispatchJob(
  234. id=2,
  235. kind="reprint_archive",
  236. source_id=20,
  237. source_name="queued.3mf",
  238. printer_id=2,
  239. printer_name="Printer 2",
  240. )
  241. service._queued_jobs.append(queued_job)
  242. service._batch_total = 2
  243. with patch(
  244. "backend.app.services.background_dispatch.ws_manager.broadcast", new_callable=AsyncMock
  245. ) as mock_broadcast:
  246. # Cancel the queued job — should find it in single lock acquisition
  247. result = await service.cancel_job(2)
  248. assert result["cancelled"] is True
  249. assert result["pending"] is False
  250. assert len(service._queued_jobs) == 0
  251. # Active job should be untouched
  252. assert 1 in service._active_jobs
  253. mock_broadcast.assert_awaited_once()
  254. payload = mock_broadcast.await_args.args[0]
  255. assert payload["data"]["recent_event"]["status"] == "cancelled"
  256. @pytest.mark.asyncio
  257. async def test_mark_job_finished_resets_batch_when_all_done():
  258. """Batch counters reset after last job completes."""
  259. service = BackgroundDispatchService()
  260. job = PrintDispatchJob(
  261. id=1,
  262. kind="reprint_archive",
  263. source_id=10,
  264. source_name="test.3mf",
  265. printer_id=1,
  266. printer_name="Printer 1",
  267. )
  268. service._active_jobs[job.id] = ActiveDispatchState(job=job, message="Done")
  269. service._batch_total = 1
  270. with patch("backend.app.services.background_dispatch.ws_manager.broadcast", new_callable=AsyncMock):
  271. await service._mark_job_finished(job, failed=False, message="Complete")
  272. assert service._batch_total == 0
  273. assert service._batch_completed == 0
  274. assert service._batch_failed == 0
  275. @pytest.mark.asyncio
  276. async def test_mark_job_finished_no_reset_when_jobs_remain():
  277. """Batch counters NOT reset when queued jobs remain."""
  278. service = BackgroundDispatchService()
  279. job = PrintDispatchJob(
  280. id=1,
  281. kind="reprint_archive",
  282. source_id=10,
  283. source_name="test.3mf",
  284. printer_id=1,
  285. printer_name="Printer 1",
  286. )
  287. remaining_job = PrintDispatchJob(
  288. id=2,
  289. kind="reprint_archive",
  290. source_id=20,
  291. source_name="next.3mf",
  292. printer_id=2,
  293. printer_name="Printer 2",
  294. )
  295. service._active_jobs[job.id] = ActiveDispatchState(job=job, message="Done")
  296. service._queued_jobs.append(remaining_job)
  297. service._batch_total = 2
  298. with patch("backend.app.services.background_dispatch.ws_manager.broadcast", new_callable=AsyncMock):
  299. await service._mark_job_finished(job, failed=False, message="Complete")
  300. # Batch counters should NOT be reset — remaining job still queued
  301. assert service._batch_total == 2
  302. assert service._batch_completed == 1
  303. @pytest.mark.asyncio
  304. async def test_mark_job_finished_batch_reset_rechecks_under_lock():
  305. """Batch reset re-checks condition inside second lock acquisition.
  306. Regression test for TOCTOU: a new dispatch between the two lock acquisitions
  307. could get its counters zeroed if the re-check is missing.
  308. """
  309. service = BackgroundDispatchService()
  310. job = PrintDispatchJob(
  311. id=1,
  312. kind="reprint_archive",
  313. source_id=10,
  314. source_name="test.3mf",
  315. printer_id=1,
  316. printer_name="Printer 1",
  317. )
  318. service._active_jobs[job.id] = ActiveDispatchState(job=job, message="Done")
  319. service._batch_total = 1
  320. original_broadcast = AsyncMock()
  321. async def inject_new_job_during_broadcast(msg):
  322. """Simulate a new dispatch arriving between the two lock acquisitions."""
  323. await original_broadcast(msg)
  324. # After broadcast (lock released), inject a new job before reset re-check
  325. if not service._queued_jobs:
  326. new_job = PrintDispatchJob(
  327. id=99,
  328. kind="reprint_archive",
  329. source_id=99,
  330. source_name="injected.3mf",
  331. printer_id=5,
  332. printer_name="Printer 5",
  333. )
  334. service._queued_jobs.append(new_job)
  335. service._batch_total = 1
  336. with patch(
  337. "backend.app.services.background_dispatch.ws_manager.broadcast",
  338. side_effect=inject_new_job_during_broadcast,
  339. ):
  340. await service._mark_job_finished(job, failed=False, message="Complete")
  341. # Re-check should prevent reset since a new job appeared
  342. assert service._batch_total == 1
  343. assert len(service._queued_jobs) == 1