Sfoglia il codice sorgente

fix(camera): share broadcaster buffered frame with Obico + /camera/snapshot (#1271)

  The MJPEG fan-out broadcaster from #1089 only solved viewer-side
  concurrency. Obico polling (every 5s) and the manual /camera/snapshot
  endpoint kept opening their own fresh RTSP sockets, which X1/H2/P2
  firmwares tolerated but X2D firmware 01.01.00.00 enforces strict
  single-connection on — every poll kicked the live stream.

  Add try_get_active_buffered_frame(printer_id): returns the broadcaster's
  last buffered frame when a viewer is connected, None otherwise. Obico
  and /camera/snapshot consult it before opening a fresh socket. When no
  viewer is active they fall through to the existing fresh-capture path.

  plate_detection and layer_timelapse intentionally not converted.
maziggy 2 settimane fa
parent
commit
c097140e4c

File diff suppressed because it is too large
+ 0 - 0
CHANGELOG.md


+ 34 - 0
backend/app/api/routes/camera.py

@@ -79,6 +79,25 @@ def get_buffered_frame(printer_id: int) -> bytes | None:
     return _last_frames.get(printer_id)
     return _last_frames.get(printer_id)
 
 
 
 
+def try_get_active_buffered_frame(printer_id: int) -> bytes | None:
+    """Return a buffered frame iff a stream is currently running for this printer.
+
+    Snapshot callers (Obico polling, manual /camera/snapshot) tap the fan-out
+    broadcaster's running upstream instead of opening a second concurrent
+    RTSP/chamber-image socket. Critical for printers that allow only one
+    camera connection (e.g. X2D firmware 01.01.00.00; see #1271).
+
+    Returns None when no broadcaster is active for this printer, so callers
+    fall through to their existing fresh-socket path unchanged.
+    """
+    has_stream = any(k.startswith(f"{printer_id}-") for k in _active_streams) or any(
+        k.startswith(f"{printer_id}-") for k in _active_chamber_streams
+    )
+    if not has_stream:
+        return None
+    return _last_frames.get(printer_id)
+
+
 async def get_printer_or_404(printer_id: int, db: AsyncSession) -> Printer:
 async def get_printer_or_404(printer_id: int, db: AsyncSession) -> Printer:
     """Get printer by ID or raise 404."""
     """Get printer by ID or raise 404."""
     result = await db.execute(select(Printer).where(Printer.id == printer_id))
     result = await db.execute(select(Printer).where(Printer.id == printer_id))
@@ -812,6 +831,21 @@ async def camera_snapshot(
             },
             },
         )
         )
 
 
+    # Reuse the fan-out broadcaster's buffered frame when a viewer is already
+    # watching — avoids opening a second concurrent RTSP socket on printers
+    # that allow only one camera connection (e.g. X2D firmware 01.01.00.00;
+    # see #1271). Buffered frame is <1s old while a viewer is connected.
+    buffered = try_get_active_buffered_frame(printer_id)
+    if buffered:
+        return Response(
+            content=buffered,
+            media_type="image/jpeg",
+            headers={
+                "Cache-Control": "no-cache, no-store, must-revalidate",
+                "Content-Disposition": f'inline; filename="snapshot_{printer_id}.jpg"',
+            },
+        )
+
     # Create temporary file for the snapshot (0600 so only the app user can read it)
     # Create temporary file for the snapshot (0600 so only the app user can read it)
     fd, tmp_name = tempfile.mkstemp(suffix=".jpg")
     fd, tmp_name = tempfile.mkstemp(suffix=".jpg")
     os.close(fd)
     os.close(fd)

+ 13 - 0
backend/app/services/obico_detection.py

@@ -199,6 +199,19 @@ class ObicoDetectionService:
                 timeout=SNAPSHOT_CAPTURE_TIMEOUT,
                 timeout=SNAPSHOT_CAPTURE_TIMEOUT,
                 snapshot_url=printer.external_camera_snapshot_url,
                 snapshot_url=printer.external_camera_snapshot_url,
             )
             )
+
+        # Reuse the fan-out broadcaster's buffered frame when a viewer is
+        # already watching — avoids opening a second concurrent RTSP socket
+        # on printers that allow only one camera connection (e.g. X2D
+        # firmware 01.01.00.00; see #1271). Buffered frame is <1s old while
+        # a viewer is connected. Returns None when no stream is active, so
+        # we fall through to a fresh socket as before.
+        from backend.app.api.routes.camera import try_get_active_buffered_frame
+
+        buffered = try_get_active_buffered_frame(printer_id)
+        if buffered:
+            return buffered
+
         return await capture_camera_frame_bytes(
         return await capture_camera_frame_bytes(
             ip_address=printer.ip_address,
             ip_address=printer.ip_address,
             access_code=printer.access_code,
             access_code=printer.access_code,

+ 29 - 0
backend/tests/integration/test_camera_api.py

@@ -239,6 +239,35 @@ class TestCameraAPI:
         assert response.status_code == 503
         assert response.status_code == 503
         assert "Failed to capture" in response.json()["detail"]
         assert "Failed to capture" in response.json()["detail"]
 
 
+    @pytest.mark.asyncio
+    @pytest.mark.integration
+    async def test_camera_snapshot_reuses_buffered_frame_when_stream_active(
+        self, async_client: AsyncClient, printer_factory
+    ):
+        """#1271: /camera/snapshot must reuse the broadcaster's buffered frame
+        when a live stream is running, instead of opening a second concurrent
+        RTSP socket. On printers with strict single-connection enforcement (e.g.
+        X2D firmware 01.01.00.00) opening a second socket kicks the live stream.
+        """
+        printer = await printer_factory()
+        fake_jpeg = b"\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00"
+
+        # Simulate a running broadcaster: one active stream entry + buffered frame.
+        active_streams = {f"{printer.id}-fanout": MagicMock()}
+        last_frames = {printer.id: fake_jpeg}
+
+        with (
+            patch("backend.app.api.routes.camera._active_streams", active_streams),
+            patch("backend.app.api.routes.camera._last_frames", last_frames),
+            patch("backend.app.api.routes.camera.capture_camera_frame", new_callable=AsyncMock) as mock_capture,
+        ):
+            response = await async_client.get(f"/api/v1/printers/{printer.id}/camera/snapshot")
+
+        assert response.status_code == 200
+        assert response.content == fake_jpeg
+        # The fresh-capture path must NOT have been taken — that's the whole point.
+        mock_capture.assert_not_called()
+
     @pytest.mark.asyncio
     @pytest.mark.asyncio
     @pytest.mark.integration
     @pytest.mark.integration
     async def test_camera_snapshot_external_camera_success(self, async_client: AsyncClient, printer_factory):
     async def test_camera_snapshot_external_camera_success(self, async_client: AsyncClient, printer_factory):

+ 71 - 0
backend/tests/unit/test_obico_detection.py

@@ -270,6 +270,77 @@ class TestPollOneStateLifecycle:
             assert mock_action.call_count == 1
             assert mock_action.call_count == 1
 
 
 
 
+class TestCaptureFrameSharesBroadcasterUpstream:
+    """#1271: Obico's per-poll snapshot must reuse the live-stream broadcaster's
+    buffered frame when a viewer is watching, instead of opening a second RTSP
+    socket. On X2D firmware 01.01.00.00 the second socket kicks the live stream.
+    """
+
+    @pytest.mark.asyncio
+    async def test_returns_buffered_frame_when_stream_active(self):
+        printer = MagicMock(
+            external_camera_enabled=False,
+            external_camera_url=None,
+            ip_address="192.168.1.10",
+            access_code="12345678",
+            model="N6",
+        )
+        mock_session = MagicMock()
+        mock_session.get = AsyncMock(return_value=printer)
+        mock_ctx = MagicMock()
+        mock_ctx.__aenter__ = AsyncMock(return_value=mock_session)
+        mock_ctx.__aexit__ = AsyncMock(return_value=None)
+
+        svc = ObicoDetectionService()
+        with (
+            patch("backend.app.services.obico_detection.async_session", return_value=mock_ctx),
+            patch(
+                "backend.app.api.routes.camera.try_get_active_buffered_frame",
+                return_value=FAKE_JPEG,
+            ),
+            patch(
+                "backend.app.services.camera.capture_camera_frame_bytes",
+                new=AsyncMock(return_value=b"FRESH-CAPTURE-SHOULD-NOT-BE-USED"),
+            ) as mock_fresh,
+        ):
+            result = await svc._capture_frame(printer_id=1)
+
+        assert result == FAKE_JPEG
+        mock_fresh.assert_not_called()
+
+    @pytest.mark.asyncio
+    async def test_falls_back_to_fresh_capture_when_no_stream(self):
+        printer = MagicMock(
+            external_camera_enabled=False,
+            external_camera_url=None,
+            ip_address="192.168.1.10",
+            access_code="12345678",
+            model="N6",
+        )
+        mock_session = MagicMock()
+        mock_session.get = AsyncMock(return_value=printer)
+        mock_ctx = MagicMock()
+        mock_ctx.__aenter__ = AsyncMock(return_value=mock_session)
+        mock_ctx.__aexit__ = AsyncMock(return_value=None)
+
+        svc = ObicoDetectionService()
+        with (
+            patch("backend.app.services.obico_detection.async_session", return_value=mock_ctx),
+            patch(
+                "backend.app.api.routes.camera.try_get_active_buffered_frame",
+                return_value=None,  # No active stream
+            ),
+            patch(
+                "backend.app.services.camera.capture_camera_frame_bytes",
+                new=AsyncMock(return_value=FAKE_JPEG),
+            ) as mock_fresh,
+        ):
+            result = await svc._capture_frame(printer_id=1)
+
+        assert result == FAKE_JPEG
+        mock_fresh.assert_called_once()
+
+
 class TestFrameCache:
 class TestFrameCache:
     """One-shot JPEG cache that lets us sidestep Obico's 5s read timeout.
     """One-shot JPEG cache that lets us sidestep Obico's 5s read timeout.
 
 

Some files were not shown because too many files changed in this diff