|
@@ -5,9 +5,17 @@ from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
import pytest
|
|
import pytest
|
|
|
|
|
|
|
|
from backend.app.schemas.settings import AppSettingsUpdate
|
|
from backend.app.schemas.settings import AppSettingsUpdate
|
|
|
-from backend.app.services.obico_detection import ObicoDetectionService
|
|
|
|
|
|
|
+from backend.app.services.obico_detection import (
|
|
|
|
|
+ FRAME_CACHE_TTL,
|
|
|
|
|
+ ObicoDetectionService,
|
|
|
|
|
+ _frame_cache,
|
|
|
|
|
+ pop_frame,
|
|
|
|
|
+ stash_frame,
|
|
|
|
|
+)
|
|
|
from backend.app.services.obico_smoothing import WARMUP_FRAMES
|
|
from backend.app.services.obico_smoothing import WARMUP_FRAMES
|
|
|
|
|
|
|
|
|
|
+FAKE_JPEG = b"\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00\xff\xd9"
|
|
|
|
|
+
|
|
|
|
|
|
|
|
class TestSettingsSchemaValidators:
|
|
class TestSettingsSchemaValidators:
|
|
|
"""Guard rails on the new obico_* AppSettings fields."""
|
|
"""Guard rails on the new obico_* AppSettings fields."""
|
|
@@ -145,10 +153,7 @@ class TestPollOneStateLifecycle:
|
|
|
|
|
|
|
|
with (
|
|
with (
|
|
|
patch("backend.app.services.obico_detection.httpx.AsyncClient", return_value=mock_client),
|
|
patch("backend.app.services.obico_detection.httpx.AsyncClient", return_value=mock_client),
|
|
|
- patch(
|
|
|
|
|
- "backend.app.services.obico_detection.create_camera_stream_token",
|
|
|
|
|
- new=AsyncMock(return_value="tok"),
|
|
|
|
|
- ),
|
|
|
|
|
|
|
+ patch.object(svc, "_capture_frame", new=AsyncMock(return_value=FAKE_JPEG)),
|
|
|
):
|
|
):
|
|
|
await svc._check_printer(1, status, settings)
|
|
await svc._check_printer(1, status, settings)
|
|
|
|
|
|
|
@@ -178,10 +183,7 @@ class TestPollOneStateLifecycle:
|
|
|
|
|
|
|
|
with (
|
|
with (
|
|
|
patch("backend.app.services.obico_detection.httpx.AsyncClient", return_value=mock_client),
|
|
patch("backend.app.services.obico_detection.httpx.AsyncClient", return_value=mock_client),
|
|
|
- patch(
|
|
|
|
|
- "backend.app.services.obico_detection.create_camera_stream_token",
|
|
|
|
|
- new=AsyncMock(return_value="tok"),
|
|
|
|
|
- ),
|
|
|
|
|
|
|
+ patch.object(svc, "_capture_frame", new=AsyncMock(return_value=FAKE_JPEG)),
|
|
|
):
|
|
):
|
|
|
await svc._check_printer(1, status, settings)
|
|
await svc._check_printer(1, status, settings)
|
|
|
|
|
|
|
@@ -224,13 +226,140 @@ class TestPollOneStateLifecycle:
|
|
|
with (
|
|
with (
|
|
|
patch("backend.app.services.obico_detection.httpx.AsyncClient", return_value=mock_client),
|
|
patch("backend.app.services.obico_detection.httpx.AsyncClient", return_value=mock_client),
|
|
|
patch("backend.app.services.obico_actions.execute_action", new=AsyncMock()) as mock_action,
|
|
patch("backend.app.services.obico_actions.execute_action", new=AsyncMock()) as mock_action,
|
|
|
- patch(
|
|
|
|
|
- "backend.app.services.obico_detection.create_camera_stream_token",
|
|
|
|
|
- new=AsyncMock(return_value="tok"),
|
|
|
|
|
- ),
|
|
|
|
|
|
|
+ patch.object(svc, "_capture_frame", new=AsyncMock(return_value=FAKE_JPEG)),
|
|
|
):
|
|
):
|
|
|
await svc._check_printer(1, status, settings)
|
|
await svc._check_printer(1, status, settings)
|
|
|
assert mock_action.call_count == 1
|
|
assert mock_action.call_count == 1
|
|
|
await svc._check_printer(1, status, settings)
|
|
await svc._check_printer(1, status, settings)
|
|
|
# Second call must not dispatch again
|
|
# Second call must not dispatch again
|
|
|
assert mock_action.call_count == 1
|
|
assert mock_action.call_count == 1
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class TestFrameCache:
|
|
|
|
|
+ """One-shot JPEG cache that lets us sidestep Obico's 5s read timeout.
|
|
|
|
|
+
|
|
|
|
|
+ Obico's ML API fetches snapshots via `GET /p/?img=URL` with `timeout=(0.1, 5)`.
|
|
|
|
|
+ Our /camera/snapshot can exceed that on cold calls (RTSP keyframe wait). So the
|
|
|
|
|
+ detection loop captures locally, stashes the JPEG bytes under a nonce, then hands
|
|
|
|
|
+ Obico a URL that returns those bytes instantly. The cache is single-use + TTLed
|
|
|
|
|
+ so a leaked nonce can't be replayed.
|
|
|
|
|
+ """
|
|
|
|
|
+
|
|
|
|
|
+ def setup_method(self):
|
|
|
|
|
+ _frame_cache.clear()
|
|
|
|
|
+
|
|
|
|
|
+ @pytest.mark.asyncio
|
|
|
|
|
+ async def test_stash_and_pop_roundtrip(self):
|
|
|
|
|
+ nonce = await stash_frame(FAKE_JPEG)
|
|
|
|
|
+ assert nonce # non-empty URL-safe token
|
|
|
|
|
+ data = await pop_frame(nonce)
|
|
|
|
|
+ assert data == FAKE_JPEG
|
|
|
|
|
+
|
|
|
|
|
+ @pytest.mark.asyncio
|
|
|
|
|
+ async def test_nonce_is_single_use(self):
|
|
|
|
|
+ nonce = await stash_frame(FAKE_JPEG)
|
|
|
|
|
+ assert await pop_frame(nonce) == FAKE_JPEG
|
|
|
|
|
+ # Second pop returns None — caches replay protection
|
|
|
|
|
+ assert await pop_frame(nonce) is None
|
|
|
|
|
+
|
|
|
|
|
+ @pytest.mark.asyncio
|
|
|
|
|
+ async def test_unknown_nonce_returns_none(self):
|
|
|
|
|
+ assert await pop_frame("not-a-real-nonce") is None
|
|
|
|
|
+
|
|
|
|
|
+ @pytest.mark.asyncio
|
|
|
|
|
+ async def test_stash_produces_unique_nonces(self):
|
|
|
|
|
+ nonces = {await stash_frame(FAKE_JPEG) for _ in range(10)}
|
|
|
|
|
+ assert len(nonces) == 10
|
|
|
|
|
+
|
|
|
|
|
+ @pytest.mark.asyncio
|
|
|
|
|
+ async def test_expired_entries_are_pruned_on_stash(self):
|
|
|
|
|
+ """New entries trigger pruning of TTL-expired ones — prevents unbounded growth."""
|
|
|
|
|
+ # Manually seed an entry with a stale timestamp
|
|
|
|
|
+ import time as time_module
|
|
|
|
|
+
|
|
|
|
|
+ _frame_cache["stale-nonce"] = (FAKE_JPEG, time_module.monotonic() - FRAME_CACHE_TTL - 1)
|
|
|
|
|
+ await stash_frame(FAKE_JPEG)
|
|
|
|
|
+ # Stale entry was pruned
|
|
|
|
|
+ assert "stale-nonce" not in _frame_cache
|
|
|
|
|
+
|
|
|
|
|
+ @pytest.mark.asyncio
|
|
|
|
|
+ async def test_pop_rejects_expired_nonce(self):
|
|
|
|
|
+ """Even if the entry is still in the dict, an expired TTL returns None."""
|
|
|
|
|
+ import time as time_module
|
|
|
|
|
+
|
|
|
|
|
+ _frame_cache["aging-nonce"] = (FAKE_JPEG, time_module.monotonic() - FRAME_CACHE_TTL - 1)
|
|
|
|
|
+ assert await pop_frame("aging-nonce") is None
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class TestCheckPrinterUsesCachedFrameUrl:
|
|
|
|
|
+ """The URL sent to Obico must point at our nonce endpoint, not /camera/snapshot."""
|
|
|
|
|
+
|
|
|
|
|
+ def setup_method(self):
|
|
|
|
|
+ _frame_cache.clear()
|
|
|
|
|
+
|
|
|
|
|
+ @pytest.mark.asyncio
|
|
|
|
|
+ async def test_ml_api_called_with_cached_frame_url(self):
|
|
|
|
|
+ svc = ObicoDetectionService()
|
|
|
|
|
+ settings = {
|
|
|
|
|
+ "enabled": True,
|
|
|
|
|
+ "ml_url": "http://obico:3333",
|
|
|
|
|
+ "sensitivity": "medium",
|
|
|
|
|
+ "action": "notify",
|
|
|
|
|
+ "poll_interval": 10,
|
|
|
|
|
+ "enabled_printers": None,
|
|
|
|
|
+ "external_url": "http://bambuddy:8000",
|
|
|
|
|
+ }
|
|
|
|
|
+ status = MagicMock(state="RUNNING", task_name="job", subtask_name="")
|
|
|
|
|
+
|
|
|
|
|
+ mock_response = MagicMock()
|
|
|
|
|
+ mock_response.json.return_value = {"detections": []}
|
|
|
|
|
+ mock_response.raise_for_status = MagicMock()
|
|
|
|
|
+ mock_client = MagicMock()
|
|
|
|
|
+ mock_client.get = AsyncMock(return_value=mock_response)
|
|
|
|
|
+ mock_client.__aenter__ = AsyncMock(return_value=mock_client)
|
|
|
|
|
+ mock_client.__aexit__ = AsyncMock(return_value=False)
|
|
|
|
|
+
|
|
|
|
|
+ with (
|
|
|
|
|
+ patch("backend.app.services.obico_detection.httpx.AsyncClient", return_value=mock_client),
|
|
|
|
|
+ patch.object(svc, "_capture_frame", new=AsyncMock(return_value=FAKE_JPEG)),
|
|
|
|
|
+ ):
|
|
|
|
|
+ await svc._check_printer(1, status, settings)
|
|
|
|
|
+
|
|
|
|
|
+ # Extract the img URL handed to the ML API
|
|
|
|
|
+ _args, kwargs = mock_client.get.call_args
|
|
|
|
|
+ img_url = kwargs["params"]["img"]
|
|
|
|
|
+ assert img_url.startswith("http://bambuddy:8000/api/v1/obico/cached-frame/")
|
|
|
|
|
+ # The path segment after /cached-frame/ is the nonce itself — that nonce must
|
|
|
|
|
+ # resolve back to our stashed frame (single-use guarantees freshness).
|
|
|
|
|
+ nonce = img_url.rsplit("/", 1)[-1]
|
|
|
|
|
+ assert await pop_frame(nonce) == FAKE_JPEG
|
|
|
|
|
+
|
|
|
|
|
+ @pytest.mark.asyncio
|
|
|
|
|
+ async def test_capture_failure_skips_ml_call(self):
|
|
|
|
|
+ """If we can't capture a frame, don't bother the ML API."""
|
|
|
|
|
+ svc = ObicoDetectionService()
|
|
|
|
|
+ settings = {
|
|
|
|
|
+ "enabled": True,
|
|
|
|
|
+ "ml_url": "http://obico:3333",
|
|
|
|
|
+ "sensitivity": "medium",
|
|
|
|
|
+ "action": "notify",
|
|
|
|
|
+ "poll_interval": 10,
|
|
|
|
|
+ "enabled_printers": None,
|
|
|
|
|
+ "external_url": "http://bambuddy:8000",
|
|
|
|
|
+ }
|
|
|
|
|
+ status = MagicMock(state="RUNNING", task_name="job", subtask_name="")
|
|
|
|
|
+
|
|
|
|
|
+ mock_client = MagicMock()
|
|
|
|
|
+ mock_client.get = AsyncMock()
|
|
|
|
|
+ mock_client.__aenter__ = AsyncMock(return_value=mock_client)
|
|
|
|
|
+ mock_client.__aexit__ = AsyncMock(return_value=False)
|
|
|
|
|
+
|
|
|
|
|
+ with (
|
|
|
|
|
+ patch("backend.app.services.obico_detection.httpx.AsyncClient", return_value=mock_client),
|
|
|
|
|
+ patch.object(svc, "_capture_frame", new=AsyncMock(return_value=None)),
|
|
|
|
|
+ ):
|
|
|
|
|
+ await svc._check_printer(1, status, settings)
|
|
|
|
|
+
|
|
|
|
|
+ mock_client.get.assert_not_called()
|
|
|
|
|
+ assert svc._last_error is not None
|
|
|
|
|
+ assert "Failed to capture snapshot" in svc._last_error
|