# Source listing: test_print_lifecycle.py (13 KB)
"""
Integration tests for the full print lifecycle.

These tests verify that:
1. Print start creates a new archive
2. Print complete updates archive status
3. Callbacks are properly executed
4. Energy tracking works
5. Notifications are sent

Note: These tests use mocking to avoid database conflicts.
Full end-to-end tests require the actual database setup.
"""
  12. import asyncio
  13. from unittest.mock import AsyncMock, MagicMock, patch
  14. import pytest
  15. class TestPrintStartLogic:
  16. """Test print start callback logic without database integration."""
  17. @pytest.mark.asyncio
  18. async def test_print_start_calls_notification_service(self, capture_logs):
  19. """Verify on_print_start triggers notification service."""
  20. with (
  21. patch("backend.app.main.async_session") as mock_session_maker,
  22. patch("backend.app.main.notification_service") as mock_notif,
  23. patch("backend.app.main.smart_plug_manager") as mock_plug,
  24. patch("backend.app.main.ws_manager") as mock_ws,
  25. ):
  26. mock_notif.on_print_start = AsyncMock()
  27. mock_plug.on_print_start = AsyncMock()
  28. mock_ws.send_print_start = AsyncMock()
  29. # Mock the database session
  30. mock_session = AsyncMock()
  31. mock_session.__aenter__ = AsyncMock(return_value=mock_session)
  32. mock_session.__aexit__ = AsyncMock()
  33. mock_session.execute = AsyncMock(return_value=MagicMock(scalar_one_or_none=MagicMock(return_value=None)))
  34. mock_session_maker.return_value = mock_session
  35. from backend.app.main import on_print_start
  36. await on_print_start(
  37. 1,
  38. {
  39. "filename": "/data/Metadata/test.gcode",
  40. "subtask_name": "Test",
  41. },
  42. )
  43. # Verify WebSocket notification was sent
  44. mock_ws.send_print_start.assert_called_once()
  45. # Verify no import shadowing errors
  46. errors = [r for r in capture_logs.get_errors() if "cannot access local variable" in str(r.message)]
  47. assert not errors, f"Import shadowing error: {capture_logs.format_errors()}"
  48. class TestPrintCompleteLogic:
  49. """Test print complete callback logic."""
  50. @pytest.mark.asyncio
  51. async def test_print_complete_no_import_errors(self, capture_logs):
  52. """Verify on_print_complete doesn't have import shadowing issues."""
  53. with (
  54. patch("backend.app.main.async_session") as mock_session_maker,
  55. patch("backend.app.main.notification_service") as mock_notif,
  56. patch("backend.app.main.smart_plug_manager") as mock_plug,
  57. patch("backend.app.main.ws_manager") as mock_ws,
  58. ):
  59. mock_notif.on_print_complete = AsyncMock()
  60. mock_plug.on_print_complete = AsyncMock()
  61. mock_ws.send_print_complete = AsyncMock()
  62. # Mock the database session
  63. mock_session = AsyncMock()
  64. mock_session.__aenter__ = AsyncMock(return_value=mock_session)
  65. mock_session.__aexit__ = AsyncMock()
  66. mock_session.execute = AsyncMock(return_value=MagicMock(scalar_one_or_none=MagicMock(return_value=None)))
  67. mock_session_maker.return_value = mock_session
  68. from backend.app.main import on_print_complete
  69. await on_print_complete(
  70. 1,
  71. {
  72. "status": "completed",
  73. "filename": "/data/Metadata/test.gcode",
  74. "subtask_name": "Test",
  75. "timelapse_was_active": False,
  76. },
  77. )
  78. # Verify no import shadowing errors - this would have caught the ArchiveService bug
  79. errors = [r for r in capture_logs.get_errors() if "cannot access local variable" in str(r.message)]
  80. assert not errors, f"Import shadowing error: {capture_logs.format_errors()}"
  81. class TestTimelapseTracking:
  82. """Test timelapse detection during prints."""
  83. @pytest.mark.asyncio
  84. async def test_timelapse_detected_in_same_message_as_print_start(self):
  85. """Verify timelapse is detected when xcam and state come together."""
  86. from backend.app.services.bambu_mqtt import BambuMQTTClient
  87. client = BambuMQTTClient(
  88. ip_address="192.168.1.100",
  89. serial_number="TEST123",
  90. access_code="12345678",
  91. )
  92. client.on_print_start = lambda data: None
  93. # Initial state
  94. client._was_running = False
  95. client._timelapse_during_print = False
  96. # Message with both state and timelapse
  97. client._process_message(
  98. {
  99. "print": {
  100. "gcode_state": "RUNNING",
  101. "gcode_file": "/data/Metadata/test.gcode",
  102. "subtask_name": "Test",
  103. "xcam": {"timelapse": "enable"},
  104. }
  105. }
  106. )
  107. assert client._was_running is True
  108. assert client._timelapse_during_print is True, (
  109. "Timelapse should be detected even when xcam is parsed before state"
  110. )
  111. @pytest.mark.asyncio
  112. async def test_timelapse_flag_included_in_completion_callback(self):
  113. """Verify completion callback receives timelapse_was_active flag."""
  114. from backend.app.services.bambu_mqtt import BambuMQTTClient
  115. client = BambuMQTTClient(
  116. ip_address="192.168.1.100",
  117. serial_number="TEST123",
  118. access_code="12345678",
  119. )
  120. completion_data = {}
  121. def on_complete(data):
  122. completion_data.update(data)
  123. client.on_print_start = lambda data: None
  124. client.on_print_complete = on_complete
  125. # Start with timelapse
  126. client._process_message(
  127. {
  128. "print": {
  129. "gcode_state": "RUNNING",
  130. "gcode_file": "/data/Metadata/test.gcode",
  131. "subtask_name": "Test",
  132. "xcam": {"timelapse": "enable"},
  133. }
  134. }
  135. )
  136. # Complete print
  137. client._process_message(
  138. {
  139. "print": {
  140. "gcode_state": "FINISH",
  141. "gcode_file": "/data/Metadata/test.gcode",
  142. "subtask_name": "Test",
  143. }
  144. }
  145. )
  146. assert "timelapse_was_active" in completion_data
  147. assert completion_data["timelapse_was_active"] is True
  148. @pytest.mark.asyncio
  149. async def test_hms_errors_included_in_failed_completion_callback(self):
  150. """Verify completion callback receives hms_errors for failed prints."""
  151. from backend.app.services.bambu_mqtt import BambuMQTTClient
  152. client = BambuMQTTClient(
  153. ip_address="192.168.1.100",
  154. serial_number="TEST123",
  155. access_code="12345678",
  156. )
  157. completion_data = {}
  158. def on_complete(data):
  159. completion_data.update(data)
  160. client.on_print_start = lambda data: None
  161. client.on_print_complete = on_complete
  162. # Start print
  163. client._process_message(
  164. {
  165. "print": {
  166. "gcode_state": "RUNNING",
  167. "gcode_file": "/data/Metadata/test.gcode",
  168. "subtask_name": "Test",
  169. }
  170. }
  171. )
  172. # Add HMS error during print
  173. client._process_message(
  174. {
  175. "print": {
  176. "gcode_state": "RUNNING",
  177. "hms": [{"attr": 0x07000002, "code": 0x1234}], # Filament module error
  178. }
  179. }
  180. )
  181. # Fail print
  182. client._process_message(
  183. {
  184. "print": {
  185. "gcode_state": "FAILED",
  186. "gcode_file": "/data/Metadata/test.gcode",
  187. "subtask_name": "Test",
  188. }
  189. }
  190. )
  191. assert "hms_errors" in completion_data
  192. assert len(completion_data["hms_errors"]) == 1
  193. assert completion_data["hms_errors"][0]["module"] == 0x07
  194. assert completion_data["status"] == "failed"
  195. @pytest.mark.asyncio
  196. async def test_aborted_status_when_cancelled(self):
  197. """Verify completion callback receives 'aborted' status when print is cancelled."""
  198. from backend.app.services.bambu_mqtt import BambuMQTTClient
  199. client = BambuMQTTClient(
  200. ip_address="192.168.1.100",
  201. serial_number="TEST123",
  202. access_code="12345678",
  203. )
  204. completion_data = {}
  205. def on_complete(data):
  206. completion_data.update(data)
  207. client.on_print_start = lambda data: None
  208. client.on_print_complete = on_complete
  209. # Start print
  210. client._process_message(
  211. {
  212. "print": {
  213. "gcode_state": "RUNNING",
  214. "gcode_file": "/data/Metadata/test.gcode",
  215. "subtask_name": "Test",
  216. }
  217. }
  218. )
  219. # User cancels (goes to IDLE)
  220. client._process_message(
  221. {
  222. "print": {
  223. "gcode_state": "IDLE",
  224. "gcode_file": "/data/Metadata/test.gcode",
  225. "subtask_name": "Test",
  226. }
  227. }
  228. )
  229. assert completion_data["status"] == "aborted"
  230. assert "hms_errors" in completion_data
  231. @pytest.mark.asyncio
  232. async def test_timelapse_detected_from_ipcam_data(self):
  233. """Verify timelapse is detected from ipcam data (H2D sends it there, not xcam)."""
  234. from backend.app.services.bambu_mqtt import BambuMQTTClient
  235. client = BambuMQTTClient(
  236. ip_address="192.168.1.100",
  237. serial_number="TEST123",
  238. access_code="12345678",
  239. )
  240. completion_data = {}
  241. def on_complete(data):
  242. completion_data.update(data)
  243. client.on_print_start = lambda data: None
  244. client.on_print_complete = on_complete
  245. # Start print with timelapse in ipcam data (H2D format)
  246. client._process_message(
  247. {
  248. "print": {
  249. "gcode_state": "RUNNING",
  250. "gcode_file": "/data/Metadata/test.gcode",
  251. "subtask_name": "Test",
  252. "ipcam": {
  253. "ipcam_record": "enable",
  254. "timelapse": "enable",
  255. "resolution": "1080p",
  256. },
  257. }
  258. }
  259. )
  260. assert client._timelapse_during_print is True, "Timelapse should be detected from ipcam data"
  261. # Complete print
  262. client._process_message(
  263. {
  264. "print": {
  265. "gcode_state": "FINISH",
  266. "gcode_file": "/data/Metadata/test.gcode",
  267. "subtask_name": "Test",
  268. }
  269. }
  270. )
  271. assert completion_data["timelapse_was_active"] is True, (
  272. "timelapse_was_active should be True when timelapse was in ipcam"
  273. )
  274. class TestCallbackErrorHandling:
  275. """Test that callback errors are properly logged."""
  276. @pytest.mark.asyncio
  277. async def test_callback_errors_are_logged(self, capture_logs):
  278. """Verify that exceptions in callbacks are logged, not swallowed."""
  279. from backend.app.services.printer_manager import PrinterManager
  280. manager = PrinterManager()
  281. # Set up event loop
  282. loop = asyncio.get_event_loop()
  283. manager.set_event_loop(loop)
  284. # Create a callback that raises an error
  285. error_raised = False
  286. async def failing_callback(printer_id, data):
  287. nonlocal error_raised
  288. error_raised = True
  289. raise ValueError("Test error in callback")
  290. manager.set_print_complete_callback(failing_callback)
  291. # The _schedule_async should log the error
  292. # This is tested indirectly - if exception handling is broken,
  293. # the error would be swallowed silently
  294. class TestNoImportShadowing:
  295. """Verify no import shadowing issues exist in callbacks."""
  296. @pytest.mark.asyncio
  297. async def test_on_print_complete_no_import_errors(self, capture_logs):
  298. """Verify on_print_complete doesn't have import shadowing issues."""
  299. # Import the module to check for syntax/import errors
  300. from backend.app import main
  301. # The ArchiveService should be accessible
  302. from backend.app.services.archive import ArchiveService
  303. # Verify we can instantiate it (would fail with shadowing bug)
  304. assert ArchiveService is not None
  305. # Check logs for any import-related errors
  306. errors = capture_logs.get_errors()
  307. import_errors = [
  308. e for e in errors if "import" in str(e.message).lower() or "local variable" in str(e.message).lower()
  309. ]
  310. assert not import_errors, f"Import errors found: {import_errors}"