test_camera_fanout.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. """Unit tests for the MJPEG fan-out broadcaster (#1089).
  2. These tests do not touch ffmpeg or any printer — they drive a fake upstream
  3. generator and assert subscriber/pump lifecycle behaviour.
  4. """
  5. from __future__ import annotations
  6. import asyncio
  7. from collections.abc import AsyncGenerator
  8. import pytest
  9. from backend.app.services import camera_fanout
  10. from backend.app.services.camera_fanout import (
  11. MjpegBroadcaster,
  12. get_or_create_broadcaster,
  13. iter_subscriber,
  14. shutdown_all_broadcasters,
  15. shutdown_broadcaster,
  16. )
  # Module-level marker: pytest applies every mark listed in ``pytestmark`` to
  # each test collected from this file, so all tests here carry the asyncio mark.
  pytestmark = [pytest.mark.asyncio]
  # Keep grace-window tests quick: the default grace period (5s) is far longer
  # than a unit test needs, so it is patched down to a few milliseconds.
  @pytest.fixture(autouse=True)
  def _short_grace(monkeypatch):
      """Patch ``camera_fanout._GRACE_SECONDS`` to 0.05 for every test."""
      short_grace_seconds = 0.05
      monkeypatch.setattr(camera_fanout, "_GRACE_SECONDS", short_grace_seconds)
  @pytest.fixture(autouse=True)
  async def _clean_registry():
      """Shut down every registered broadcaster before and after each test."""
      # Setup: start from an empty registry regardless of what ran earlier.
      await shutdown_all_broadcasters()

      yield

      # Teardown: drop whatever the test itself registered.
      await shutdown_all_broadcasters()
  29. def _make_factory(
  30. chunks: list[bytes],
  31. *,
  32. delay: float = 0.0,
  33. pump_started: asyncio.Event | None = None,
  34. pump_count: list[int] | None = None,
  35. ):
  36. """Build an upstream factory that yields a fixed list of chunks."""
  37. async def factory(disconnect: asyncio.Event) -> AsyncGenerator[bytes, None]:
  38. if pump_started is not None:
  39. pump_started.set()
  40. if pump_count is not None:
  41. pump_count[0] += 1
  42. for chunk in chunks:
  43. if disconnect.is_set():
  44. return
  45. if delay:
  46. try:
  47. await asyncio.wait_for(disconnect.wait(), timeout=delay)
  48. return
  49. except asyncio.TimeoutError:
  50. pass
  51. yield chunk
  52. return factory
  53. # ---------------------------------------------------------------------------
  54. # Single subscriber
  55. # ---------------------------------------------------------------------------
  async def test_single_subscriber_receives_all_frames():
      """A lone subscriber receives every upstream chunk, in order."""
      frames = [b"a", b"b", b"c"]
      bc = MjpegBroadcaster("p1", _make_factory(frames, delay=0.005))
      try:
          queue = await bc.subscribe()
          received = [
              await asyncio.wait_for(queue.get(), timeout=1.0) for _ in frames
          ]
          assert received == frames
      finally:
          # This broadcaster is built directly, not via the registry, so the
          # registry-cleaning fixture never sees it. Shut it down even when an
          # assertion fails so the pump task is not left running.
          await bc.force_shutdown()
  64. # ---------------------------------------------------------------------------
  65. # Multiple subscribers share one upstream
  66. # ---------------------------------------------------------------------------
  async def test_multiple_subscribers_share_single_upstream():
      """Three subscribers each get every frame while one upstream is opened."""
      frames = [b"f1", b"f2", b"f3"]
      pump_count = [0]
      bc = MjpegBroadcaster(
          "p1",
          _make_factory(frames, delay=0.01, pump_count=pump_count),
      )
      try:
          # Subscribe sequentially, exactly as three separate viewers would.
          queues = [await bc.subscribe() for _ in range(3)]

          # Each subscriber must receive each frame exactly once.
          for q in queues:
              received = [
                  await asyncio.wait_for(q.get(), timeout=1.0) for _ in frames
              ]
              assert received == frames

          # Only ONE upstream pump ever ran — that is the entire point of the
          # bug fix.
          assert pump_count[0] == 1
      finally:
          # Directly constructed broadcasters are not in the registry, so stop
          # the pump here even if an assertion above failed.
          await bc.force_shutdown()
  85. # ---------------------------------------------------------------------------
  86. # Slow subscriber should not block fast subscribers
  87. # ---------------------------------------------------------------------------
  async def test_slow_subscriber_does_not_block_others():
      """A subscriber that never reads must not starve one that does."""
      # 50 frames is meant to exceed the per-subscriber queue depth, so a queue
      # that is never drained is guaranteed to fill up.
      chunks = [bytes([i % 256]) for i in range(50)]
      bc = MjpegBroadcaster("p1", _make_factory(chunks, delay=0.001))
      try:
          slow = await bc.subscribe()
          fast = await bc.subscribe()

          # Drain `fast` quickly; never read from `slow`. The fast subscriber
          # must still get every frame even though `slow` is wedged.
          received_fast = [
              await asyncio.wait_for(fast.get(), timeout=2.0) for _ in chunks
          ]
          assert received_fast == chunks

          # The wedged queue must stay bounded: frames are dropped rather than
          # accumulated without limit.
          assert slow.qsize() <= camera_fanout._SUBSCRIBER_QUEUE_SIZE
      finally:
          # Stop the pump even on assertion failure; this broadcaster is not
          # registered, so no fixture would clean it up.
          await bc.force_shutdown()
  105. # ---------------------------------------------------------------------------
  106. # Last-subscriber-leaves grace window
  107. # ---------------------------------------------------------------------------
  async def test_pump_torn_down_after_last_subscriber_leaves(monkeypatch):
      """Once the only subscriber leaves and the grace window passes, the
      broadcaster reports itself stopped and has opened the upstream once.
      """
      monkeypatch.setattr(camera_fanout, "_GRACE_SECONDS", 0.05)
      upstream_opens = [0]
      # 1000 chunks at 50 ms each: the upstream is still mid-stream when the
      # subscriber leaves, so it only ends if the disconnect signal reaches it.
      long_upstream = _make_factory(
          [b"x"] * 1000, delay=0.05, pump_count=upstream_opens
      )
      bc = MjpegBroadcaster("p1", long_upstream)

      queue = await bc.subscribe()
      # Read a single frame to confirm the pump is delivering, then leave.
      await asyncio.wait_for(queue.get(), timeout=1.0)
      await bc.unsubscribe(queue)

      # Sleep well past the 0.05 s grace window.
      await asyncio.sleep(0.2)

      assert bc.subscriber_count == 0
      assert bc.stopped is True
      assert upstream_opens[0] == 1
  async def test_grace_window_cancelled_on_rejoin(monkeypatch):
      """A subscriber that joins inside the grace window keeps the pump alive."""
      monkeypatch.setattr(camera_fanout, "_GRACE_SECONDS", 0.1)
      pump_count = [0]
      bc = MjpegBroadcaster(
          "p1",
          _make_factory([b"x"] * 1000, delay=0.02, pump_count=pump_count),
      )
      try:
          q1 = await bc.subscribe()
          await asyncio.wait_for(q1.get(), timeout=1.0)
          await bc.unsubscribe(q1)

          # Rejoin BEFORE the 0.1 s grace expires — pump should keep running.
          await asyncio.sleep(0.02)
          q2 = await bc.subscribe()

          # Settle past the original grace deadline.
          await asyncio.sleep(0.2)

          # Pump still alive, only one upstream connection ever opened.
          assert bc.stopped is False
          assert pump_count[0] == 1

          # And the second subscriber is still receiving frames.
          await asyncio.wait_for(q2.get(), timeout=1.0)
      finally:
          # Stop the still-running pump even when an assertion above fails;
          # the broadcaster is not registered, so no fixture cleans it up.
          await bc.force_shutdown()
  146. # ---------------------------------------------------------------------------
  147. # Force shutdown wakes subscribers
  148. # ---------------------------------------------------------------------------
  async def test_force_shutdown_signals_subscribers():
      """force_shutdown() leaves the upstream-gone sentinel for subscribers."""
      upstream = _make_factory([b"x"] * 1000, delay=0.05)
      bc = MjpegBroadcaster("p1", upstream)
      queue = await bc.subscribe()

      # Take one real frame first so the pump is known to be running.
      await asyncio.wait_for(queue.get(), timeout=1.0)

      await bc.force_shutdown()

      # The next get() must complete promptly and hand back the sentinel that
      # marks the upstream as gone.
      woken_with = await asyncio.wait_for(queue.get(), timeout=1.0)
      assert woken_with == camera_fanout._UPSTREAM_GONE
      assert bc.stopped is True
  159. # ---------------------------------------------------------------------------
  160. # iter_subscriber helper exits cleanly on upstream-gone and disconnect
  161. # ---------------------------------------------------------------------------
  async def test_iter_subscriber_exits_on_upstream_gone():
      """iter_subscriber ends on its own once the upstream is exhausted."""
      bc = MjpegBroadcaster("p1", _make_factory([b"a", b"b"], delay=0.005))
      queue = await bc.subscribe()

      async def _drain() -> list[bytes]:
          return [chunk async for chunk in iter_subscriber(bc, queue)]

      # Bound the iteration like every other await in this module: if
      # iter_subscriber ever fails to notice the upstream ending, the test
      # fails with a timeout instead of hanging the whole suite.
      received = await asyncio.wait_for(_drain(), timeout=2.0)

      # Pump exited after yielding two chunks; iter_subscriber must return.
      assert received == [b"a", b"b"]
      # Helper unsubscribed us on the way out.
      assert bc.subscriber_count == 0
  async def test_iter_subscriber_exits_on_client_disconnect():
      """iter_subscriber stops once ``is_disconnected`` reports True."""
      bc = MjpegBroadcaster("p1", _make_factory([b"x"] * 1000, delay=0.02))
      try:
          queue = await bc.subscribe()
          seen = 0

          async def is_disconnected() -> bool:
              # Pretend the client left after 2 frames. `seen` is read from the
              # enclosing scope at call time, so it reflects the loop below.
              return seen >= 2

          async for _chunk in iter_subscriber(
              bc, queue, is_disconnected=is_disconnected
          ):
              seen += 1
              if seen >= 5:
                  # Defensive cap so a buggy iterator can't run forever.
                  break

          assert seen == 2
          assert bc.subscriber_count == 0
      finally:
          # The upstream has ~1000 chunks left; stop the pump even when an
          # assertion fails, since this broadcaster is not in the registry.
          await bc.force_shutdown()
  185. # ---------------------------------------------------------------------------
  186. # Registry: stopped broadcasters get replaced
  187. # ---------------------------------------------------------------------------
  async def test_registry_replaces_stopped_broadcaster():
      """After a shutdown, the registry returns a new broadcaster for the id."""
      first_factory = _make_factory([b"a"] * 1000, delay=0.02)
      second_factory = _make_factory([b"b"] * 1000, delay=0.02)

      # Bring up the first broadcaster and confirm it delivers a frame.
      bc1 = await get_or_create_broadcaster("p1", first_factory)
      first_queue = await bc1.subscribe()
      await asyncio.wait_for(first_queue.get(), timeout=1.0)

      await shutdown_broadcaster("p1")
      assert bc1.stopped is True

      # Asking the registry again must produce a different, working instance
      # that is fed by the second factory.
      bc2 = await get_or_create_broadcaster("p1", second_factory)
      assert bc2 is not bc1
      second_queue = await bc2.subscribe()
      first_chunk = await asyncio.wait_for(second_queue.get(), timeout=1.0)
      assert first_chunk == b"b"

      await shutdown_broadcaster("p1")
  203. # ---------------------------------------------------------------------------
  204. # Audit findings: subscribe-after-grace-stops contract + unsubscribe count
  205. # ---------------------------------------------------------------------------
  async def test_subscribe_to_stopped_raises_so_route_can_retry():
      """Contract: calling subscribe() on a stopped broadcaster raises
      RuntimeError.

      The route uses that error as its cue to ask the registry again (which
      then mints a fresh broadcaster) rather than subscribing to a dead one.
      """
      stopped_bc = MjpegBroadcaster("p1", _make_factory([b"x"], delay=0.005))
      await stopped_bc.force_shutdown()
      assert stopped_bc.stopped is True

      with pytest.raises(RuntimeError):
          await stopped_bc.subscribe()
  async def test_unsubscribe_returns_remaining_count_atomically():
      """Two subscribers leaving simultaneously must report distinct remaining
      counts (1 then 0), not both report 0 due to a race between unsubscribe
      and reading subscriber_count after the fact.
      """
      bc = MjpegBroadcaster("p1", _make_factory([b"x"] * 1000, delay=0.05))
      try:
          q1 = await bc.subscribe()
          q2 = await bc.subscribe()

          # Run both unsubscribes concurrently. Each should return its own
          # post-removal count.
          counts = await asyncio.gather(bc.unsubscribe(q1), bc.unsubscribe(q2))
          assert sorted(counts) == [0, 1], (
              "expected one unsubscribe to see 1 remaining and the other to "
              f"see 0, got {counts}"
          )
      finally:
          # Stop the pump even when the assertion fails; this broadcaster is
          # not registered, so the registry fixture would not clean it up.
          await bc.force_shutdown()
  async def test_unsubscribe_idempotent_returns_current_count():
      """Double-unsubscribe (e.g. shutdown raced with iter_subscriber finally)
      must not corrupt state; second call returns whatever the count is now.
      """
      bc = MjpegBroadcaster("p1", _make_factory([b"x"] * 1000, delay=0.05))
      try:
          q1 = await bc.subscribe()
          # A second subscriber stays attached for the whole test; only the
          # removal of q1 is under test.
          await bc.subscribe()

          first = await bc.unsubscribe(q1)
          again = await bc.unsubscribe(q1)  # q1 is already gone

          assert first == 1
          assert again == 1  # the second subscriber is still there
      finally:
          # Stop the pump even when an assertion fails; this broadcaster is
          # not registered, so the registry fixture would not clean it up.
          await bc.force_shutdown()
  async def test_force_shutdown_then_subscribe_via_registry_works():
      """Walk the route's retry path: the registered broadcaster has been
      stopped, so a second get_or_create_broadcaster call must hand back a
      fresh one that accepts a subscription and delivers frames.
      """
      upstream = _make_factory([b"hello"] * 1000, delay=0.02)

      stale = await get_or_create_broadcaster("p1", upstream)
      # Stop the registered broadcaster directly, simulating the grace teardown
      # winning the race against a new subscriber.
      await stale.force_shutdown()

      # subscribe() on `stale` would raise; the registry must replace it.
      fresh = await get_or_create_broadcaster("p1", upstream)
      assert fresh is not stale

      queue = await fresh.subscribe()
      first_chunk = await asyncio.wait_for(queue.get(), timeout=1.0)
      assert first_chunk == b"hello"

      await shutdown_broadcaster("p1")