test_print_lifecycle.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. """
Integration tests for the full print lifecycle.

These tests verify that:
1. Print start creates a new archive
2. Print complete updates archive status
3. Callbacks are properly executed
4. Energy tracking works
5. Notifications are sent

Note: These tests use mocking to avoid database conflicts.
Full end-to-end tests require the actual database setup.
  11. """
  12. import asyncio
  13. from datetime import datetime
  14. from unittest.mock import AsyncMock, MagicMock, patch
  15. import pytest
  16. from sqlalchemy import select
  17. class TestPrintStartLogic:
  18. """Test print start callback logic without database integration."""
  19. @pytest.mark.asyncio
  20. async def test_print_start_calls_notification_service(self, capture_logs):
  21. """Verify on_print_start triggers notification service."""
  22. with (
  23. patch("backend.app.main.async_session") as mock_session_maker,
  24. patch("backend.app.main.notification_service") as mock_notif,
  25. patch("backend.app.main.smart_plug_manager") as mock_plug,
  26. patch("backend.app.main.ws_manager") as mock_ws,
  27. ):
  28. mock_notif.on_print_start = AsyncMock()
  29. mock_plug.on_print_start = AsyncMock()
  30. mock_ws.send_print_start = AsyncMock()
  31. # Mock the database session
  32. mock_session = AsyncMock()
  33. mock_session.__aenter__ = AsyncMock(return_value=mock_session)
  34. mock_session.__aexit__ = AsyncMock()
  35. mock_session.execute = AsyncMock(return_value=MagicMock(scalar_one_or_none=MagicMock(return_value=None)))
  36. mock_session_maker.return_value = mock_session
  37. from backend.app.main import on_print_start
  38. await on_print_start(
  39. 1,
  40. {
  41. "filename": "/data/Metadata/test.gcode",
  42. "subtask_name": "Test",
  43. },
  44. )
  45. # Verify WebSocket notification was sent
  46. mock_ws.send_print_start.assert_called_once()
  47. # Verify no import shadowing errors
  48. errors = [r for r in capture_logs.get_errors() if "cannot access local variable" in str(r.message)]
  49. assert not errors, f"Import shadowing error: {capture_logs.format_errors()}"
  50. class TestPrintCompleteLogic:
  51. """Test print complete callback logic."""
  52. @pytest.mark.asyncio
  53. async def test_print_complete_no_import_errors(self, capture_logs):
  54. """Verify on_print_complete doesn't have import shadowing issues."""
  55. with (
  56. patch("backend.app.main.async_session") as mock_session_maker,
  57. patch("backend.app.main.notification_service") as mock_notif,
  58. patch("backend.app.main.smart_plug_manager") as mock_plug,
  59. patch("backend.app.main.ws_manager") as mock_ws,
  60. ):
  61. mock_notif.on_print_complete = AsyncMock()
  62. mock_plug.on_print_complete = AsyncMock()
  63. mock_ws.send_print_complete = AsyncMock()
  64. # Mock the database session
  65. mock_session = AsyncMock()
  66. mock_session.__aenter__ = AsyncMock(return_value=mock_session)
  67. mock_session.__aexit__ = AsyncMock()
  68. mock_session.execute = AsyncMock(return_value=MagicMock(scalar_one_or_none=MagicMock(return_value=None)))
  69. mock_session_maker.return_value = mock_session
  70. from backend.app.main import on_print_complete
  71. await on_print_complete(
  72. 1,
  73. {
  74. "status": "completed",
  75. "filename": "/data/Metadata/test.gcode",
  76. "subtask_name": "Test",
  77. "timelapse_was_active": False,
  78. },
  79. )
  80. # Verify no import shadowing errors - this would have caught the ArchiveService bug
  81. errors = [r for r in capture_logs.get_errors() if "cannot access local variable" in str(r.message)]
  82. assert not errors, f"Import shadowing error: {capture_logs.format_errors()}"
  83. class TestTimelapseTracking:
  84. """Test timelapse detection during prints."""
  85. @pytest.mark.asyncio
  86. async def test_timelapse_detected_in_same_message_as_print_start(self):
  87. """Verify timelapse is detected when xcam and state come together."""
  88. from backend.app.services.bambu_mqtt import BambuMQTTClient
  89. client = BambuMQTTClient(
  90. ip_address="192.168.1.100",
  91. serial_number="TEST123",
  92. access_code="12345678",
  93. )
  94. client.on_print_start = lambda data: None
  95. # Initial state
  96. client._was_running = False
  97. client._timelapse_during_print = False
  98. # Message with both state and timelapse
  99. client._process_message(
  100. {
  101. "print": {
  102. "gcode_state": "RUNNING",
  103. "gcode_file": "/data/Metadata/test.gcode",
  104. "subtask_name": "Test",
  105. "xcam": {"timelapse": "enable"},
  106. }
  107. }
  108. )
  109. assert client._was_running is True
  110. assert client._timelapse_during_print is True, (
  111. "Timelapse should be detected even when xcam is parsed before state"
  112. )
  113. @pytest.mark.asyncio
  114. async def test_timelapse_flag_included_in_completion_callback(self):
  115. """Verify completion callback receives timelapse_was_active flag."""
  116. from backend.app.services.bambu_mqtt import BambuMQTTClient
  117. client = BambuMQTTClient(
  118. ip_address="192.168.1.100",
  119. serial_number="TEST123",
  120. access_code="12345678",
  121. )
  122. completion_data = {}
  123. def on_complete(data):
  124. completion_data.update(data)
  125. client.on_print_start = lambda data: None
  126. client.on_print_complete = on_complete
  127. # Start with timelapse
  128. client._process_message(
  129. {
  130. "print": {
  131. "gcode_state": "RUNNING",
  132. "gcode_file": "/data/Metadata/test.gcode",
  133. "subtask_name": "Test",
  134. "xcam": {"timelapse": "enable"},
  135. }
  136. }
  137. )
  138. # Complete print
  139. client._process_message(
  140. {
  141. "print": {
  142. "gcode_state": "FINISH",
  143. "gcode_file": "/data/Metadata/test.gcode",
  144. "subtask_name": "Test",
  145. }
  146. }
  147. )
  148. assert "timelapse_was_active" in completion_data
  149. assert completion_data["timelapse_was_active"] is True
  150. @pytest.mark.asyncio
  151. async def test_hms_errors_included_in_failed_completion_callback(self):
  152. """Verify completion callback receives hms_errors for failed prints."""
  153. from backend.app.services.bambu_mqtt import BambuMQTTClient
  154. client = BambuMQTTClient(
  155. ip_address="192.168.1.100",
  156. serial_number="TEST123",
  157. access_code="12345678",
  158. )
  159. completion_data = {}
  160. def on_complete(data):
  161. completion_data.update(data)
  162. client.on_print_start = lambda data: None
  163. client.on_print_complete = on_complete
  164. # Start print
  165. client._process_message(
  166. {
  167. "print": {
  168. "gcode_state": "RUNNING",
  169. "gcode_file": "/data/Metadata/test.gcode",
  170. "subtask_name": "Test",
  171. }
  172. }
  173. )
  174. # Add HMS error during print
  175. client._process_message(
  176. {
  177. "print": {
  178. "gcode_state": "RUNNING",
  179. "hms": [{"attr": 0x07000002, "code": 0x1234}], # Filament module error
  180. }
  181. }
  182. )
  183. # Fail print
  184. client._process_message(
  185. {
  186. "print": {
  187. "gcode_state": "FAILED",
  188. "gcode_file": "/data/Metadata/test.gcode",
  189. "subtask_name": "Test",
  190. }
  191. }
  192. )
  193. assert "hms_errors" in completion_data
  194. assert len(completion_data["hms_errors"]) == 1
  195. assert completion_data["hms_errors"][0]["module"] == 0x07
  196. assert completion_data["status"] == "failed"
  197. @pytest.mark.asyncio
  198. async def test_aborted_status_when_cancelled(self):
  199. """Verify completion callback receives 'aborted' status when print is cancelled."""
  200. from backend.app.services.bambu_mqtt import BambuMQTTClient
  201. client = BambuMQTTClient(
  202. ip_address="192.168.1.100",
  203. serial_number="TEST123",
  204. access_code="12345678",
  205. )
  206. completion_data = {}
  207. def on_complete(data):
  208. completion_data.update(data)
  209. client.on_print_start = lambda data: None
  210. client.on_print_complete = on_complete
  211. # Start print
  212. client._process_message(
  213. {
  214. "print": {
  215. "gcode_state": "RUNNING",
  216. "gcode_file": "/data/Metadata/test.gcode",
  217. "subtask_name": "Test",
  218. }
  219. }
  220. )
  221. # User cancels (goes to IDLE)
  222. client._process_message(
  223. {
  224. "print": {
  225. "gcode_state": "IDLE",
  226. "gcode_file": "/data/Metadata/test.gcode",
  227. "subtask_name": "Test",
  228. }
  229. }
  230. )
  231. assert completion_data["status"] == "aborted"
  232. assert "hms_errors" in completion_data
  233. @pytest.mark.asyncio
  234. async def test_timelapse_detected_from_ipcam_data(self):
  235. """Verify timelapse is detected from ipcam data (H2D sends it there, not xcam)."""
  236. from backend.app.services.bambu_mqtt import BambuMQTTClient
  237. client = BambuMQTTClient(
  238. ip_address="192.168.1.100",
  239. serial_number="TEST123",
  240. access_code="12345678",
  241. )
  242. completion_data = {}
  243. def on_complete(data):
  244. completion_data.update(data)
  245. client.on_print_start = lambda data: None
  246. client.on_print_complete = on_complete
  247. # Start print with timelapse in ipcam data (H2D format)
  248. client._process_message(
  249. {
  250. "print": {
  251. "gcode_state": "RUNNING",
  252. "gcode_file": "/data/Metadata/test.gcode",
  253. "subtask_name": "Test",
  254. "ipcam": {
  255. "ipcam_record": "enable",
  256. "timelapse": "enable",
  257. "resolution": "1080p",
  258. },
  259. }
  260. }
  261. )
  262. assert client._timelapse_during_print is True, "Timelapse should be detected from ipcam data"
  263. # Complete print
  264. client._process_message(
  265. {
  266. "print": {
  267. "gcode_state": "FINISH",
  268. "gcode_file": "/data/Metadata/test.gcode",
  269. "subtask_name": "Test",
  270. }
  271. }
  272. )
  273. assert completion_data["timelapse_was_active"] is True, (
  274. "timelapse_was_active should be True when timelapse was in ipcam"
  275. )
  276. class TestCallbackErrorHandling:
  277. """Test that callback errors are properly logged."""
  278. @pytest.mark.asyncio
  279. async def test_callback_errors_are_logged(self, capture_logs):
  280. """Verify that exceptions in callbacks are logged, not swallowed."""
  281. from backend.app.services.printer_manager import PrinterManager
  282. manager = PrinterManager()
  283. # Set up event loop
  284. loop = asyncio.get_event_loop()
  285. manager.set_event_loop(loop)
  286. # Create a callback that raises an error
  287. error_raised = False
  288. async def failing_callback(printer_id, data):
  289. nonlocal error_raised
  290. error_raised = True
  291. raise ValueError("Test error in callback")
  292. manager.set_print_complete_callback(failing_callback)
  293. # The _schedule_async should log the error
  294. # This is tested indirectly - if exception handling is broken,
  295. # the error would be swallowed silently
  296. class TestNoImportShadowing:
  297. """Verify no import shadowing issues exist in callbacks."""
  298. @pytest.mark.asyncio
  299. async def test_on_print_complete_no_import_errors(self, capture_logs):
  300. """Verify on_print_complete doesn't have import shadowing issues."""
  301. # Import the module to check for syntax/import errors
  302. from backend.app import main
  303. # The ArchiveService should be accessible
  304. from backend.app.services.archive import ArchiveService
  305. # Verify we can instantiate it (would fail with shadowing bug)
  306. assert ArchiveService is not None
  307. # Check logs for any import-related errors
  308. errors = capture_logs.get_errors()
  309. import_errors = [
  310. e for e in errors if "import" in str(e.message).lower() or "local variable" in str(e.message).lower()
  311. ]
  312. assert not import_errors, f"Import errors found: {import_errors}"