test_print_lifecycle.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406
  """
  Integration tests for the full print lifecycle.

  These tests verify that:
  1. Print start creates a new archive
  2. Print complete updates archive status
  3. Callbacks are properly executed
  4. Energy tracking works
  5. Notifications are sent

  Note: These tests use mocking to avoid database conflicts.
  Full end-to-end tests require the actual database setup.
  """
  12. import asyncio
  13. from unittest.mock import AsyncMock, MagicMock, patch
  14. import pytest
  class TestPrintStartLogic:
      """Test print start callback logic without database integration."""

      @pytest.mark.asyncio
      async def test_print_start_calls_notification_service(self, capture_logs):
          """Run ``on_print_start`` against patched collaborators.

          Asserts that the WebSocket print-start message is sent exactly once
          and that no "cannot access local variable" error was logged.

          NOTE(review): despite the test name, the patched notification
          service is never asserted on; only ``ws_manager.send_print_start``
          is checked.
          """
          with (
              patch("backend.app.main.async_session") as mock_session_maker,
              patch("backend.app.main.notification_service") as mock_notif,
              patch("backend.app.main.smart_plug_manager") as mock_plug,
              patch("backend.app.main.ws_manager") as mock_ws,
          ):
              mock_notif.on_print_start = AsyncMock()
              mock_plug.on_print_start = AsyncMock()
              mock_ws.send_print_start = AsyncMock()

              # Mock the database session
              mock_session = AsyncMock()
              mock_session.__aenter__ = AsyncMock(return_value=mock_session)
              # __aexit__ has to return a falsy value. A bare AsyncMock() resolves
              # to a (truthy) mock object, which tells ``async with`` to suppress
              # whatever exception was raised inside the session block, hiding
              # exactly the kind of error this test is meant to surface.
              mock_session.__aexit__ = AsyncMock(return_value=False)
              mock_session.execute = AsyncMock(return_value=MagicMock(scalar_one_or_none=MagicMock(return_value=None)))
              mock_session_maker.return_value = mock_session

              from backend.app.main import on_print_start

              await on_print_start(
                  1,
                  {
                      "filename": "/data/Metadata/test.gcode",
                      "subtask_name": "Test",
                  },
              )

              # Verify WebSocket notification was sent
              mock_ws.send_print_start.assert_called_once()

              # Verify no import shadowing errors
              errors = [r for r in capture_logs.get_errors() if "cannot access local variable" in str(r.message)]
              assert not errors, f"Import shadowing error: {capture_logs.format_errors()}"
  class TestPrintCompleteLogic:
      """Test print complete callback logic."""

      @pytest.mark.asyncio
      async def test_print_complete_no_import_errors(self, capture_logs):
          """Verify on_print_complete doesn't have import shadowing issues.

          Runs ``on_print_complete`` against patched collaborators, cancels any
          asyncio tasks created during the call, and then checks that no
          "cannot access local variable" error was logged.
          """
          # Snapshot tasks before the call so we can cancel orphans afterwards.
          # on_print_complete fires background tasks (maintenance check, notifications,
          # smart-plug) via asyncio.create_task. If those tasks outlive the mock
          # context they use the *real* async_session and can send real notifications.
          tasks_before = set(asyncio.all_tasks())

          with (
              patch("backend.app.main.async_session") as mock_session_maker,
              patch("backend.app.main.notification_service") as mock_notif,
              patch("backend.app.main.smart_plug_manager") as mock_plug,
              patch("backend.app.main.ws_manager") as mock_ws,
              patch("backend.app.main.mqtt_relay") as mock_relay,
              patch("backend.app.main.printer_manager") as mock_pm,
          ):
              mock_notif.on_print_complete = AsyncMock()
              mock_plug.on_print_complete = AsyncMock()
              mock_ws.send_print_complete = AsyncMock()
              mock_ws.broadcast = AsyncMock()
              mock_relay.on_print_complete = AsyncMock()
              mock_pm.get_printer.return_value = None

              # Mock the database session
              mock_session = AsyncMock()
              mock_session.__aenter__ = AsyncMock(return_value=mock_session)
              # __aexit__ has to return a falsy value. A bare AsyncMock() resolves
              # to a (truthy) mock object, which tells ``async with`` to suppress
              # whatever exception was raised inside the session block, hiding
              # exactly the kind of error this test is meant to surface.
              mock_session.__aexit__ = AsyncMock(return_value=False)
              mock_session.execute = AsyncMock(return_value=MagicMock(scalar_one_or_none=MagicMock(return_value=None)))
              mock_session_maker.return_value = mock_session

              from backend.app.main import on_print_complete

              await on_print_complete(
                  1,
                  {
                      "status": "completed",
                      "filename": "/data/Metadata/test.gcode",
                      "subtask_name": "Test",
                      "timelapse_was_active": False,
                  },
              )

              # Cancel background tasks spawned by on_print_complete before
              # leaving the mock context — prevents them from running with
              # the real async_session and sending real notifications.
              # Every new task is cancelled *before* the first await so none of
              # them gets another turn on the loop while a sibling is drained.
              spawned = asyncio.all_tasks() - tasks_before
              for task in spawned:
                  task.cancel()
              # Best-effort drain: cancellations and task failures are collected
              # as results instead of being raised here.
              await asyncio.gather(*spawned, return_exceptions=True)

          # Verify no import shadowing errors - this would have caught the ArchiveService bug
          errors = [r for r in capture_logs.get_errors() if "cannot access local variable" in str(r.message)]
          assert not errors, f"Import shadowing error: {capture_logs.format_errors()}"
  class TestTimelapseTracking:
      """Test timelapse detection during prints."""

      @staticmethod
      def _new_client():
          """Build a BambuMQTTClient with fixed dummy connection details."""
          from backend.app.services.bambu_mqtt import BambuMQTTClient

          return BambuMQTTClient(
              ip_address="192.168.1.100",
              serial_number="TEST123",
              access_code="12345678",
          )

      @staticmethod
      def _attach_recorder(client):
          """Install a no-op start callback and a completion callback that records its payload.

          Returns the dict that the completion payload is merged into.
          """
          recorded = {}
          client.on_print_start = lambda data: None
          client.on_print_complete = recorded.update
          return recorded

      @staticmethod
      def _status(gcode_state, **extra):
          """Build a ``{"print": {...}}`` message for the shared test job, plus any extra fields."""
          body = {
              "gcode_state": gcode_state,
              "gcode_file": "/data/Metadata/test.gcode",
              "subtask_name": "Test",
          }
          body.update(extra)
          return {"print": body}

      @pytest.mark.asyncio
      async def test_timelapse_detected_in_same_message_as_print_start(self):
          """Verify timelapse is detected when xcam and state come together."""
          client = self._new_client()
          client.on_print_start = lambda data: None

          # Initial state
          client._was_running = False
          client._timelapse_during_print = False

          # One message carrying both the RUNNING state and the timelapse flag
          client._process_message(self._status("RUNNING", xcam={"timelapse": "enable"}))

          assert client._was_running is True
          assert client._timelapse_during_print is True, (
              "Timelapse should be detected even when xcam is parsed before state"
          )

      @pytest.mark.asyncio
      async def test_timelapse_flag_included_in_completion_callback(self):
          """Verify completion callback receives timelapse_was_active flag."""
          client = self._new_client()
          completion_data = self._attach_recorder(client)

          # Start with timelapse, then finish the print
          client._process_message(self._status("RUNNING", xcam={"timelapse": "enable"}))
          client._process_message(self._status("FINISH"))

          assert "timelapse_was_active" in completion_data
          assert completion_data["timelapse_was_active"] is True

      @pytest.mark.asyncio
      async def test_hms_errors_included_in_failed_completion_callback(self):
          """Verify completion callback receives hms_errors for failed prints."""
          client = self._new_client()
          completion_data = self._attach_recorder(client)

          # Start print
          client._process_message(self._status("RUNNING"))

          # Add HMS error during print (this message carries no file/subtask fields)
          client._process_message(
              {
                  "print": {
                      "gcode_state": "RUNNING",
                      "hms": [{"attr": 0x07000002, "code": 0x1234}],  # Filament module error
                  }
              }
          )

          # Fail print
          client._process_message(self._status("FAILED"))

          assert "hms_errors" in completion_data
          assert len(completion_data["hms_errors"]) == 1
          assert completion_data["hms_errors"][0]["module"] == 0x07
          assert completion_data["status"] == "failed"

      @pytest.mark.asyncio
      async def test_aborted_status_when_cancelled(self):
          """Verify completion callback receives 'aborted' status when print is cancelled."""
          client = self._new_client()
          completion_data = self._attach_recorder(client)

          # Start print
          client._process_message(self._status("RUNNING"))

          # User cancels (goes to IDLE)
          client._process_message(self._status("IDLE"))

          assert completion_data["status"] == "aborted"
          assert "hms_errors" in completion_data

      @pytest.mark.asyncio
      async def test_timelapse_detected_from_ipcam_data(self):
          """Verify timelapse is detected from ipcam data (H2D sends it there, not xcam)."""
          client = self._new_client()
          completion_data = self._attach_recorder(client)

          # Start print with timelapse in ipcam data (H2D format)
          ipcam_block = {
              "ipcam_record": "enable",
              "timelapse": "enable",
              "resolution": "1080p",
          }
          client._process_message(self._status("RUNNING", ipcam=ipcam_block))

          assert client._timelapse_during_print is True, "Timelapse should be detected from ipcam data"

          # Complete print
          client._process_message(self._status("FINISH"))

          assert completion_data["timelapse_was_active"] is True, (
              "timelapse_was_active should be True when timelapse was in ipcam"
          )
  class TestCallbackErrorHandling:
      """Test that callback errors are properly logged."""

      @pytest.mark.asyncio
      async def test_callback_errors_are_logged(self, capture_logs):
          """Register a failing print-complete callback on a PrinterManager.

          NOTE(review): this test only wires the callback up. Nothing here
          triggers it and nothing is asserted, so the "errors are logged"
          behaviour is not actually checked; ``capture_logs`` is unused.
          """
          from backend.app.services.printer_manager import PrinterManager

          manager = PrinterManager()

          # We are already inside a coroutine, so ask for the loop that is
          # actually running rather than going through get_event_loop().
          manager.set_event_loop(asyncio.get_running_loop())

          # Create a callback that raises an error
          error_raised = False

          async def failing_callback(printer_id, data):
              nonlocal error_raised
              error_raised = True
              raise ValueError("Test error in callback")

          manager.set_print_complete_callback(failing_callback)

          # The _schedule_async should log the error
          # This is tested indirectly - if exception handling is broken,
          # the error would be swallowed silently
  class TestNoImportShadowing:
      """Verify no import shadowing issues exist in callbacks."""

      @pytest.mark.asyncio
      async def test_on_print_complete_no_import_errors(self, capture_logs):
          """Import the app module and ArchiveService, then scan captured error logs.

          Fails if any captured error message mentions "import" or
          "local variable" (case-insensitive).
          """
          # Imported only for its side effect: a syntax/import error in the
          # module would surface right here.
          from backend.app import main  # noqa: F401

          from backend.app.services.archive import ArchiveService

          # Only checks that the name is bound; the class is not instantiated.
          assert ArchiveService is not None

          # Collect error records whose text looks import-related
          import_errors = []
          for record in capture_logs.get_errors():
              text = str(record.message).lower()
              if "import" in text or "local variable" in text:
                  import_errors.append(record)

          assert not import_errors, f"Import errors found: {import_errors}"