Explorar o código

fix(#1089): camera stream fan-out broadcaster

  Most Bambu Lab printers only allow one concurrent camera connection, but
  GET /printers/{id}/camera/stream opened a fresh upstream per viewer.
  Two browser tabs → second viewer fails or kicks the first off.

  New MjpegBroadcaster (services/camera_fanout.py) owns one upstream per
  printer and fans MJPEG chunks out to N subscribers. 5 s grace window
  absorbs tab refreshes without reconnecting. Bounded subscriber queues
  drop frames for slow viewers rather than blocking the broadcaster.

  Audit-pass fixes:
  - _stream_start_times set with setdefault() so stream_uptime reflects
    the shared upstream's age, not the most-recent viewer's
  - subscribe() retried once on RuntimeError to close a tiny grace race
  - unsubscribe() returns post-removal count atomically so the detach log
    no longer races with concurrent leavers

  Permission gates unchanged; broadcaster has no FastAPI surface.

  Tests: 13 broadcaster unit tests + 2 integration tests on /camera/stop.
  External-camera path untouched.
maziggy hai 1 mes
pai
achega
1e3ad697f2

A diferenza do arquivo foi suprimida porque é demasiado grande
+ 0 - 0
CHANGELOG.md


+ 79 - 75
backend/app/api/routes/camera.py

@@ -31,6 +31,12 @@ from backend.app.services.camera import (
     read_next_chamber_frame,
     test_camera_connection,
 )
+from backend.app.services.camera_fanout import (
+    MjpegBroadcaster,
+    get_or_create_broadcaster,
+    iter_subscriber,
+    shutdown_broadcaster,
+)
 
 logger = logging.getLogger(__name__)
 router = APIRouter(prefix="/printers", tags=["camera"])
@@ -552,8 +558,6 @@ async def camera_stream(
         printer_id: Printer ID
         fps: Target frames per second (default: 10, max: 30)
     """
-    import uuid
-
     printer = await get_printer_or_404(printer_id, db)
 
     # Check for external camera first
@@ -602,12 +606,6 @@ async def camera_stream(
     else:
         fps = min(max(fps, 1), 30)
 
-    # Generate unique stream ID for tracking
-    stream_id = f"{printer_id}-{uuid.uuid4().hex[:8]}"
-
-    # Create disconnect event that will be set when client disconnects
-    disconnect_event = asyncio.Event()
-
     # Choose the appropriate stream generator based on model
     if is_chamber_image_model(printer.model):
         stream_generator = generate_chamber_mjpeg_stream
@@ -616,80 +614,80 @@ async def camera_stream(
         stream_generator = generate_rtsp_mjpeg_stream
         logger.info("Using RTSP protocol for %s", printer.model)
 
-    # Track stream start time
+    # Track stream start time. Set only if absent so the value reflects when
+    # the SHARED upstream first started streaming, not when each new viewer
+    # attached — otherwise /camera/status would report stream_uptime jumping
+    # backward whenever a second viewer joins. The upstream generator's
+    # finally clears this entry when the upstream actually ends.
     import time
 
-    _stream_start_times[printer_id] = time.time()
-
-    async def _kill_stream_process(sid: str):
-        """Terminate+kill the ffmpeg process for a stream ID."""
-        proc = _active_streams.get(sid)
-        if proc and proc.returncode is None:
-            try:
-                proc.terminate()
-                try:
-                    await asyncio.wait_for(proc.wait(), timeout=2.0)
-                except TimeoutError:
-                    proc.kill()
-                    await proc.wait()
-            except (ProcessLookupError, OSError):
-                pass
-
-    async def _monitor_disconnect():
-        """Background task: poll for client disconnect independently of frame loop."""
-        try:
-            while not disconnect_event.is_set():
-                await asyncio.sleep(2)
-                if await request.is_disconnected():
-                    logger.info("Disconnect monitor: client gone (stream %s)", stream_id)
-                    disconnect_event.set()
-                    # Kill ffmpeg process (RTSP streams)
-                    await _kill_stream_process(stream_id)
-                    # Close chamber stream connection if applicable
-                    chamber = _active_chamber_streams.get(stream_id)
-                    if chamber:
-                        try:
-                            chamber[1].close()
-                        except OSError:
-                            pass
-                    break
-        except asyncio.CancelledError:
-            pass
+    _stream_start_times.setdefault(printer_id, time.time())
+
+    # Fan-out broadcaster (#1089): one upstream connection per printer, shared
+    # across all viewers. Most Bambu printers only allow a single concurrent
+    # camera connection, so opening the same printer in two tabs would
+    # otherwise kick the first viewer off. The broadcaster owns the single
+    # upstream and the per-viewer disconnect handling.
+    #
+    # Note: the upstream's fps is fixed by the first viewer who creates the
+    # broadcaster. Concurrent viewers share that rate; new viewers after
+    # teardown create a fresh broadcaster at their requested fps.
+    fanout_key = f"printer-{printer_id}"
+    upstream_stream_id = f"{printer_id}-fanout"
+
+    def _factory(disconnect_event: asyncio.Event):
+        # Re-bind locals into the closure so the async generator below sees
+        # them — disconnect_event is owned by the broadcaster and signalled
+        # when the last subscriber leaves (after the grace window).
+        return stream_generator(
+            ip_address=printer.ip_address,
+            access_code=printer.access_code,
+            model=printer.model,
+            fps=fps,
+            stream_id=upstream_stream_id,
+            disconnect_event=disconnect_event,
+            printer_id=printer_id,
+        )
 
-    monitor_task = asyncio.create_task(_monitor_disconnect())
+    # Subscribe with a one-shot retry to close a tiny race: the grace-window
+    # teardown can flip the broadcaster to `stopped=True` between the registry
+    # lookup and our subscribe call. The retry forces the registry to mint a
+    # fresh broadcaster (since the now-stopped one is replaced), and the second
+    # subscribe is guaranteed to land on it before any teardown can fire.
+    broadcaster: MjpegBroadcaster = await get_or_create_broadcaster(fanout_key, _factory)
+    try:
+        queue = await broadcaster.subscribe()
+    except RuntimeError:
+        broadcaster = await get_or_create_broadcaster(fanout_key, _factory)
+        queue = await broadcaster.subscribe()
+    logger.info(
+        "Camera viewer attached to %s (subscribers=%d)",
+        fanout_key,
+        broadcaster.subscriber_count,
+    )
 
-    async def stream_with_disconnect_check():
-        """Wrapper generator that monitors for client disconnect."""
+    async def _is_disconnected() -> bool:
         try:
-            async for chunk in stream_generator(
-                ip_address=printer.ip_address,
-                access_code=printer.access_code,
-                model=printer.model,
-                fps=fps,
-                stream_id=stream_id,
-                disconnect_event=disconnect_event,
-                printer_id=printer_id,
-            ):
-                # Check if client is still connected
-                if disconnect_event.is_set() or await request.is_disconnected():
-                    logger.info("Client disconnected detected for stream %s", stream_id)
-                    disconnect_event.set()
-                    break
-                yield chunk
-        except asyncio.CancelledError:
-            logger.info("Stream %s cancelled", stream_id)
-            disconnect_event.set()
-        except GeneratorExit:
-            logger.info("Stream %s generator closed", stream_id)
-            disconnect_event.set()
-        finally:
-            disconnect_event.set()
-            monitor_task.cancel()
-            # Give a moment for the inner generator to clean up
-            await asyncio.sleep(0.1)
+            return await request.is_disconnected()
+        except Exception:
+            # Older starlette/uvicorn can raise during teardown — treat that
+            # as "client gone" so the subscriber cleanly unsubscribes.
+            return True
+
+    def _log_detach(remaining: int) -> None:
+        logger.info("Camera viewer detached from %s (subscribers=%d)", fanout_key, remaining)
+
+    async def _generate():
+        async for chunk in iter_subscriber(
+            broadcaster,
+            queue,
+            is_disconnected=_is_disconnected,
+            on_unsubscribe=_log_detach,
+        ):
+            yield chunk
 
     return StreamingResponse(
-        stream_with_disconnect_check(),
+        _generate(),
         media_type="multipart/x-mixed-replace; boundary=frame",
         headers={
             "Cache-Control": "no-cache, no-store, must-revalidate",
@@ -711,6 +709,12 @@ async def stop_camera_stream(
     """
     stopped = 0
 
+    # Tear down the fan-out broadcaster first (#1089). This cleanly notifies
+    # all subscribed viewers and asks the upstream generator to stop
+    # reconnecting before we fall back to forcefully killing the process below.
+    if await shutdown_broadcaster(f"printer-{printer_id}"):
+        logger.info("Shut down camera fan-out broadcaster for printer %s", printer_id)
+
     # Stop ffmpeg/RTSP streams
     to_remove = []
     for stream_id, process in list(_active_streams.items()):

+ 8 - 0
backend/app/main.py

@@ -4279,6 +4279,14 @@ async def lifespan(app: FastAPI):
     stop_runtime_tracking()
     stop_spoolbuddy_watchdog()
     stop_camera_cleanup()
+    # Tear down all camera fan-out broadcasters (#1089) so subscribers exit
+    # cleanly rather than waiting on a queue that nothing will ever fill.
+    try:
+        from backend.app.services.camera_fanout import shutdown_all_broadcasters
+
+        await shutdown_all_broadcasters()
+    except Exception as e:
+        logging.warning("Failed to shut down camera broadcasters: %s", e)
     stop_expected_prints_cleanup()
     stop_auth_cleanup()
     printer_manager.disconnect_all()

+ 280 - 0
backend/app/services/camera_fanout.py

@@ -0,0 +1,280 @@
+"""MJPEG fan-out broadcaster for camera streams.
+
+Most Bambu Lab printers only allow one concurrent camera connection: the
+RTSP socket on X1/H2/P2 models, the chamber-image socket on port 6000 on
+A1/P1 models. Without fan-out, opening a second viewer either fails or
+kicks the first viewer off — see issue #1089.
+
+This module owns a single upstream connection per printer and pushes each
+frame to N independent subscriber queues. New viewers tap the existing
+upstream; no new printer connection is opened. When the last subscriber
+leaves, the upstream is torn down after a short grace window so that a
+quick page refresh or second-tab open does not pay a reconnect.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+from collections.abc import AsyncGenerator, Awaitable, Callable
+
+logger = logging.getLogger(__name__)
+
+# How long to keep the upstream pump alive after the last subscriber leaves.
+# A short grace window absorbs page refreshes and "open camera in new tab"
+# without paying a fresh ffmpeg/RTSP handshake (which can take several seconds
+# on some firmwares and is the very reconnect cost we are trying to avoid).
+_GRACE_SECONDS = 5.0
+
+# Per-subscriber queue depth. Small on purpose: if a viewer can't keep up
+# with the printer's frame rate we drop frames for that viewer rather than
+# blocking the broadcaster. Live video — old frames have no value.
+_SUBSCRIBER_QUEUE_SIZE = 4
+
+# Sentinel pushed to subscriber queues when the upstream pump exits, so each
+# subscriber's read loop can break out cleanly instead of hanging on get().
+_UPSTREAM_GONE = b""
+
+UpstreamFactory = Callable[[asyncio.Event], AsyncGenerator[bytes, None]]
+
+
+class MjpegBroadcaster:
+    """Single upstream MJPEG stream, fanned out to N subscribers."""
+
+    def __init__(self, key: str, factory: UpstreamFactory) -> None:
+        self._key = key
+        self._factory = factory
+        self._subscribers: list[asyncio.Queue[bytes]] = []
+        self._lock = asyncio.Lock()
+        self._pump_task: asyncio.Task | None = None
+        self._grace_task: asyncio.Task | None = None
+        # Disconnect event passed to the upstream generator so we can ask it to
+        # stop reconnecting when the last subscriber leaves.
+        self._upstream_disconnect = asyncio.Event()
+        self._stopped = False
+
+    @property
+    def key(self) -> str:
+        return self._key
+
+    @property
+    def subscriber_count(self) -> int:
+        return len(self._subscribers)
+
+    @property
+    def stopped(self) -> bool:
+        return self._stopped
+
+    async def subscribe(self) -> asyncio.Queue[bytes]:
+        """Add a subscriber and ensure the upstream pump is running."""
+        async with self._lock:
+            if self._stopped:
+                raise RuntimeError(f"broadcaster {self._key!r} is stopped")
+
+            # Cancel any pending grace-window shutdown — a viewer just rejoined.
+            if self._grace_task is not None and not self._grace_task.done():
+                self._grace_task.cancel()
+                self._grace_task = None
+
+            queue: asyncio.Queue[bytes] = asyncio.Queue(maxsize=_SUBSCRIBER_QUEUE_SIZE)
+            self._subscribers.append(queue)
+
+            if self._pump_task is None or self._pump_task.done():
+                # Reset the disconnect signal in case a previous pump set it.
+                self._upstream_disconnect = asyncio.Event()
+                self._pump_task = asyncio.create_task(self._pump(), name=f"camera-fanout-pump-{self._key}")
+            return queue
+
+    async def unsubscribe(self, queue: asyncio.Queue[bytes]) -> int:
+        """Remove a subscriber and return the remaining count (atomic).
+
+        If this was the last subscriber, schedule grace shutdown.
+        """
+        async with self._lock:
+            try:
+                self._subscribers.remove(queue)
+            except ValueError:
+                return len(self._subscribers)  # Already removed (e.g. force_shutdown)
+            remaining = len(self._subscribers)
+            if remaining == 0 and not self._stopped:
+                # Last subscriber left. Schedule grace-window teardown.
+                self._grace_task = asyncio.create_task(self._grace_then_stop(), name=f"camera-fanout-grace-{self._key}")
+            return remaining
+
+    async def force_shutdown(self) -> None:
+        """Tear down immediately, kick all subscribers. Idempotent."""
+        pump_task = await self._mark_stopped_locked(notify_subscribers=True)
+        await self._await_pump_cancellation(pump_task)
+
+    async def _grace_then_stop(self) -> None:
+        try:
+            await asyncio.sleep(_GRACE_SECONDS)
+        except asyncio.CancelledError:
+            return  # New subscriber arrived during grace
+        # Re-check under the lock — a subscriber may have rejoined between
+        # the sleep finishing and us acquiring the lock.
+        pump_task: asyncio.Task | None = None
+        async with self._lock:
+            if self._subscribers or self._stopped:
+                return
+            self._upstream_disconnect.set()
+            pump_task = self._pump_task
+            self._pump_task = None
+            self._grace_task = None
+            self._stopped = True
+        await self._await_pump_cancellation(pump_task)
+
+    async def _mark_stopped_locked(self, *, notify_subscribers: bool) -> asyncio.Task | None:
+        """Mark the broadcaster stopped and detach the pump task.
+
+        Caller MUST NOT hold ``self._lock`` (we acquire it here). Returns the
+        pump task (if any) so the caller can await its cancellation OUTSIDE
+        the lock — the pump's ``finally`` block needs the lock to wake up
+        subscribers, so we'd deadlock if we awaited it under the lock.
+        """
+        async with self._lock:
+            if self._stopped and self._pump_task is None:
+                return None
+            self._upstream_disconnect.set()
+            if notify_subscribers:
+                for queue in self._subscribers:
+                    try:
+                        queue.put_nowait(_UPSTREAM_GONE)
+                    except asyncio.QueueFull:
+                        pass
+                self._subscribers.clear()
+            pump_task = self._pump_task
+            self._pump_task = None
+            self._stopped = True
+            if self._grace_task is not None and not self._grace_task.done():
+                self._grace_task.cancel()
+                self._grace_task = None
+            return pump_task
+
+    async def _await_pump_cancellation(self, pump_task: asyncio.Task | None) -> None:
+        if pump_task is None or pump_task.done():
+            return
+        pump_task.cancel()
+        try:
+            await pump_task
+        except (asyncio.CancelledError, Exception):
+            # Pump exceptions are already logged inside _pump; swallow here so
+            # teardown can never propagate a stray crash.
+            pass
+
+    async def _pump(self) -> None:
+        """Drive the upstream generator and broadcast each chunk."""
+        try:
+            async for chunk in self._factory(self._upstream_disconnect):
+                # Snapshot subscribers under lock so we don't iterate a list
+                # mutated by subscribe()/unsubscribe() while we are putting.
+                async with self._lock:
+                    targets = list(self._subscribers)
+                for queue in targets:
+                    try:
+                        queue.put_nowait(chunk)
+                    except asyncio.QueueFull:
+                        # Slow viewer — drop this frame for them. They'll catch
+                        # up on the next frame. Don't unsubscribe: a brief
+                        # browser stall shouldn't end the stream.
+                        pass
+        except asyncio.CancelledError:
+            raise
+        except Exception:
+            logger.exception("Camera fan-out pump crashed for %s", self._key)
+        finally:
+            # Pump is exiting — wake up any subscribers still hanging on get().
+            async with self._lock:
+                for queue in self._subscribers:
+                    try:
+                        queue.put_nowait(_UPSTREAM_GONE)
+                    except asyncio.QueueFull:
+                        pass
+
+
+# Global registry. Keyed by printer_id (as str) so a chamber-mode printer
+# and an RTSP-mode printer can never collide on the same key.
+_broadcasters: dict[str, MjpegBroadcaster] = {}
+_registry_lock = asyncio.Lock()
+
+
+async def get_or_create_broadcaster(key: str, factory: UpstreamFactory) -> MjpegBroadcaster:
+    """Return the live broadcaster for `key`, creating one if needed.
+
+    A broadcaster that has been stopped (force shutdown or grace timeout) is
+    replaced with a fresh instance — the caller will subscribe to the new one.
+    """
+    async with _registry_lock:
+        existing = _broadcasters.get(key)
+        if existing is not None and not existing.stopped:
+            return existing
+        new_bc = MjpegBroadcaster(key, factory)
+        _broadcasters[key] = new_bc
+        return new_bc
+
+
+async def shutdown_broadcaster(key: str) -> bool:
+    """Force-shutdown the broadcaster for `key`. Returns True if one was running."""
+    async with _registry_lock:
+        bc = _broadcasters.pop(key, None)
+    if bc is None:
+        return False
+    await bc.force_shutdown()
+    return True
+
+
+async def shutdown_all_broadcasters() -> None:
+    """Tear down every broadcaster (for app shutdown)."""
+    async with _registry_lock:
+        bcs = list(_broadcasters.values())
+        _broadcasters.clear()
+    await asyncio.gather(*(bc.force_shutdown() for bc in bcs), return_exceptions=True)
+
+
+def active_broadcaster_keys() -> list[str]:
+    """Snapshot of keys with a live (non-stopped) broadcaster. For diagnostics."""
+    return [k for k, bc in _broadcasters.items() if not bc.stopped]
+
+
+# ---------------------------------------------------------------------------
+# AsyncGenerator helper — turns a subscriber queue into an async generator
+# that yields MJPEG chunks until the upstream signals it's gone.
+# ---------------------------------------------------------------------------
+
+
+async def iter_subscriber(
+    broadcaster: MjpegBroadcaster,
+    queue: asyncio.Queue[bytes],
+    *,
+    is_disconnected: Callable[[], Awaitable[bool]] | None = None,
+    on_unsubscribe: Callable[[int], None] | None = None,
+) -> AsyncGenerator[bytes, None]:
+    """Yield chunks from a subscriber queue until upstream ends or client leaves.
+
+    Always unsubscribes from the broadcaster on exit, even on cancellation.
+    The optional ``on_unsubscribe`` callback receives the post-unsubscribe
+    subscriber count — useful for accurate detach-log lines that don't race
+    with concurrent unsubscribes.
+    """
+    try:
+        while True:
+            try:
+                chunk = await asyncio.wait_for(queue.get(), timeout=30.0)
+            except asyncio.TimeoutError:
+                # No frame in 30s — check whether the client is still there.
+                # If yes, keep waiting; if no, bail out.
+                if is_disconnected is not None and await is_disconnected():
+                    break
+                continue
+            if chunk == _UPSTREAM_GONE:
+                break
+            yield chunk
+            if is_disconnected is not None and await is_disconnected():
+                break
+    finally:
+        remaining = await broadcaster.unsubscribe(queue)
+        if on_unsubscribe is not None:
+            try:
+                on_unsubscribe(remaining)
+            except Exception:
+                logger.exception("on_unsubscribe callback raised")

+ 41 - 0
backend/tests/integration/test_camera_api.py

@@ -106,6 +106,47 @@ class TestCameraAPI:
         mock_process1.terminate.assert_called_once()
         mock_process2.terminate.assert_not_called()
 
+    @pytest.mark.asyncio
+    @pytest.mark.integration
+    async def test_stop_camera_stream_handles_fanout_stream_id(self, async_client: AsyncClient, printer_factory):
+        """Stop must terminate streams keyed with the deterministic
+        ``{printer_id}-fanout`` id used by the fan-out broadcaster (#1089).
+        Regression guard against the prefix-match drifting away from the
+        broadcaster's stream-id convention.
+        """
+        printer = await printer_factory()
+        mock_process = MagicMock()
+        mock_process.returncode = None
+        mock_process.pid = 99996
+        mock_process.terminate = MagicMock()
+        mock_process.wait = AsyncMock()
+
+        with patch(
+            "backend.app.api.routes.camera._active_streams",
+            {f"{printer.id}-fanout": mock_process},
+        ):
+            response = await async_client.post(f"/api/v1/printers/{printer.id}/camera/stop")
+
+        assert response.status_code == 200
+        assert response.json()["stopped"] == 1
+        mock_process.terminate.assert_called_once()
+
+    @pytest.mark.asyncio
+    @pytest.mark.integration
+    async def test_stop_camera_stream_invokes_broadcaster_shutdown(self, async_client: AsyncClient, printer_factory):
+        """Stop must call ``shutdown_broadcaster`` so subscribers wake up via
+        the upstream-gone sentinel rather than stalling on the queue (#1089)."""
+        printer = await printer_factory()
+
+        with patch(
+            "backend.app.api.routes.camera.shutdown_broadcaster",
+            AsyncMock(return_value=False),
+        ) as mock_shutdown:
+            response = await async_client.post(f"/api/v1/printers/{printer.id}/camera/stop")
+
+        assert response.status_code == 200
+        mock_shutdown.assert_awaited_once_with(f"printer-{printer.id}")
+
     # ========================================================================
     # Camera Test Endpoint
     # ========================================================================

+ 341 - 0
backend/tests/unit/services/test_camera_fanout.py

@@ -0,0 +1,341 @@
+"""Unit tests for the MJPEG fan-out broadcaster (#1089).
+
+These tests do not touch ffmpeg or any printer — they drive a fake upstream
+generator and assert subscriber/pump lifecycle behaviour.
+"""
+
+from __future__ import annotations
+
+import asyncio
+from collections.abc import AsyncGenerator
+
+import pytest
+
+from backend.app.services import camera_fanout
+from backend.app.services.camera_fanout import (
+    MjpegBroadcaster,
+    get_or_create_broadcaster,
+    iter_subscriber,
+    shutdown_all_broadcasters,
+    shutdown_broadcaster,
+)
+
+pytestmark = pytest.mark.asyncio
+
+
+# Speed up grace-window tests so the suite stays fast. The default 5s grace
+# is overkill for unit tests; we patch it down to a few ms.
+@pytest.fixture(autouse=True)
+def _short_grace(monkeypatch):
+    monkeypatch.setattr(camera_fanout, "_GRACE_SECONDS", 0.05)
+
+
+@pytest.fixture(autouse=True)
+async def _clean_registry():
+    """Reset the global broadcaster registry between tests."""
+    await shutdown_all_broadcasters()
+    yield
+    await shutdown_all_broadcasters()
+
+
+def _make_factory(
+    chunks: list[bytes],
+    *,
+    delay: float = 0.0,
+    pump_started: asyncio.Event | None = None,
+    pump_count: list[int] | None = None,
+):
+    """Build an upstream factory that yields a fixed list of chunks."""
+
+    async def factory(disconnect: asyncio.Event) -> AsyncGenerator[bytes, None]:
+        if pump_started is not None:
+            pump_started.set()
+        if pump_count is not None:
+            pump_count[0] += 1
+        for chunk in chunks:
+            if disconnect.is_set():
+                return
+            if delay:
+                try:
+                    await asyncio.wait_for(disconnect.wait(), timeout=delay)
+                    return
+                except asyncio.TimeoutError:
+                    pass
+            yield chunk
+
+    return factory
+
+
+# ---------------------------------------------------------------------------
+# Single subscriber
+# ---------------------------------------------------------------------------
+
+
+async def test_single_subscriber_receives_all_frames():
+    bc = MjpegBroadcaster("p1", _make_factory([b"a", b"b", b"c"], delay=0.005))
+    queue = await bc.subscribe()
+
+    received = []
+    for _ in range(3):
+        received.append(await asyncio.wait_for(queue.get(), timeout=1.0))
+
+    assert received == [b"a", b"b", b"c"]
+    await bc.force_shutdown()
+
+
+# ---------------------------------------------------------------------------
+# Multiple subscribers share one upstream
+# ---------------------------------------------------------------------------
+
+
+async def test_multiple_subscribers_share_single_upstream():
+    pump_count = [0]
+    bc = MjpegBroadcaster(
+        "p1",
+        _make_factory([b"f1", b"f2", b"f3"], delay=0.01, pump_count=pump_count),
+    )
+
+    q1 = await bc.subscribe()
+    q2 = await bc.subscribe()
+    q3 = await bc.subscribe()
+
+    # Each subscriber must receive each frame exactly once.
+    for q in (q1, q2, q3):
+        received = []
+        for _ in range(3):
+            received.append(await asyncio.wait_for(q.get(), timeout=1.0))
+        assert received == [b"f1", b"f2", b"f3"]
+
+    # Only ONE upstream pump ever ran — that is the entire point of the bug fix.
+    assert pump_count[0] == 1
+    await bc.force_shutdown()
+
+
+# ---------------------------------------------------------------------------
+# Slow subscriber should not block fast subscribers
+# ---------------------------------------------------------------------------
+
+
+async def test_slow_subscriber_does_not_block_others():
+    # Generate more frames than the queue depth so a non-draining queue is
+    # guaranteed to fill up.
+    chunks = [bytes([i % 256]) for i in range(50)]
+    bc = MjpegBroadcaster("p1", _make_factory(chunks, delay=0.001))
+
+    slow = await bc.subscribe()
+    fast = await bc.subscribe()
+
+    # Drain `fast` quickly; never read from `slow`. The fast subscriber must
+    # still get every frame even though `slow` is wedged.
+    received_fast = []
+    for _ in range(50):
+        received_fast.append(await asyncio.wait_for(fast.get(), timeout=2.0))
+
+    assert received_fast == chunks
+    # Slow subscriber's queue should be at most _SUBSCRIBER_QUEUE_SIZE — older
+    # frames were dropped, not stuffed indefinitely.
+    assert slow.qsize() <= camera_fanout._SUBSCRIBER_QUEUE_SIZE
+    await bc.force_shutdown()
+
+
+# ---------------------------------------------------------------------------
+# Last-subscriber-leaves grace window
+# ---------------------------------------------------------------------------
+
+
+async def test_pump_torn_down_after_last_subscriber_leaves(monkeypatch):
+    monkeypatch.setattr(camera_fanout, "_GRACE_SECONDS", 0.05)
+    pump_count = [0]
+    # Long upstream so we know it's still running until disconnect signals it.
+    bc = MjpegBroadcaster(
+        "p1",
+        _make_factory([b"x"] * 1000, delay=0.05, pump_count=pump_count),
+    )
+
+    queue = await bc.subscribe()
+    # Read a couple of frames.
+    await asyncio.wait_for(queue.get(), timeout=1.0)
+    await bc.unsubscribe(queue)
+
+    # Wait for grace window to elapse + a hair more.
+    await asyncio.sleep(0.2)
+
+    assert bc.subscriber_count == 0
+    assert bc.stopped is True
+    assert pump_count[0] == 1
+
+
+async def test_grace_window_cancelled_on_rejoin(monkeypatch):
+    monkeypatch.setattr(camera_fanout, "_GRACE_SECONDS", 0.1)
+    pump_count = [0]
+    bc = MjpegBroadcaster(
+        "p1",
+        _make_factory([b"x"] * 1000, delay=0.02, pump_count=pump_count),
+    )
+
+    q1 = await bc.subscribe()
+    await asyncio.wait_for(q1.get(), timeout=1.0)
+    await bc.unsubscribe(q1)
+
+    # Rejoin BEFORE grace expires — pump should keep running.
+    await asyncio.sleep(0.02)
+    q2 = await bc.subscribe()
+    # Settle past the original grace deadline.
+    await asyncio.sleep(0.2)
+
+    # Pump still alive, only one upstream connection ever opened.
+    assert bc.stopped is False
+    assert pump_count[0] == 1
+    # And the second subscriber is still receiving frames.
+    await asyncio.wait_for(q2.get(), timeout=1.0)
+    await bc.force_shutdown()
+
+
+# ---------------------------------------------------------------------------
+# Force shutdown wakes subscribers
+# ---------------------------------------------------------------------------
+
+
+async def test_force_shutdown_signals_subscribers():
+    bc = MjpegBroadcaster("p1", _make_factory([b"x"] * 1000, delay=0.05))
+    queue = await bc.subscribe()
+    await asyncio.wait_for(queue.get(), timeout=1.0)
+
+    await bc.force_shutdown()
+
+    # Subscriber's queue should contain the upstream-gone sentinel (or be
+    # drained); either way a get() must complete promptly.
+    sentinel = await asyncio.wait_for(queue.get(), timeout=1.0)
+    assert sentinel == camera_fanout._UPSTREAM_GONE
+    assert bc.stopped is True
+
+
+# ---------------------------------------------------------------------------
+# iter_subscriber helper exits cleanly on upstream-gone and disconnect
+# ---------------------------------------------------------------------------
+
+
+async def test_iter_subscriber_exits_on_upstream_gone():
+    bc = MjpegBroadcaster("p1", _make_factory([b"a", b"b"], delay=0.005))
+    queue = await bc.subscribe()
+
+    received = []
+    async for chunk in iter_subscriber(bc, queue):
+        received.append(chunk)
+    # Pump exited after yielding two chunks; iter_subscriber must return.
+    assert received == [b"a", b"b"]
+    # Helper unsubscribed us on the way out.
+    assert bc.subscriber_count == 0
+
+
+async def test_iter_subscriber_exits_on_client_disconnect():
+    bc = MjpegBroadcaster("p1", _make_factory([b"x"] * 1000, delay=0.02))
+    queue = await bc.subscribe()
+
+    seen = 0
+
+    async def is_disconnected() -> bool:
+        return seen >= 2  # Pretend the client left after 2 frames.
+
+    async for _chunk in iter_subscriber(bc, queue, is_disconnected=is_disconnected):
+        seen += 1
+        if seen >= 5:  # Defensive cap so a buggy iterator can't run forever.
+            break
+
+    assert seen == 2
+    assert bc.subscriber_count == 0
+    await bc.force_shutdown()
+
+
+# ---------------------------------------------------------------------------
+# Registry: stopped broadcasters get replaced
+# ---------------------------------------------------------------------------
+
+
+async def test_registry_replaces_stopped_broadcaster():
+    factory_a = _make_factory([b"a"] * 1000, delay=0.02)
+    factory_b = _make_factory([b"b"] * 1000, delay=0.02)
+
+    bc1 = await get_or_create_broadcaster("p1", factory_a)
+    q1 = await bc1.subscribe()
+    await asyncio.wait_for(q1.get(), timeout=1.0)
+    await shutdown_broadcaster("p1")
+    assert bc1.stopped is True
+
+    # New subscription must get a fresh broadcaster.
+    bc2 = await get_or_create_broadcaster("p1", factory_b)
+    assert bc2 is not bc1
+    q2 = await bc2.subscribe()
+    chunk = await asyncio.wait_for(q2.get(), timeout=1.0)
+    assert chunk == b"b"
+    await shutdown_broadcaster("p1")
+
+
+# ---------------------------------------------------------------------------
+# Audit findings: subscribe-after-grace-stops contract + unsubscribe count
+# ---------------------------------------------------------------------------
+
+
+async def test_subscribe_to_stopped_raises_so_route_can_retry():
+    """Contract: subscribe() raises RuntimeError when called on a stopped
+    broadcaster. The route relies on this signal to re-fetch the registry
+    entry (which will then mint a fresh broadcaster) instead of subscribing
+    to a corpse.
+    """
+    bc = MjpegBroadcaster("p1", _make_factory([b"x"], delay=0.005))
+    await bc.force_shutdown()
+    assert bc.stopped is True
+
+    with pytest.raises(RuntimeError):
+        await bc.subscribe()
+
+
+async def test_unsubscribe_returns_remaining_count_atomically():
+    """Two subscribers leaving simultaneously must report distinct remaining
+    counts (1 then 0), not both report 0 due to a race between unsubscribe
+    and reading subscriber_count after the fact.
+    """
+    bc = MjpegBroadcaster("p1", _make_factory([b"x"] * 1000, delay=0.05))
+    q1 = await bc.subscribe()
+    q2 = await bc.subscribe()
+
+    # Run both unsubscribes concurrently. Each should return its own
+    # post-removal count.
+    counts = await asyncio.gather(bc.unsubscribe(q1), bc.unsubscribe(q2))
+    assert sorted(counts) == [0, 1], f"expected one unsubscribe to see 1 remaining and the other to see 0, got {counts}"
+    await bc.force_shutdown()
+
+
+async def test_unsubscribe_idempotent_returns_current_count():
+    """Double-unsubscribe (e.g. shutdown raced with iter_subscriber finally)
+    must not corrupt state; second call returns whatever the count is now.
+    """
+    bc = MjpegBroadcaster("p1", _make_factory([b"x"] * 1000, delay=0.05))
+    q1 = await bc.subscribe()
+    await bc.subscribe()  # q2 stays subscribed; we only care about removal of q1
+
+    first = await bc.unsubscribe(q1)
+    again = await bc.unsubscribe(q1)  # already gone
+    assert first == 1
+    assert again == 1  # q2 is still there
+    await bc.force_shutdown()
+
+
+async def test_force_shutdown_then_subscribe_via_registry_works():
+    """Simulates the route's retry path: a viewer calls subscribe(), gets
+    RuntimeError, calls get_or_create_broadcaster again, and successfully
+    subscribes to the fresh broadcaster.
+    """
+    factory = _make_factory([b"hello"] * 1000, delay=0.02)
+    bc1 = await get_or_create_broadcaster("p1", factory)
+    # Mark the registered broadcaster stopped to simulate the grace teardown
+    # winning the race against a new subscriber.
+    await bc1.force_shutdown()
+
+    # First subscribe attempt would raise on bc1; the registry replaces it.
+    bc2 = await get_or_create_broadcaster("p1", factory)
+    assert bc2 is not bc1
+    queue = await bc2.subscribe()
+    chunk = await asyncio.wait_for(queue.get(), timeout=1.0)
+    assert chunk == b"hello"
+    await shutdown_broadcaster("p1")

Algúns arquivos non se mostraron porque demasiados arquivos cambiaron neste cambio