"""
Integration tests for the full print lifecycle.

These tests verify that:
1. Print start creates a new archive
2. Print complete updates archive status
3. Callbacks are properly executed
4. Energy tracking works
5. Notifications are sent

Note: These tests use mocking to avoid database conflicts.
Full end-to-end tests require the actual database setup.
"""
import asyncio
from contextlib import ExitStack
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
  15. class TestPrintStartLogic:
  16. """Test print start callback logic without database integration."""
  17. @pytest.mark.asyncio
  18. async def test_print_start_calls_notification_service(self, capture_logs):
  19. """Verify on_print_start triggers notification service."""
  20. with (
  21. patch("backend.app.main.async_session") as mock_session_maker,
  22. patch("backend.app.main.notification_service") as mock_notif,
  23. patch("backend.app.main.smart_plug_manager") as mock_plug,
  24. patch("backend.app.main.ws_manager") as mock_ws,
  25. ):
  26. mock_notif.on_print_start = AsyncMock()
  27. mock_plug.on_print_start = AsyncMock()
  28. mock_ws.send_print_start = AsyncMock()
  29. # Mock the database session
  30. mock_session = AsyncMock()
  31. mock_session.__aenter__ = AsyncMock(return_value=mock_session)
  32. mock_session.__aexit__ = AsyncMock()
  33. mock_session.execute = AsyncMock(return_value=MagicMock(scalar_one_or_none=MagicMock(return_value=None)))
  34. mock_session_maker.return_value = mock_session
  35. from backend.app.main import on_print_start
  36. await on_print_start(
  37. 1,
  38. {
  39. "filename": "/data/Metadata/test.gcode",
  40. "subtask_name": "Test",
  41. },
  42. )
  43. # Verify WebSocket notification was sent
  44. mock_ws.send_print_start.assert_called_once()
  45. # Verify no import shadowing errors
  46. errors = [r for r in capture_logs.get_errors() if "cannot access local variable" in str(r.message)]
  47. assert not errors, f"Import shadowing error: {capture_logs.format_errors()}"
  48. class TestPlateClearGate:
  49. """The plate-clear gate (#961) blocks the queue from auto-dispatching the
  50. next print until the user acknowledges the bed was cleared. The gate must
  51. be raised on every terminal status that could have left material on the
  52. bed — including aborted (printer self-abort or touchscreen stop) and
  53. cancelled (user stopped via Bambuddy queue UI). #1171: prior code only
  54. raised the flag for completed/failed, so an aborted print auto-dispatched
  55. the next queue item onto a fouled bed two seconds later."""
  56. @staticmethod
  57. def _setup_mocks(stack):
  58. mock_session_maker = stack.enter_context(patch("backend.app.main.async_session"))
  59. stack.enter_context(patch("backend.app.main.notification_service")).on_print_complete = AsyncMock()
  60. stack.enter_context(patch("backend.app.main.smart_plug_manager")).on_print_complete = AsyncMock()
  61. mock_ws = stack.enter_context(patch("backend.app.main.ws_manager"))
  62. mock_ws.send_print_complete = AsyncMock()
  63. mock_ws.broadcast = AsyncMock()
  64. stack.enter_context(patch("backend.app.main.mqtt_relay")).on_print_complete = AsyncMock()
  65. mock_pm = stack.enter_context(patch("backend.app.main.printer_manager"))
  66. mock_pm.get_printer.return_value = None
  67. # Real method under test — track each call so the test can assert on it.
  68. mock_pm.set_awaiting_plate_clear = MagicMock()
  69. mock_session = AsyncMock()
  70. mock_session.__aenter__ = AsyncMock(return_value=mock_session)
  71. mock_session.__aexit__ = AsyncMock()
  72. mock_session.execute = AsyncMock(return_value=MagicMock(scalar_one_or_none=MagicMock(return_value=None)))
  73. mock_session_maker.return_value = mock_session
  74. return mock_pm
  75. @pytest.mark.asyncio
  76. @pytest.mark.parametrize(
  77. "status",
  78. ["completed", "failed", "aborted", "cancelled"],
  79. ids=["completed", "failed", "aborted-1171", "cancelled-1171"],
  80. )
  81. async def test_plate_clear_gate_raised_for_every_terminal_status(self, status):
  82. """Regression for #1171. Every terminal status that can leave material
  83. on the bed must raise the gate. Pre-fix the gate was raised only for
  84. completed/failed, so aborted (printer touchscreen stop, self-abort) and
  85. cancelled (Bambuddy queue stop) auto-dispatched the next queue item
  86. onto a fouled bed."""
  87. from contextlib import ExitStack
  88. tasks_before = set(asyncio.all_tasks())
  89. with ExitStack() as stack:
  90. mock_pm = self._setup_mocks(stack)
  91. from backend.app.main import on_print_complete
  92. await on_print_complete(
  93. 1,
  94. {
  95. "status": status,
  96. "filename": "/data/Metadata/test.gcode",
  97. "subtask_name": "Test",
  98. "timelapse_was_active": False,
  99. },
  100. )
  101. for task in asyncio.all_tasks() - tasks_before:
  102. task.cancel()
  103. try:
  104. await task
  105. except (asyncio.CancelledError, Exception):
  106. pass
  107. mock_pm.set_awaiting_plate_clear.assert_any_call(1, True)
  108. @pytest.mark.asyncio
  109. async def test_plate_clear_gate_not_raised_for_unknown_status(self):
  110. """Defence in depth: an unknown / not-terminal status string from a
  111. future firmware revision must not silently raise the gate. The flag is
  112. only meaningful when the print actually ended."""
  113. from contextlib import ExitStack
  114. tasks_before = set(asyncio.all_tasks())
  115. with ExitStack() as stack:
  116. mock_pm = self._setup_mocks(stack)
  117. from backend.app.main import on_print_complete
  118. await on_print_complete(
  119. 1,
  120. {
  121. "status": "unknown_future_status",
  122. "filename": "/data/Metadata/test.gcode",
  123. "subtask_name": "Test",
  124. "timelapse_was_active": False,
  125. },
  126. )
  127. for task in asyncio.all_tasks() - tasks_before:
  128. task.cancel()
  129. try:
  130. await task
  131. except (asyncio.CancelledError, Exception):
  132. pass
  133. # The mock records every call; assert no True-call landed.
  134. true_calls = [c for c in mock_pm.set_awaiting_plate_clear.call_args_list if c.args[1] is True]
  135. assert true_calls == [], (
  136. "Gate must not be raised for an unrecognised terminal status; "
  137. f"set_awaiting_plate_clear({1}, True) was called {len(true_calls)} time(s)."
  138. )
  139. class TestPrintCompleteLogic:
  140. """Test print complete callback logic."""
  141. @pytest.mark.asyncio
  142. async def test_print_complete_no_import_errors(self, capture_logs):
  143. """Verify on_print_complete doesn't have import shadowing issues."""
  144. # Snapshot tasks before the call so we can cancel orphans afterwards.
  145. # on_print_complete fires background tasks (maintenance check, notifications,
  146. # smart-plug) via asyncio.create_task. If those tasks outlive the mock
  147. # context they use the *real* async_session and can send real notifications.
  148. tasks_before = set(asyncio.all_tasks())
  149. with (
  150. patch("backend.app.main.async_session") as mock_session_maker,
  151. patch("backend.app.main.notification_service") as mock_notif,
  152. patch("backend.app.main.smart_plug_manager") as mock_plug,
  153. patch("backend.app.main.ws_manager") as mock_ws,
  154. patch("backend.app.main.mqtt_relay") as mock_relay,
  155. patch("backend.app.main.printer_manager") as mock_pm,
  156. ):
  157. mock_notif.on_print_complete = AsyncMock()
  158. mock_plug.on_print_complete = AsyncMock()
  159. mock_ws.send_print_complete = AsyncMock()
  160. mock_ws.broadcast = AsyncMock()
  161. mock_relay.on_print_complete = AsyncMock()
  162. mock_pm.get_printer.return_value = None
  163. # Mock the database session
  164. mock_session = AsyncMock()
  165. mock_session.__aenter__ = AsyncMock(return_value=mock_session)
  166. mock_session.__aexit__ = AsyncMock()
  167. mock_session.execute = AsyncMock(return_value=MagicMock(scalar_one_or_none=MagicMock(return_value=None)))
  168. mock_session_maker.return_value = mock_session
  169. from backend.app.main import on_print_complete
  170. await on_print_complete(
  171. 1,
  172. {
  173. "status": "completed",
  174. "filename": "/data/Metadata/test.gcode",
  175. "subtask_name": "Test",
  176. "timelapse_was_active": False,
  177. },
  178. )
  179. # Cancel background tasks spawned by on_print_complete before
  180. # leaving the mock context — prevents them from running with
  181. # the real async_session and sending real notifications.
  182. for task in asyncio.all_tasks() - tasks_before:
  183. task.cancel()
  184. try:
  185. await task
  186. except (asyncio.CancelledError, Exception):
  187. pass
  188. # Verify no import shadowing errors - this would have caught the ArchiveService bug
  189. errors = [r for r in capture_logs.get_errors() if "cannot access local variable" in str(r.message)]
  190. assert not errors, f"Import shadowing error: {capture_logs.format_errors()}"
  191. class TestTimelapseTracking:
  192. """Test timelapse detection during prints."""
  193. @pytest.mark.asyncio
  194. async def test_timelapse_detected_in_same_message_as_print_start(self):
  195. """Verify timelapse is detected when xcam and state come together."""
  196. from backend.app.services.bambu_mqtt import BambuMQTTClient
  197. client = BambuMQTTClient(
  198. ip_address="192.168.1.100",
  199. serial_number="TEST123",
  200. access_code="12345678",
  201. )
  202. client.on_print_start = lambda data: None
  203. # Initial state
  204. client._was_running = False
  205. client._timelapse_during_print = False
  206. # Message with both state and timelapse
  207. client._process_message(
  208. {
  209. "print": {
  210. "gcode_state": "RUNNING",
  211. "gcode_file": "/data/Metadata/test.gcode",
  212. "subtask_name": "Test",
  213. "xcam": {"timelapse": "enable"},
  214. }
  215. }
  216. )
  217. assert client._was_running is True
  218. assert client._timelapse_during_print is True, (
  219. "Timelapse should be detected even when xcam is parsed before state"
  220. )
  221. @pytest.mark.asyncio
  222. async def test_timelapse_flag_included_in_completion_callback(self):
  223. """Verify completion callback receives timelapse_was_active flag."""
  224. from backend.app.services.bambu_mqtt import BambuMQTTClient
  225. client = BambuMQTTClient(
  226. ip_address="192.168.1.100",
  227. serial_number="TEST123",
  228. access_code="12345678",
  229. )
  230. completion_data = {}
  231. def on_complete(data):
  232. completion_data.update(data)
  233. client.on_print_start = lambda data: None
  234. client.on_print_complete = on_complete
  235. # Start with timelapse
  236. client._process_message(
  237. {
  238. "print": {
  239. "gcode_state": "RUNNING",
  240. "gcode_file": "/data/Metadata/test.gcode",
  241. "subtask_name": "Test",
  242. "xcam": {"timelapse": "enable"},
  243. }
  244. }
  245. )
  246. # Complete print
  247. client._process_message(
  248. {
  249. "print": {
  250. "gcode_state": "FINISH",
  251. "gcode_file": "/data/Metadata/test.gcode",
  252. "subtask_name": "Test",
  253. }
  254. }
  255. )
  256. assert "timelapse_was_active" in completion_data
  257. assert completion_data["timelapse_was_active"] is True
  258. @pytest.mark.asyncio
  259. async def test_hms_errors_included_in_failed_completion_callback(self):
  260. """Verify completion callback receives hms_errors for failed prints."""
  261. from backend.app.services.bambu_mqtt import BambuMQTTClient
  262. client = BambuMQTTClient(
  263. ip_address="192.168.1.100",
  264. serial_number="TEST123",
  265. access_code="12345678",
  266. )
  267. completion_data = {}
  268. def on_complete(data):
  269. completion_data.update(data)
  270. client.on_print_start = lambda data: None
  271. client.on_print_complete = on_complete
  272. # Start print
  273. client._process_message(
  274. {
  275. "print": {
  276. "gcode_state": "RUNNING",
  277. "gcode_file": "/data/Metadata/test.gcode",
  278. "subtask_name": "Test",
  279. }
  280. }
  281. )
  282. # Add HMS error during print
  283. client._process_message(
  284. {
  285. "print": {
  286. "gcode_state": "RUNNING",
  287. "hms": [{"attr": 0x07000002, "code": 0x8001}], # Filament module error (code must be >= 0x4000)
  288. }
  289. }
  290. )
  291. # Fail print
  292. client._process_message(
  293. {
  294. "print": {
  295. "gcode_state": "FAILED",
  296. "gcode_file": "/data/Metadata/test.gcode",
  297. "subtask_name": "Test",
  298. }
  299. }
  300. )
  301. assert "hms_errors" in completion_data
  302. assert len(completion_data["hms_errors"]) == 1
  303. assert completion_data["hms_errors"][0]["module"] == 0x07
  304. assert completion_data["status"] == "failed"
  305. @pytest.mark.asyncio
  306. async def test_aborted_status_when_cancelled(self):
  307. """Verify completion callback receives 'aborted' status when print is cancelled."""
  308. from backend.app.services.bambu_mqtt import BambuMQTTClient
  309. client = BambuMQTTClient(
  310. ip_address="192.168.1.100",
  311. serial_number="TEST123",
  312. access_code="12345678",
  313. )
  314. completion_data = {}
  315. def on_complete(data):
  316. completion_data.update(data)
  317. client.on_print_start = lambda data: None
  318. client.on_print_complete = on_complete
  319. # Start print
  320. client._process_message(
  321. {
  322. "print": {
  323. "gcode_state": "RUNNING",
  324. "gcode_file": "/data/Metadata/test.gcode",
  325. "subtask_name": "Test",
  326. }
  327. }
  328. )
  329. # User cancels (goes to IDLE)
  330. client._process_message(
  331. {
  332. "print": {
  333. "gcode_state": "IDLE",
  334. "gcode_file": "/data/Metadata/test.gcode",
  335. "subtask_name": "Test",
  336. }
  337. }
  338. )
  339. assert completion_data["status"] == "aborted"
  340. assert "hms_errors" in completion_data
  341. @pytest.mark.asyncio
  342. async def test_timelapse_detected_from_ipcam_data(self):
  343. """Verify timelapse is detected from ipcam data (H2D sends it there, not xcam)."""
  344. from backend.app.services.bambu_mqtt import BambuMQTTClient
  345. client = BambuMQTTClient(
  346. ip_address="192.168.1.100",
  347. serial_number="TEST123",
  348. access_code="12345678",
  349. )
  350. completion_data = {}
  351. def on_complete(data):
  352. completion_data.update(data)
  353. client.on_print_start = lambda data: None
  354. client.on_print_complete = on_complete
  355. # Start print with timelapse in ipcam data (H2D format)
  356. client._process_message(
  357. {
  358. "print": {
  359. "gcode_state": "RUNNING",
  360. "gcode_file": "/data/Metadata/test.gcode",
  361. "subtask_name": "Test",
  362. "ipcam": {
  363. "ipcam_record": "enable",
  364. "timelapse": "enable",
  365. "resolution": "1080p",
  366. },
  367. }
  368. }
  369. )
  370. assert client._timelapse_during_print is True, "Timelapse should be detected from ipcam data"
  371. # Complete print
  372. client._process_message(
  373. {
  374. "print": {
  375. "gcode_state": "FINISH",
  376. "gcode_file": "/data/Metadata/test.gcode",
  377. "subtask_name": "Test",
  378. }
  379. }
  380. )
  381. assert completion_data["timelapse_was_active"] is True, (
  382. "timelapse_was_active should be True when timelapse was in ipcam"
  383. )
  384. class TestCallbackErrorHandling:
  385. """Test that callback errors are properly logged."""
  386. @pytest.mark.asyncio
  387. async def test_callback_errors_are_logged(self, capture_logs):
  388. """Verify that exceptions in callbacks are logged, not swallowed."""
  389. from backend.app.services.printer_manager import PrinterManager
  390. manager = PrinterManager()
  391. # Set up event loop
  392. loop = asyncio.get_event_loop()
  393. manager.set_event_loop(loop)
  394. # Create a callback that raises an error
  395. error_raised = False
  396. async def failing_callback(printer_id, data):
  397. nonlocal error_raised
  398. error_raised = True
  399. raise ValueError("Test error in callback")
  400. manager.set_print_complete_callback(failing_callback)
  401. # The _schedule_async should log the error
  402. # This is tested indirectly - if exception handling is broken,
  403. # the error would be swallowed silently
  404. class TestNoImportShadowing:
  405. """Verify no import shadowing issues exist in callbacks."""
  406. @pytest.mark.asyncio
  407. async def test_on_print_complete_no_import_errors(self, capture_logs):
  408. """Verify on_print_complete doesn't have import shadowing issues."""
  409. # Import the module to check for syntax/import errors
  410. from backend.app import main
  411. # The ArchiveService should be accessible
  412. from backend.app.services.archive import ArchiveService
  413. # Verify we can instantiate it (would fail with shadowing bug)
  414. assert ArchiveService is not None
  415. # Check logs for any import-related errors
  416. errors = capture_logs.get_errors()
  417. import_errors = [
  418. e for e in errors if "import" in str(e.message).lower() or "local variable" in str(e.message).lower()
  419. ]
  420. assert not import_errors, f"Import errors found: {import_errors}"