test_printer_manager_status_broadcast.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. """Regression tests for ``PrinterManager._broadcast_status_change`` and
  2. its wiring from ``set_awaiting_plate_clear`` (#1128).
  3. The bug: ``awaiting_plate_clear`` is a Bambuddy-side flag, so toggling it
  4. doesn't produce an MQTT push from the printer. Before the fix,
  5. ``set_awaiting_plate_clear()`` mutated state and persisted to DB but never
  6. notified WebSocket subscribers. The plate-clear button on the printer card
  7. disappeared "immediately" only because of an optimistic React Query cache
  8. update on the click path; any other caller (admin script, second tab, an
  9. automation that hits ``POST /printers/{id}/clear-plate``) silently left
  10. the UI stale until the next coincidental status refresh.
  11. These tests pin the contract: every flip of the flag must schedule a
  12. ``printer_status`` broadcast, and the broadcast must carry the new flag
  13. value so subscribers see the right state without polling.
  14. """
  15. from __future__ import annotations
  16. import asyncio
  17. from types import SimpleNamespace
  18. from unittest.mock import AsyncMock, MagicMock, patch
  19. import pytest
  20. from backend.app.services.printer_manager import PrinterManager
  21. @pytest.fixture
  22. def manager():
  23. """Fresh manager per test; the awaiting-plate-clear set is per-instance."""
  24. return PrinterManager()
  25. def _close_unawaited(coro):
  26. """Side effect for mocked ``_schedule_async``.
  27. ``set_awaiting_plate_clear`` evaluates the coroutine expressions
  28. ``self._persist_awaiting_plate_clear(...)`` and
  29. ``self._broadcast_status_change(...)`` before passing them to
  30. ``_schedule_async``. When that target is patched, the coroutine objects
  31. leak — Python's ``__del__`` then emits ``coroutine was never awaited``
  32. during GC, and when GC runs late enough that warning hits the interpreter
  33. shutdown path with ``KeyError: '__import__'``. Closing the coroutine here
  34. prevents both. Returns ``None`` so the mock's call signature is unchanged.
  35. """
  36. if asyncio.iscoroutine(coro):
  37. coro.close()
  38. return None
  39. def _fake_state(**overrides):
  40. """Stand-in for a ``PrinterState``.
  41. The tests below patch ``printer_state_to_dict`` so the fake doesn't need
  42. to satisfy every attribute access — but the patch was observed to race on
  43. parallel CI runners (pytest-xdist), and when it didn't catch the call the
  44. real ``printer_state_to_dict`` ran against this fake and ``AttributeError``'d
  45. on ``.kprofiles``. The fake now carries every attribute the real function
  46. reads, so it remains correct even if the patch is somehow bypassed — the
  47. test no longer depends on a fragile monkeypatch landing in time.
  48. Iterables (``kprofiles``, ``printable_objects``, ``hms_errors``,
  49. ``temperatures``, etc.) default to empty so the function's loops are
  50. no-ops; scalars default to ``None`` so any "if state.x is None" guard
  51. falls through cleanly.
  52. """
  53. base = {
  54. # State the existing test bodies explicitly set / read
  55. "connected": True,
  56. "state": "FINISH",
  57. "raw_data": {},
  58. "progress": 100.0,
  59. # Iterables — must be iterable for the loops inside printer_state_to_dict
  60. "kprofiles": [],
  61. "printable_objects": [],
  62. "hms_errors": [],
  63. "temperatures": {},
  64. "nozzle_rack": [],
  65. # Nullable scalars — printer_state_to_dict tolerates None for these
  66. "active_extruder": None,
  67. "ams_status_main": None,
  68. "ams_status_sub": None,
  69. "big_fan1_speed": None,
  70. "big_fan2_speed": None,
  71. "chamber_light": None,
  72. "cooling_fan_speed": None,
  73. "current_print": None,
  74. "door_open": None,
  75. "firmware_version": None,
  76. "gcode_file": None,
  77. "heatbreak_fan_speed": None,
  78. "layer_num": None,
  79. "remaining_time": None,
  80. "speed_level": None,
  81. "stg_cur": 0, # get_derived_status_name does ``0 <= state.stg_cur < 255``
  82. "subtask_name": None,
  83. "total_layers": None,
  84. "tray_now": None,
  85. "wifi_signal": None,
  86. "wired_network": None,
  87. }
  88. base.update(overrides)
  89. return SimpleNamespace(**base)
  90. class TestSchedulingFromSetAwaitingPlateClear:
  91. """The hook from the public flag-mutation method into the broadcast."""
  92. def test_schedules_broadcast_when_loop_running(self, manager):
  93. """When a real event loop is attached, every call to
  94. ``set_awaiting_plate_clear`` must enqueue both the persistence
  95. coroutine and the broadcast coroutine. Both are needed: persist
  96. survives restarts, broadcast notifies live subscribers."""
  97. manager._loop = MagicMock()
  98. manager._loop.is_running.return_value = True
  99. with patch.object(manager, "_schedule_async", side_effect=_close_unawaited) as scheduled:
  100. manager.set_awaiting_plate_clear(7, True)
  101. # Two coroutines: persist + broadcast. Order doesn't matter.
  102. assert scheduled.call_count == 2
  103. def test_does_not_schedule_when_no_loop_attached(self, manager):
  104. """Sync unit-test path (no loop attached): nothing must be
  105. scheduled, otherwise Python emits 'coroutine was never awaited'
  106. runtime warnings and the test suite goes red on harmless flag
  107. twiddling."""
  108. manager._loop = None
  109. with patch.object(manager, "_schedule_async") as scheduled:
  110. manager.set_awaiting_plate_clear(7, True)
  111. scheduled.assert_not_called()
  112. def test_does_not_schedule_when_loop_not_running(self, manager):
  113. """A loop attached-but-stopped is the same situation as no loop —
  114. scheduling onto a dead loop would never fire."""
  115. manager._loop = MagicMock()
  116. manager._loop.is_running.return_value = False
  117. with patch.object(manager, "_schedule_async") as scheduled:
  118. manager.set_awaiting_plate_clear(7, True)
  119. scheduled.assert_not_called()
  120. def test_both_true_and_false_flips_schedule_broadcast(self, manager):
  121. """The bug only became visible on ``False`` flips (clear), but a
  122. regression that broadcasts only on ``True`` would re-introduce
  123. the original symptom for any future flag mutation that goes
  124. ``False → True`` outside the printer-card optimistic-update
  125. path. Make both directions a contract."""
  126. manager._loop = MagicMock()
  127. manager._loop.is_running.return_value = True
  128. with patch.object(manager, "_schedule_async", side_effect=_close_unawaited) as scheduled:
  129. manager.set_awaiting_plate_clear(7, True)
  130. scheduled.reset_mock()
  131. manager.set_awaiting_plate_clear(7, False)
  132. # Each flip = persist + broadcast = 2 calls.
  133. assert scheduled.call_count == 2
  134. class TestBroadcastStatusChange:
  135. """The broadcast coroutine itself."""
  136. @pytest.mark.asyncio
  137. async def test_emits_ws_update_when_state_present(self, manager):
  138. """Happy path: printer has a known status, broadcast goes out
  139. with the dict produced by ``printer_state_to_dict``."""
  140. state = _fake_state()
  141. with (
  142. patch.object(manager, "get_status", return_value=state),
  143. patch.object(manager, "get_model", return_value="P1S"),
  144. patch(
  145. "backend.app.core.websocket.ws_manager.send_printer_status",
  146. new_callable=AsyncMock,
  147. ) as send_status,
  148. patch(
  149. "backend.app.services.printer_manager.printer_state_to_dict",
  150. return_value={"id": 7, "awaiting_plate_clear": False},
  151. ) as to_dict,
  152. ):
  153. await manager._broadcast_status_change(7)
  154. send_status.assert_awaited_once()
  155. # First positional arg is the printer ID, second is the status dict.
  156. printer_id_arg, payload_arg = send_status.await_args.args
  157. assert printer_id_arg == 7
  158. assert payload_arg == {"id": 7, "awaiting_plate_clear": False}
  159. # Verify the dict was built from the right inputs (state + id + model).
  160. to_dict.assert_called_once_with(state, 7, "P1S")
  161. @pytest.mark.asyncio
  162. async def test_skips_when_status_unknown(self, manager):
  163. """Printer not connected / unknown ID → no point broadcasting a
  164. snapshot we don't have. A future reconnect will produce a fresh
  165. status push anyway, so we'd only be forcing a stale or bogus
  166. payload onto subscribers right now."""
  167. with (
  168. patch.object(manager, "get_status", return_value=None),
  169. patch(
  170. "backend.app.core.websocket.ws_manager.send_printer_status",
  171. new_callable=AsyncMock,
  172. ) as send_status,
  173. ):
  174. await manager._broadcast_status_change(999)
  175. send_status.assert_not_awaited()
  176. @pytest.mark.asyncio
  177. async def test_swallows_websocket_errors(self, manager):
  178. """The broadcast is a courtesy, not a correctness path — if the
  179. WS layer is down, the flag is already mutated in-memory and
  180. persisted. Letting an exception bubble out of
  181. ``_broadcast_status_change`` would surface as an
  182. ``Exception in scheduled callback`` traceback in the log AND
  183. prevent the persistence coroutine from completing if both were
  184. gathered together. Swallow + warn instead."""
  185. with (
  186. patch.object(manager, "get_status", return_value=_fake_state()),
  187. patch.object(manager, "get_model", return_value="P1S"),
  188. patch(
  189. "backend.app.services.printer_manager.printer_state_to_dict",
  190. return_value={"id": 7},
  191. ),
  192. patch(
  193. "backend.app.core.websocket.ws_manager.send_printer_status",
  194. new_callable=AsyncMock,
  195. side_effect=RuntimeError("websocket layer unavailable"),
  196. ),
  197. ):
  198. # Must not raise.
  199. await manager._broadcast_status_change(7)
  200. class TestEndToEndUnderRunningLoop:
  201. """Verify the full flow under a real running event loop — schedule
  202. → broadcast → ws_manager.send_printer_status — without mocking
  203. ``_schedule_async``. Catches regressions where individual pieces
  204. pass but the wiring breaks (e.g. ``_schedule_async`` swallowing the
  205. broadcast coroutine)."""
  206. @pytest.mark.asyncio
  207. async def test_set_false_eventually_emits_broadcast(self, manager):
  208. """Reproduces the #1128 fix path end-to-end: set the flag to
  209. False under a live loop, give the scheduler a tick, the
  210. ws broadcast must have fired with the new payload."""
  211. loop = asyncio.get_running_loop()
  212. manager._loop = loop
  213. # Pretend the printer has been seen — without a state present
  214. # the broadcast short-circuits before reaching ws_manager.
  215. manager._awaiting_plate_clear.add(7)
  216. with (
  217. patch.object(manager, "get_status", return_value=_fake_state()),
  218. patch.object(manager, "get_model", return_value="P1S"),
  219. patch(
  220. "backend.app.services.printer_manager.printer_state_to_dict",
  221. return_value={"id": 7, "awaiting_plate_clear": False},
  222. ),
  223. patch(
  224. "backend.app.core.websocket.ws_manager.send_printer_status",
  225. new_callable=AsyncMock,
  226. ) as send_status,
  227. # Persistence path opens a DB session; stub it out so this
  228. # stays a pure unit test.
  229. patch.object(manager, "_persist_awaiting_plate_clear", new_callable=AsyncMock),
  230. ):
  231. manager.set_awaiting_plate_clear(7, False)
  232. # Yield repeatedly so run_coroutine_threadsafe has a chance
  233. # to land its scheduled coroutine on this loop.
  234. for _ in range(10):
  235. await asyncio.sleep(0)
  236. send_status.assert_awaited()
  237. printer_id_arg, payload_arg = send_status.await_args.args
  238. assert printer_id_arg == 7
  239. assert payload_arg["awaiting_plate_clear"] is False