| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341 |
- """Unit tests for the MJPEG fan-out broadcaster (#1089).
- These tests do not touch ffmpeg or any printer — they drive a fake upstream
- generator and assert subscriber/pump lifecycle behaviour.
- """
- from __future__ import annotations
- import asyncio
- from collections.abc import AsyncGenerator
- import pytest
- from backend.app.services import camera_fanout
- from backend.app.services.camera_fanout import (
- MjpegBroadcaster,
- get_or_create_broadcaster,
- iter_subscriber,
- shutdown_all_broadcasters,
- shutdown_broadcaster,
- )
- pytestmark = pytest.mark.asyncio
- # Speed up grace-window tests so the suite stays fast. The default 5s grace
- # is overkill for unit tests; we patch it down to a few ms.
@pytest.fixture(autouse=True)
def _short_grace(monkeypatch):
    """Patch ``camera_fanout._GRACE_SECONDS`` down to 0.05 for every test.

    Declared ``autouse`` so each test in this module gets the shortened
    value without requesting the fixture explicitly; ``monkeypatch``
    restores the original value afterwards.
    """
    monkeypatch.setattr(camera_fanout, "_GRACE_SECONDS", 0.05)
@pytest.fixture(autouse=True)
async def _clean_registry():
    """Shut down all registered broadcasters before and after each test.

    The registry is module-global state, so it is cleared on both sides of
    the test body to keep tests from leaking broadcasters into one another.
    """
    # Setup: start from an empty registry.
    await shutdown_all_broadcasters()
    yield
    # Teardown: drop whatever the test registered.
    await shutdown_all_broadcasters()
- def _make_factory(
- chunks: list[bytes],
- *,
- delay: float = 0.0,
- pump_started: asyncio.Event | None = None,
- pump_count: list[int] | None = None,
- ):
- """Build an upstream factory that yields a fixed list of chunks."""
- async def factory(disconnect: asyncio.Event) -> AsyncGenerator[bytes, None]:
- if pump_started is not None:
- pump_started.set()
- if pump_count is not None:
- pump_count[0] += 1
- for chunk in chunks:
- if disconnect.is_set():
- return
- if delay:
- try:
- await asyncio.wait_for(disconnect.wait(), timeout=delay)
- return
- except asyncio.TimeoutError:
- pass
- yield chunk
- return factory
- # ---------------------------------------------------------------------------
- # Single subscriber
- # ---------------------------------------------------------------------------
async def test_single_subscriber_receives_all_frames():
    """A lone subscriber reads the three upstream chunks in order."""
    frames = [b"a", b"b", b"c"]
    bc = MjpegBroadcaster("p1", _make_factory(frames, delay=0.005))
    queue = await bc.subscribe()

    # Pull exactly as many items as the upstream produces; each read is
    # bounded so a stalled pump fails the test instead of hanging it.
    received = [await asyncio.wait_for(queue.get(), timeout=1.0) for _ in range(3)]

    assert received == [b"a", b"b", b"c"]
    await bc.force_shutdown()
- # ---------------------------------------------------------------------------
- # Multiple subscribers share one upstream
- # ---------------------------------------------------------------------------
async def test_multiple_subscribers_share_single_upstream():
    """Three subscribers each get every frame while the factory runs once."""
    expected = [b"f1", b"f2", b"f3"]
    pump_count = [0]
    upstream = _make_factory([b"f1", b"f2", b"f3"], delay=0.01, pump_count=pump_count)
    bc = MjpegBroadcaster("p1", upstream)

    # Subscribe one after another, in the same order as the original test.
    queues = [await bc.subscribe() for _ in range(3)]

    # Each queue is drained in turn and must hold the full sequence once.
    for q in queues:
        received = [await asyncio.wait_for(q.get(), timeout=1.0) for _ in range(3)]
        assert received == expected

    # The factory's counter is bumped once per upstream start, so a value of
    # 1 means all three subscribers were fed from a single upstream.
    assert pump_count[0] == 1
    await bc.force_shutdown()
- # ---------------------------------------------------------------------------
- # Slow subscriber should not block fast subscribers
- # ---------------------------------------------------------------------------
async def test_slow_subscriber_does_not_block_others():
    """A subscriber that never reads must not starve one that does."""
    # 50 distinct single-byte frames; intended to exceed the per-subscriber
    # queue depth so the idle queue has to cope with overflow.
    chunks = [bytes([value % 256]) for value in range(50)]
    bc = MjpegBroadcaster("p1", _make_factory(chunks, delay=0.001))

    # Subscription order matters for parity with the original: slow first.
    slow = await bc.subscribe()
    fast = await bc.subscribe()

    # Only `fast` is ever read from; `slow` is left untouched on purpose.
    received_fast = [
        await asyncio.wait_for(fast.get(), timeout=2.0) for _ in range(50)
    ]
    assert received_fast == chunks

    # The idle queue must stay within the configured bound rather than
    # growing without limit.
    assert slow.qsize() <= camera_fanout._SUBSCRIBER_QUEUE_SIZE
    await bc.force_shutdown()
- # ---------------------------------------------------------------------------
- # Last-subscriber-leaves grace window
- # ---------------------------------------------------------------------------
async def test_pump_torn_down_after_last_subscriber_leaves(monkeypatch):
    """Once the only subscriber leaves, the broadcaster stops after the grace window."""
    monkeypatch.setattr(camera_fanout, "_GRACE_SECONDS", 0.05)
    pump_count = [0]

    # 1000 frames at 50 ms each: the upstream only ends early if its
    # disconnect event is set.
    upstream = _make_factory([b"x"] * 1000, delay=0.05, pump_count=pump_count)
    bc = MjpegBroadcaster("p1", upstream)
    queue = await bc.subscribe()

    # Consume one frame, then leave.
    await asyncio.wait_for(queue.get(), timeout=1.0)
    await bc.unsubscribe(queue)

    # Sleep well past the 50 ms grace window before inspecting state.
    await asyncio.sleep(0.2)

    assert bc.subscriber_count == 0
    assert bc.stopped is True
    assert pump_count[0] == 1
async def test_grace_window_cancelled_on_rejoin(monkeypatch):
    """A subscriber arriving inside the grace window keeps the broadcaster alive."""
    monkeypatch.setattr(camera_fanout, "_GRACE_SECONDS", 0.1)
    pump_count = [0]
    upstream = _make_factory([b"x"] * 1000, delay=0.02, pump_count=pump_count)
    bc = MjpegBroadcaster("p1", upstream)

    # First viewer: read one frame, then leave.
    first = await bc.subscribe()
    await asyncio.wait_for(first.get(), timeout=1.0)
    await bc.unsubscribe(first)

    # Second viewer joins 20 ms later, inside the 100 ms grace window.
    await asyncio.sleep(0.02)
    second = await bc.subscribe()

    # Wait until the original grace deadline has clearly passed.
    await asyncio.sleep(0.2)

    # Broadcaster is still running and the factory was started only once.
    assert bc.stopped is False
    assert pump_count[0] == 1

    # The rejoined viewer can still read a frame.
    await asyncio.wait_for(second.get(), timeout=1.0)
    await bc.force_shutdown()
- # ---------------------------------------------------------------------------
- # Force shutdown wakes subscribers
- # ---------------------------------------------------------------------------
async def test_force_shutdown_signals_subscribers():
    """After ``force_shutdown`` the subscriber's next read is the upstream-gone sentinel."""
    upstream = _make_factory([b"x"] * 1000, delay=0.05)
    bc = MjpegBroadcaster("p1", upstream)
    queue = await bc.subscribe()

    # Take one real frame first so the pump is known to be running.
    await asyncio.wait_for(queue.get(), timeout=1.0)
    await bc.force_shutdown()

    # The next read must finish within the timeout and return the sentinel.
    next_item = await asyncio.wait_for(queue.get(), timeout=1.0)
    assert next_item == camera_fanout._UPSTREAM_GONE
    assert bc.stopped is True
- # ---------------------------------------------------------------------------
- # iter_subscriber helper exits cleanly on upstream-gone and disconnect
- # ---------------------------------------------------------------------------
async def test_iter_subscriber_exits_on_upstream_gone():
    """``iter_subscriber`` ends by itself when the two-chunk upstream runs dry."""
    bc = MjpegBroadcaster("p1", _make_factory([b"a", b"b"], delay=0.005))
    queue = await bc.subscribe()

    # The comprehension only finishes if the iterator terminates on its own.
    received = [chunk async for chunk in iter_subscriber(bc, queue)]

    assert received == [b"a", b"b"]
    # No subscribers remain registered once the helper has exited.
    assert bc.subscriber_count == 0
async def test_iter_subscriber_exits_on_client_disconnect():
    """``iter_subscriber`` stops once ``is_disconnected`` starts returning True."""
    bc = MjpegBroadcaster("p1", _make_factory([b"x"] * 1000, delay=0.02))
    queue = await bc.subscribe()
    seen = 0

    async def is_disconnected() -> bool:
        # Report the client as gone once two frames have been delivered.
        return seen >= 2

    stream = iter_subscriber(bc, queue, is_disconnected=is_disconnected)
    async for _chunk in stream:
        seen += 1
        # Hard stop so a broken iterator cannot loop through all 1000 frames.
        if seen >= 5:
            break

    assert seen == 2
    assert bc.subscriber_count == 0
    await bc.force_shutdown()
- # ---------------------------------------------------------------------------
- # Registry: stopped broadcasters get replaced
- # ---------------------------------------------------------------------------
async def test_registry_replaces_stopped_broadcaster():
    """After a shutdown, the registry hands out a new broadcaster for the same key."""
    factory_a = _make_factory([b"a"] * 1000, delay=0.02)
    factory_b = _make_factory([b"b"] * 1000, delay=0.02)

    # First broadcaster: subscribe, read one frame, then shut it down.
    original = await get_or_create_broadcaster("p1", factory_a)
    original_queue = await original.subscribe()
    await asyncio.wait_for(original_queue.get(), timeout=1.0)
    await shutdown_broadcaster("p1")
    assert original.stopped is True

    # Asking again for "p1" must yield a different object, and its frames
    # must come from the second factory.
    replacement = await get_or_create_broadcaster("p1", factory_b)
    assert replacement is not original
    replacement_queue = await replacement.subscribe()
    chunk = await asyncio.wait_for(replacement_queue.get(), timeout=1.0)
    assert chunk == b"b"

    await shutdown_broadcaster("p1")
- # ---------------------------------------------------------------------------
- # Audit findings: subscribe-after-grace-stops contract + unsubscribe count
- # ---------------------------------------------------------------------------
async def test_subscribe_to_stopped_raises_so_route_can_retry():
    """``subscribe()`` on a stopped broadcaster raises ``RuntimeError``.

    Per the original test's stated contract, the route uses this error as
    its cue to fetch the registry entry again rather than attach to a
    broadcaster that has already shut down.
    """
    upstream = _make_factory([b"x"], delay=0.005)
    bc = MjpegBroadcaster("p1", upstream)

    # Stop the broadcaster before anyone has subscribed.
    await bc.force_shutdown()
    assert bc.stopped is True

    with pytest.raises(RuntimeError):
        await bc.subscribe()
async def test_unsubscribe_returns_remaining_count_atomically():
    """Concurrent unsubscribes report 1 and 0 remaining, never 0 twice.

    Each ``unsubscribe`` call is expected to return the subscriber count as
    of its own removal, so two callers leaving together see distinct values.
    """
    bc = MjpegBroadcaster("p1", _make_factory([b"x"] * 1000, delay=0.05))
    q1 = await bc.subscribe()
    q2 = await bc.subscribe()

    # Schedule both removals together and collect what each one returned.
    leaving = (bc.unsubscribe(q1), bc.unsubscribe(q2))
    counts = await asyncio.gather(*leaving)

    # Sort before comparing: which caller sees which value is unspecified.
    assert sorted(counts) == [0, 1], (
        f"expected one unsubscribe to see 1 remaining and the other to see 0, got {counts}"
    )
    await bc.force_shutdown()
async def test_unsubscribe_idempotent_returns_current_count():
    """Unsubscribing the same queue twice is harmless.

    The repeated call must leave state intact and return the count as it
    currently stands.
    """
    bc = MjpegBroadcaster("p1", _make_factory([b"x"] * 1000, delay=0.05))
    q1 = await bc.subscribe()
    # A second subscriber stays attached for the whole test; its queue is
    # never needed, only its presence in the count.
    await bc.subscribe()

    first = await bc.unsubscribe(q1)
    # q1 has already been removed at this point.
    again = await bc.unsubscribe(q1)

    assert first == 1
    # The other subscriber is still registered, so the count is unchanged.
    assert again == 1
    await bc.force_shutdown()
async def test_force_shutdown_then_subscribe_via_registry_works():
    """Registry lookup after a forced shutdown yields a usable broadcaster.

    Mirrors the retry path described by the original test: the registered
    broadcaster has stopped, the caller asks the registry again, and the
    subscription on the returned broadcaster delivers frames.
    """
    factory = _make_factory([b"hello"] * 1000, delay=0.02)
    stale = await get_or_create_broadcaster("p1", factory)

    # Stop the registered instance directly, standing in for a grace-window
    # teardown that beat a new subscriber to it.
    await stale.force_shutdown()

    # The registry must not hand the stopped instance back.
    fresh = await get_or_create_broadcaster("p1", factory)
    assert fresh is not stale

    queue = await fresh.subscribe()
    chunk = await asyncio.wait_for(queue.get(), timeout=1.0)
    assert chunk == b"hello"

    await shutdown_broadcaster("p1")
|