camera_fanout.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. """MJPEG fan-out broadcaster for camera streams.
  2. Most Bambu Lab printers only allow one concurrent camera connection: the
  3. RTSP socket on X1/H2/P2 models, the chamber-image socket on port 6000 on
  4. A1/P1 models. Without fan-out, opening a second viewer either fails or
  5. kicks the first viewer off — see issue #1089.
  6. This module owns a single upstream connection per printer and pushes each
  7. frame to N independent subscriber queues. New viewers tap the existing
  8. upstream; no new printer connection is opened. When the last subscriber
  9. leaves, the upstream is torn down after a short grace window so that a
  10. quick page refresh or second-tab open does not pay a reconnect.
  11. """
  12. from __future__ import annotations
  13. import asyncio
  14. import logging
  15. from collections.abc import AsyncGenerator, Awaitable, Callable
  16. logger = logging.getLogger(__name__)
  17. # How long to keep the upstream pump alive after the last subscriber leaves.
  18. # A short grace window absorbs page refreshes and "open camera in new tab"
  19. # without paying a fresh ffmpeg/RTSP handshake (which can take several seconds
  20. # on some firmwares and is the very reconnect cost we are trying to avoid).
  21. _GRACE_SECONDS = 5.0
  22. # Per-subscriber queue depth. Small on purpose: if a viewer can't keep up
  23. # with the printer's frame rate we drop frames for that viewer rather than
  24. # blocking the broadcaster. Live video — old frames have no value.
  25. _SUBSCRIBER_QUEUE_SIZE = 4
  26. # Sentinel pushed to subscriber queues when the upstream pump exits, so each
  27. # subscriber's read loop can break out cleanly instead of hanging on get().
  28. _UPSTREAM_GONE = b""
  29. UpstreamFactory = Callable[[asyncio.Event], AsyncGenerator[bytes, None]]
  30. class MjpegBroadcaster:
  31. """Single upstream MJPEG stream, fanned out to N subscribers."""
  32. def __init__(self, key: str, factory: UpstreamFactory) -> None:
  33. self._key = key
  34. self._factory = factory
  35. self._subscribers: list[asyncio.Queue[bytes]] = []
  36. self._lock = asyncio.Lock()
  37. self._pump_task: asyncio.Task | None = None
  38. self._grace_task: asyncio.Task | None = None
  39. # Disconnect event passed to the upstream generator so we can ask it to
  40. # stop reconnecting when the last subscriber leaves.
  41. self._upstream_disconnect = asyncio.Event()
  42. self._stopped = False
  43. @property
  44. def key(self) -> str:
  45. return self._key
  46. @property
  47. def subscriber_count(self) -> int:
  48. return len(self._subscribers)
  49. @property
  50. def stopped(self) -> bool:
  51. return self._stopped
  52. async def subscribe(self) -> asyncio.Queue[bytes]:
  53. """Add a subscriber and ensure the upstream pump is running."""
  54. async with self._lock:
  55. if self._stopped:
  56. raise RuntimeError(f"broadcaster {self._key!r} is stopped")
  57. # Cancel any pending grace-window shutdown — a viewer just rejoined.
  58. if self._grace_task is not None and not self._grace_task.done():
  59. self._grace_task.cancel()
  60. self._grace_task = None
  61. queue: asyncio.Queue[bytes] = asyncio.Queue(maxsize=_SUBSCRIBER_QUEUE_SIZE)
  62. self._subscribers.append(queue)
  63. if self._pump_task is None or self._pump_task.done():
  64. # Reset the disconnect signal in case a previous pump set it.
  65. self._upstream_disconnect = asyncio.Event()
  66. self._pump_task = asyncio.create_task(self._pump(), name=f"camera-fanout-pump-{self._key}")
  67. return queue
  68. async def unsubscribe(self, queue: asyncio.Queue[bytes]) -> int:
  69. """Remove a subscriber and return the remaining count (atomic).
  70. If this was the last subscriber, schedule grace shutdown.
  71. """
  72. async with self._lock:
  73. try:
  74. self._subscribers.remove(queue)
  75. except ValueError:
  76. return len(self._subscribers) # Already removed (e.g. force_shutdown)
  77. remaining = len(self._subscribers)
  78. if remaining == 0 and not self._stopped:
  79. # Last subscriber left. Schedule grace-window teardown.
  80. self._grace_task = asyncio.create_task(self._grace_then_stop(), name=f"camera-fanout-grace-{self._key}")
  81. return remaining
  82. async def force_shutdown(self) -> None:
  83. """Tear down immediately, kick all subscribers. Idempotent."""
  84. pump_task = await self._mark_stopped_locked(notify_subscribers=True)
  85. await self._await_pump_cancellation(pump_task)
  86. async def _grace_then_stop(self) -> None:
  87. try:
  88. await asyncio.sleep(_GRACE_SECONDS)
  89. except asyncio.CancelledError:
  90. return # New subscriber arrived during grace
  91. # Re-check under the lock — a subscriber may have rejoined between
  92. # the sleep finishing and us acquiring the lock.
  93. pump_task: asyncio.Task | None = None
  94. async with self._lock:
  95. if self._subscribers or self._stopped:
  96. return
  97. self._upstream_disconnect.set()
  98. pump_task = self._pump_task
  99. self._pump_task = None
  100. self._grace_task = None
  101. self._stopped = True
  102. await self._await_pump_cancellation(pump_task)
  103. async def _mark_stopped_locked(self, *, notify_subscribers: bool) -> asyncio.Task | None:
  104. """Mark the broadcaster stopped and detach the pump task.
  105. Caller MUST NOT hold ``self._lock`` (we acquire it here). Returns the
  106. pump task (if any) so the caller can await its cancellation OUTSIDE
  107. the lock — the pump's ``finally`` block needs the lock to wake up
  108. subscribers, so we'd deadlock if we awaited it under the lock.
  109. """
  110. async with self._lock:
  111. if self._stopped and self._pump_task is None:
  112. return None
  113. self._upstream_disconnect.set()
  114. if notify_subscribers:
  115. for queue in self._subscribers:
  116. try:
  117. queue.put_nowait(_UPSTREAM_GONE)
  118. except asyncio.QueueFull:
  119. pass
  120. self._subscribers.clear()
  121. pump_task = self._pump_task
  122. self._pump_task = None
  123. self._stopped = True
  124. if self._grace_task is not None and not self._grace_task.done():
  125. self._grace_task.cancel()
  126. self._grace_task = None
  127. return pump_task
  128. async def _await_pump_cancellation(self, pump_task: asyncio.Task | None) -> None:
  129. if pump_task is None or pump_task.done():
  130. return
  131. pump_task.cancel()
  132. try:
  133. await pump_task
  134. except (asyncio.CancelledError, Exception):
  135. # Pump exceptions are already logged inside _pump; swallow here so
  136. # teardown can never propagate a stray crash.
  137. pass
  138. async def _pump(self) -> None:
  139. """Drive the upstream generator and broadcast each chunk."""
  140. try:
  141. async for chunk in self._factory(self._upstream_disconnect):
  142. # Snapshot subscribers under lock so we don't iterate a list
  143. # mutated by subscribe()/unsubscribe() while we are putting.
  144. async with self._lock:
  145. targets = list(self._subscribers)
  146. for queue in targets:
  147. try:
  148. queue.put_nowait(chunk)
  149. except asyncio.QueueFull:
  150. # Slow viewer — drop this frame for them. They'll catch
  151. # up on the next frame. Don't unsubscribe: a brief
  152. # browser stall shouldn't end the stream.
  153. pass
  154. except asyncio.CancelledError:
  155. raise
  156. except Exception:
  157. logger.exception("Camera fan-out pump crashed for %s", self._key)
  158. finally:
  159. # Pump is exiting — wake up any subscribers still hanging on get().
  160. async with self._lock:
  161. for queue in self._subscribers:
  162. try:
  163. queue.put_nowait(_UPSTREAM_GONE)
  164. except asyncio.QueueFull:
  165. pass
# Global registry. Keyed by printer_id (as str) so a chamber-mode printer
# and an RTSP-mode printer can never collide on the same key.
# Entries may be stopped broadcasters: the grace-window shutdown marks an
# instance stopped without removing it from this dict.
_broadcasters: dict[str, MjpegBroadcaster] = {}
# Held while the registry dict is looked up, inserted into, popped or cleared
# by the get-or-create and shutdown functions in this module.
_registry_lock = asyncio.Lock()
  170. async def get_or_create_broadcaster(key: str, factory: UpstreamFactory) -> MjpegBroadcaster:
  171. """Return the live broadcaster for `key`, creating one if needed.
  172. A broadcaster that has been stopped (force shutdown or grace timeout) is
  173. replaced with a fresh instance — the caller will subscribe to the new one.
  174. """
  175. async with _registry_lock:
  176. existing = _broadcasters.get(key)
  177. if existing is not None and not existing.stopped:
  178. return existing
  179. new_bc = MjpegBroadcaster(key, factory)
  180. _broadcasters[key] = new_bc
  181. return new_bc
  182. async def shutdown_broadcaster(key: str) -> bool:
  183. """Force-shutdown the broadcaster for `key`. Returns True if one was running."""
  184. async with _registry_lock:
  185. bc = _broadcasters.pop(key, None)
  186. if bc is None:
  187. return False
  188. await bc.force_shutdown()
  189. return True
  190. async def shutdown_all_broadcasters() -> None:
  191. """Tear down every broadcaster (for app shutdown)."""
  192. async with _registry_lock:
  193. bcs = list(_broadcasters.values())
  194. _broadcasters.clear()
  195. await asyncio.gather(*(bc.force_shutdown() for bc in bcs), return_exceptions=True)
  196. def active_broadcaster_keys() -> list[str]:
  197. """Snapshot of keys with a live (non-stopped) broadcaster. For diagnostics."""
  198. return [k for k, bc in _broadcasters.items() if not bc.stopped]
  199. # ---------------------------------------------------------------------------
  200. # AsyncGenerator helper — turns a subscriber queue into an async generator
  201. # that yields MJPEG chunks until the upstream signals it's gone.
  202. # ---------------------------------------------------------------------------
  203. async def iter_subscriber(
  204. broadcaster: MjpegBroadcaster,
  205. queue: asyncio.Queue[bytes],
  206. *,
  207. is_disconnected: Callable[[], Awaitable[bool]] | None = None,
  208. on_unsubscribe: Callable[[int], None] | None = None,
  209. ) -> AsyncGenerator[bytes, None]:
  210. """Yield chunks from a subscriber queue until upstream ends or client leaves.
  211. Always unsubscribes from the broadcaster on exit, even on cancellation.
  212. The optional ``on_unsubscribe`` callback receives the post-unsubscribe
  213. subscriber count — useful for accurate detach-log lines that don't race
  214. with concurrent unsubscribes.
  215. """
  216. try:
  217. while True:
  218. try:
  219. chunk = await asyncio.wait_for(queue.get(), timeout=30.0)
  220. except asyncio.TimeoutError:
  221. # No frame in 30s — check whether the client is still there.
  222. # If yes, keep waiting; if no, bail out.
  223. if is_disconnected is not None and await is_disconnected():
  224. break
  225. continue
  226. if chunk == _UPSTREAM_GONE:
  227. break
  228. yield chunk
  229. if is_disconnected is not None and await is_disconnected():
  230. break
  231. finally:
  232. remaining = await broadcaster.unsubscribe(queue)
  233. if on_unsubscribe is not None:
  234. try:
  235. on_unsubscribe(remaining)
  236. except Exception:
  237. logger.exception("on_unsubscribe callback raised")