test_printer_manager_status_broadcast.py 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. """Regression tests for ``PrinterManager._broadcast_status_change`` and
  2. its wiring from ``set_awaiting_plate_clear`` (#1128).
  3. The bug: ``awaiting_plate_clear`` is a Bambuddy-side flag, so toggling it
  4. doesn't produce an MQTT push from the printer. Before the fix,
  5. ``set_awaiting_plate_clear()`` mutated state and persisted to DB but never
  6. notified WebSocket subscribers. The plate-clear button on the printer card
  7. disappeared "immediately" only because of an optimistic React Query cache
  8. update on the click path; any other caller (admin script, second tab, an
  9. automation that hits ``POST /printers/{id}/clear-plate``) silently left
  10. the UI stale until the next coincidental status refresh.
  11. These tests pin the contract: every flip of the flag must schedule a
  12. ``printer_status`` broadcast, and the broadcast must carry the new flag
  13. value so subscribers see the right state without polling.
  14. """
  15. from __future__ import annotations
  16. import asyncio
  17. from types import SimpleNamespace
  18. from unittest.mock import AsyncMock, MagicMock, patch
  19. import pytest
  20. from backend.app.services.printer_manager import PrinterManager
  21. @pytest.fixture
  22. def manager():
  23. """Fresh manager per test; the awaiting-plate-clear set is per-instance."""
  24. return PrinterManager()
  25. def _fake_state(**overrides):
  26. """Minimal stand-in for a ``PrinterState`` — only the attributes
  27. ``printer_state_to_dict`` reads. We use a SimpleNamespace rather than
  28. constructing a real PrinterState so this test stays fast and doesn't
  29. couple to the (large, evolving) PrinterState dataclass shape."""
  30. base = {
  31. "connected": True,
  32. "state": "FINISH",
  33. "raw_data": {},
  34. "progress": 100.0,
  35. }
  36. base.update(overrides)
  37. return SimpleNamespace(**base)
  38. class TestSchedulingFromSetAwaitingPlateClear:
  39. """The hook from the public flag-mutation method into the broadcast."""
  40. def test_schedules_broadcast_when_loop_running(self, manager):
  41. """When a real event loop is attached, every call to
  42. ``set_awaiting_plate_clear`` must enqueue both the persistence
  43. coroutine and the broadcast coroutine. Both are needed: persist
  44. survives restarts, broadcast notifies live subscribers."""
  45. manager._loop = MagicMock()
  46. manager._loop.is_running.return_value = True
  47. with patch.object(manager, "_schedule_async") as scheduled:
  48. manager.set_awaiting_plate_clear(7, True)
  49. # Two coroutines: persist + broadcast. Order doesn't matter.
  50. assert scheduled.call_count == 2
  51. def test_does_not_schedule_when_no_loop_attached(self, manager):
  52. """Sync unit-test path (no loop attached): nothing must be
  53. scheduled, otherwise Python emits 'coroutine was never awaited'
  54. runtime warnings and the test suite goes red on harmless flag
  55. twiddling."""
  56. manager._loop = None
  57. with patch.object(manager, "_schedule_async") as scheduled:
  58. manager.set_awaiting_plate_clear(7, True)
  59. scheduled.assert_not_called()
  60. def test_does_not_schedule_when_loop_not_running(self, manager):
  61. """A loop attached-but-stopped is the same situation as no loop —
  62. scheduling onto a dead loop would never fire."""
  63. manager._loop = MagicMock()
  64. manager._loop.is_running.return_value = False
  65. with patch.object(manager, "_schedule_async") as scheduled:
  66. manager.set_awaiting_plate_clear(7, True)
  67. scheduled.assert_not_called()
  68. def test_both_true_and_false_flips_schedule_broadcast(self, manager):
  69. """The bug only became visible on ``False`` flips (clear), but a
  70. regression that broadcasts only on ``True`` would re-introduce
  71. the original symptom for any future flag mutation that goes
  72. ``False → True`` outside the printer-card optimistic-update
  73. path. Make both directions a contract."""
  74. manager._loop = MagicMock()
  75. manager._loop.is_running.return_value = True
  76. with patch.object(manager, "_schedule_async") as scheduled:
  77. manager.set_awaiting_plate_clear(7, True)
  78. scheduled.reset_mock()
  79. manager.set_awaiting_plate_clear(7, False)
  80. # Each flip = persist + broadcast = 2 calls.
  81. assert scheduled.call_count == 2
  82. class TestBroadcastStatusChange:
  83. """The broadcast coroutine itself."""
  84. @pytest.mark.asyncio
  85. async def test_emits_ws_update_when_state_present(self, manager):
  86. """Happy path: printer has a known status, broadcast goes out
  87. with the dict produced by ``printer_state_to_dict``."""
  88. state = _fake_state()
  89. with (
  90. patch.object(manager, "get_status", return_value=state),
  91. patch.object(manager, "get_model", return_value="P1S"),
  92. patch(
  93. "backend.app.core.websocket.ws_manager.send_printer_status",
  94. new_callable=AsyncMock,
  95. ) as send_status,
  96. patch(
  97. "backend.app.services.printer_manager.printer_state_to_dict",
  98. return_value={"id": 7, "awaiting_plate_clear": False},
  99. ) as to_dict,
  100. ):
  101. await manager._broadcast_status_change(7)
  102. send_status.assert_awaited_once()
  103. # First positional arg is the printer ID, second is the status dict.
  104. printer_id_arg, payload_arg = send_status.await_args.args
  105. assert printer_id_arg == 7
  106. assert payload_arg == {"id": 7, "awaiting_plate_clear": False}
  107. # Verify the dict was built from the right inputs (state + id + model).
  108. to_dict.assert_called_once_with(state, 7, "P1S")
  109. @pytest.mark.asyncio
  110. async def test_skips_when_status_unknown(self, manager):
  111. """Printer not connected / unknown ID → no point broadcasting a
  112. snapshot we don't have. A future reconnect will produce a fresh
  113. status push anyway, so we'd only be forcing a stale or bogus
  114. payload onto subscribers right now."""
  115. with (
  116. patch.object(manager, "get_status", return_value=None),
  117. patch(
  118. "backend.app.core.websocket.ws_manager.send_printer_status",
  119. new_callable=AsyncMock,
  120. ) as send_status,
  121. ):
  122. await manager._broadcast_status_change(999)
  123. send_status.assert_not_awaited()
  124. @pytest.mark.asyncio
  125. async def test_swallows_websocket_errors(self, manager):
  126. """The broadcast is a courtesy, not a correctness path — if the
  127. WS layer is down, the flag is already mutated in-memory and
  128. persisted. Letting an exception bubble out of
  129. ``_broadcast_status_change`` would surface as an
  130. ``Exception in scheduled callback`` traceback in the log AND
  131. prevent the persistence coroutine from completing if both were
  132. gathered together. Swallow + warn instead."""
  133. with (
  134. patch.object(manager, "get_status", return_value=_fake_state()),
  135. patch.object(manager, "get_model", return_value="P1S"),
  136. patch(
  137. "backend.app.services.printer_manager.printer_state_to_dict",
  138. return_value={"id": 7},
  139. ),
  140. patch(
  141. "backend.app.core.websocket.ws_manager.send_printer_status",
  142. new_callable=AsyncMock,
  143. side_effect=RuntimeError("websocket layer unavailable"),
  144. ),
  145. ):
  146. # Must not raise.
  147. await manager._broadcast_status_change(7)
  148. class TestEndToEndUnderRunningLoop:
  149. """Verify the full flow under a real running event loop — schedule
  150. → broadcast → ws_manager.send_printer_status — without mocking
  151. ``_schedule_async``. Catches regressions where individual pieces
  152. pass but the wiring breaks (e.g. ``_schedule_async`` swallowing the
  153. broadcast coroutine)."""
  154. @pytest.mark.asyncio
  155. async def test_set_false_eventually_emits_broadcast(self, manager):
  156. """Reproduces the #1128 fix path end-to-end: set the flag to
  157. False under a live loop, give the scheduler a tick, the
  158. ws broadcast must have fired with the new payload."""
  159. loop = asyncio.get_running_loop()
  160. manager._loop = loop
  161. # Pretend the printer has been seen — without a state present
  162. # the broadcast short-circuits before reaching ws_manager.
  163. manager._awaiting_plate_clear.add(7)
  164. with (
  165. patch.object(manager, "get_status", return_value=_fake_state()),
  166. patch.object(manager, "get_model", return_value="P1S"),
  167. patch(
  168. "backend.app.services.printer_manager.printer_state_to_dict",
  169. return_value={"id": 7, "awaiting_plate_clear": False},
  170. ),
  171. patch(
  172. "backend.app.core.websocket.ws_manager.send_printer_status",
  173. new_callable=AsyncMock,
  174. ) as send_status,
  175. # Persistence path opens a DB session; stub it out so this
  176. # stays a pure unit test.
  177. patch.object(manager, "_persist_awaiting_plate_clear", new_callable=AsyncMock),
  178. ):
  179. manager.set_awaiting_plate_clear(7, False)
  180. # Yield repeatedly so run_coroutine_threadsafe has a chance
  181. # to land its scheduled coroutine on this loop.
  182. for _ in range(10):
  183. await asyncio.sleep(0)
  184. send_status.assert_awaited()
  185. printer_id_arg, payload_arg = send_status.await_args.args
  186. assert printer_id_arg == 7
  187. assert payload_arg["awaiting_plate_clear"] is False