| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599 |
- """Unit tests for Obico detection service (#172)."""
- from unittest.mock import AsyncMock, MagicMock, patch
- import pytest
- from backend.app.schemas.settings import AppSettingsUpdate
- from backend.app.services.obico_detection import (
- FRAME_CACHE_TTL,
- ObicoDetectionService,
- _frame_cache,
- pop_frame,
- stash_frame,
- )
- from backend.app.services.obico_smoothing import WARMUP_FRAMES
- FAKE_JPEG = b"\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00\xff\xd9"
- class TestSettingsSchemaValidators:
- """Guard rails on the new obico_* AppSettings fields."""
- def test_sensitivity_accepts_valid_values(self):
- for value in ("low", "medium", "high"):
- u = AppSettingsUpdate(obico_sensitivity=value)
- assert u.obico_sensitivity == value
- def test_sensitivity_rejects_garbage(self):
- with pytest.raises(ValueError, match="obico_sensitivity"):
- AppSettingsUpdate(obico_sensitivity="extreme")
- def test_action_accepts_valid_values(self):
- for value in ("notify", "pause", "pause_and_off"):
- assert AppSettingsUpdate(obico_action=value).obico_action == value
- def test_action_rejects_garbage(self):
- with pytest.raises(ValueError, match="obico_action"):
- AppSettingsUpdate(obico_action="explode")
- def test_enabled_printers_accepts_empty(self):
- assert AppSettingsUpdate(obico_enabled_printers="").obico_enabled_printers == ""
- assert AppSettingsUpdate(obico_enabled_printers=None).obico_enabled_printers is None
- def test_enabled_printers_accepts_int_array(self):
- u = AppSettingsUpdate(obico_enabled_printers="[1, 2, 3]")
- assert u.obico_enabled_printers == "[1, 2, 3]"
- def test_enabled_printers_rejects_non_json(self):
- with pytest.raises(ValueError, match="valid JSON"):
- AppSettingsUpdate(obico_enabled_printers="1,2,3")
- def test_enabled_printers_rejects_non_list(self):
- with pytest.raises(ValueError, match="JSON array"):
- AppSettingsUpdate(obico_enabled_printers='{"1": true}')
- def test_enabled_printers_rejects_non_int_elements(self):
- with pytest.raises(ValueError, match="JSON array"):
- AppSettingsUpdate(obico_enabled_printers='[1, "two"]')
- def test_poll_interval_bounds(self):
- with pytest.raises(ValueError):
- AppSettingsUpdate(obico_poll_interval=4)
- with pytest.raises(ValueError):
- AppSettingsUpdate(obico_poll_interval=121)
- assert AppSettingsUpdate(obico_poll_interval=10).obico_poll_interval == 10
- class TestGetStatus:
- def test_empty_initial_status(self):
- svc = ObicoDetectionService()
- s = svc.get_status()
- assert s["is_running"] is False
- assert s["per_printer"] == {}
- assert s["history"] == []
- assert "low" in s["thresholds"] and "high" in s["thresholds"]
- def test_thresholds_reflect_configured_sensitivity(self):
- """#1469 — get_status() reports the thresholds for the passed
- sensitivity, not a hardcoded 'medium'. Each level must be distinct so
- the Status panel changes when the user changes the setting."""
- svc = ObicoDetectionService()
- low = svc.get_status("low")["thresholds"]
- medium = svc.get_status("medium")["thresholds"]
- high = svc.get_status("high")["thresholds"]
- # Higher sensitivity → lower thresholds (easier to trigger).
- assert low["low"] > medium["low"] > high["low"]
- assert low["high"] > medium["high"] > high["high"]
- # Default and unknown values fall back to medium.
- assert svc.get_status()["thresholds"] == medium
- assert svc.get_status("bogus")["thresholds"] == medium
- class TestTestConnection:
- @pytest.mark.asyncio
- async def test_empty_url_via_route(self):
- """Service does not special-case empty URL — the route does."""
- svc = ObicoDetectionService()
- # This will fail DNS/connect, but should return ok=False
- result = await svc.test_connection("http://nonexistent-obico-host-xyz.invalid:3333")
- assert result["ok"] is False
- assert result["error"] is not None
- @pytest.mark.asyncio
- async def test_healthy_response_is_ok(self):
- svc = ObicoDetectionService()
- mock_response = MagicMock(status_code=200, text="ok")
- mock_client = MagicMock()
- mock_client.get = AsyncMock(return_value=mock_response)
- mock_client.__aenter__ = AsyncMock(return_value=mock_client)
- mock_client.__aexit__ = AsyncMock(return_value=False)
- with patch("backend.app.services.obico_detection.httpx.AsyncClient", return_value=mock_client):
- result = await svc.test_connection("http://obico:3333")
- assert result["ok"] is True
- assert result["status_code"] == 200
- assert result["body"] == "ok"
- assert result["error"] is None
- @pytest.mark.asyncio
- async def test_non_ok_body_is_not_ok(self):
- svc = ObicoDetectionService()
- mock_response = MagicMock(status_code=200, text="something else")
- mock_client = MagicMock()
- mock_client.get = AsyncMock(return_value=mock_response)
- mock_client.__aenter__ = AsyncMock(return_value=mock_client)
- mock_client.__aexit__ = AsyncMock(return_value=False)
- with patch("backend.app.services.obico_detection.httpx.AsyncClient", return_value=mock_client):
- result = await svc.test_connection("http://obico:3333/")
- assert result["ok"] is False
- assert result["body"] == "something else"
- class TestPollOneStateLifecycle:
- """Confirms per-printer state is reset when a new print starts."""
- @pytest.mark.asyncio
- async def test_new_task_name_resets_state(self):
- svc = ObicoDetectionService()
- # Seed a state that has been running for a while
- from backend.app.services.obico_smoothing import PrintState
- seeded = PrintState()
- for _ in range(WARMUP_FRAMES + 5):
- seeded.update(0.5)
- svc._states[1] = seeded
- svc._state_keys[1] = "old_task"
- svc._action_fired[1] = True
- settings = {
- "enabled": True,
- "ml_url": "http://obico:3333",
- "sensitivity": "medium",
- "action": "notify",
- "poll_interval": 10,
- "enabled_printers": None,
- "external_url": "http://bambuddy:8000",
- }
- status = MagicMock(state="RUNNING", task_name="new_task", subtask_name="")
- mock_response = MagicMock()
- mock_response.json.return_value = {"detections": []}
- mock_response.raise_for_status = MagicMock()
- mock_client = MagicMock()
- mock_client.get = AsyncMock(return_value=mock_response)
- mock_client.__aenter__ = AsyncMock(return_value=mock_client)
- mock_client.__aexit__ = AsyncMock(return_value=False)
- with (
- patch("backend.app.services.obico_detection.httpx.AsyncClient", return_value=mock_client),
- patch.object(svc, "_capture_frame", new=AsyncMock(return_value=FAKE_JPEG)),
- ):
- await svc._check_printer(1, status, settings)
- # State was reset (frame_count is 1 after the single update, not 36)
- assert svc._states[1].frame_count == 1
- assert svc._state_keys[1] == "new_task"
- assert svc._action_fired[1] is False
- @pytest.mark.asyncio
- async def test_ml_api_error_does_not_crash(self):
- svc = ObicoDetectionService()
- settings = {
- "enabled": True,
- "ml_url": "http://obico:3333",
- "sensitivity": "medium",
- "action": "notify",
- "poll_interval": 10,
- "enabled_printers": None,
- "external_url": "http://bambuddy:8000",
- }
- status = MagicMock(state="RUNNING", task_name="job", subtask_name="")
- mock_client = MagicMock()
- mock_client.get = AsyncMock(side_effect=RuntimeError("connection refused"))
- mock_client.__aenter__ = AsyncMock(return_value=mock_client)
- mock_client.__aexit__ = AsyncMock(return_value=False)
- with (
- patch("backend.app.services.obico_detection.httpx.AsyncClient", return_value=mock_client),
- patch.object(svc, "_capture_frame", new=AsyncMock(return_value=FAKE_JPEG)),
- ):
- await svc._check_printer(1, status, settings)
- assert svc._last_error is not None
- assert "connection refused" in svc._last_error
- @pytest.mark.asyncio
- async def test_ml_api_empty_exception_message_falls_back_to_type(self):
- """If str(exc) is empty, log the exception class name instead of a blank suffix."""
- svc = ObicoDetectionService()
- settings = {
- "enabled": True,
- "ml_url": "http://obico:3333",
- "sensitivity": "medium",
- "action": "notify",
- "poll_interval": 10,
- "enabled_printers": None,
- "external_url": "http://bambuddy:8000",
- }
- status = MagicMock(state="RUNNING", task_name="job", subtask_name="")
- class _SilentError(Exception):
- def __str__(self) -> str:
- return ""
- mock_client = MagicMock()
- mock_client.get = AsyncMock(side_effect=_SilentError())
- mock_client.__aenter__ = AsyncMock(return_value=mock_client)
- mock_client.__aexit__ = AsyncMock(return_value=False)
- with (
- patch("backend.app.services.obico_detection.httpx.AsyncClient", return_value=mock_client),
- patch.object(svc, "_capture_frame", new=AsyncMock(return_value=FAKE_JPEG)),
- ):
- await svc._check_printer(1, status, settings)
- assert svc._last_error is not None
- assert "_SilentError" in svc._last_error
- # The suffix is never blank
- assert not svc._last_error.rstrip().endswith(":")
- @pytest.mark.asyncio
- async def test_failure_fires_action_only_once(self):
- """Once a failure has fired for a print, subsequent failures should not re-fire."""
- svc = ObicoDetectionService()
- settings = {
- "enabled": True,
- "ml_url": "http://obico:3333",
- "sensitivity": "medium",
- "action": "notify",
- "poll_interval": 10,
- "enabled_printers": None,
- "external_url": "http://bambuddy:8000",
- }
- status = MagicMock(state="RUNNING", task_name="job", subtask_name="")
- # Seed state so the next frame crosses HIGH immediately
- from backend.app.services.obico_smoothing import PrintState
- seeded = PrintState()
- for _ in range(WARMUP_FRAMES + 500):
- seeded.update(1.0)
- svc._states[1] = seeded
- svc._state_keys[1] = "job"
- svc._action_fired[1] = False
- mock_response = MagicMock()
- mock_response.json.return_value = {"detections": [["failure", 0.9, [0, 0, 1, 1]]]}
- mock_response.raise_for_status = MagicMock()
- mock_client = MagicMock()
- mock_client.get = AsyncMock(return_value=mock_response)
- mock_client.__aenter__ = AsyncMock(return_value=mock_client)
- mock_client.__aexit__ = AsyncMock(return_value=False)
- with (
- patch("backend.app.services.obico_detection.httpx.AsyncClient", return_value=mock_client),
- patch("backend.app.services.obico_actions.execute_action", new=AsyncMock()) as mock_action,
- patch.object(svc, "_capture_frame", new=AsyncMock(return_value=FAKE_JPEG)),
- ):
- await svc._check_printer(1, status, settings)
- assert mock_action.call_count == 1
- await svc._check_printer(1, status, settings)
- # Second call must not dispatch again
- assert mock_action.call_count == 1
- class TestCaptureFrameSharesBroadcasterUpstream:
- """#1271: Obico's per-poll snapshot must reuse the live-stream broadcaster's
- buffered frame when a viewer is watching, instead of opening a second RTSP
- socket. On X2D firmware 01.01.00.00 the second socket kicks the live stream.
- """
- @pytest.mark.asyncio
- async def test_returns_buffered_frame_when_stream_active(self):
- printer = MagicMock(
- external_camera_enabled=False,
- external_camera_url=None,
- ip_address="192.168.1.10",
- access_code="12345678",
- model="N6",
- )
- mock_session = MagicMock()
- mock_session.get = AsyncMock(return_value=printer)
- mock_ctx = MagicMock()
- mock_ctx.__aenter__ = AsyncMock(return_value=mock_session)
- mock_ctx.__aexit__ = AsyncMock(return_value=None)
- svc = ObicoDetectionService()
- with (
- patch("backend.app.services.obico_detection.async_session", return_value=mock_ctx),
- patch(
- "backend.app.api.routes.camera.is_stream_active",
- return_value=True,
- ),
- patch(
- "backend.app.api.routes.camera.try_get_active_buffered_frame",
- return_value=FAKE_JPEG,
- ),
- patch(
- "backend.app.services.camera.capture_camera_frame_bytes",
- new=AsyncMock(return_value=b"FRESH-CAPTURE-SHOULD-NOT-BE-USED"),
- ) as mock_fresh,
- ):
- result = await svc._capture_frame(printer_id=1)
- assert result == FAKE_JPEG
- mock_fresh.assert_not_called()
- @pytest.mark.asyncio
- async def test_skips_poll_when_stream_active_but_buffer_empty(self):
- """#1348: viewer attached + buffer empty must NOT open a competing socket."""
- printer = MagicMock(
- external_camera_enabled=False,
- external_camera_url=None,
- ip_address="192.168.1.10",
- access_code="12345678",
- model="X1C",
- )
- mock_session = MagicMock()
- mock_session.get = AsyncMock(return_value=printer)
- mock_ctx = MagicMock()
- mock_ctx.__aenter__ = AsyncMock(return_value=mock_session)
- mock_ctx.__aexit__ = AsyncMock(return_value=None)
- svc = ObicoDetectionService()
- with (
- patch("backend.app.services.obico_detection.async_session", return_value=mock_ctx),
- patch(
- "backend.app.api.routes.camera.is_stream_active",
- return_value=True,
- ),
- patch(
- "backend.app.api.routes.camera.try_get_active_buffered_frame",
- return_value=None, # Stream active, but first frame not buffered yet
- ),
- patch(
- "backend.app.services.camera.capture_camera_frame_bytes",
- new=AsyncMock(return_value=b"FRESH-CAPTURE-WOULD-KICK-VIEWER"),
- ) as mock_fresh,
- ):
- result = await svc._capture_frame(printer_id=1)
- assert result is None, "must skip this poll cycle, not open a competing socket"
- mock_fresh.assert_not_called()
- @pytest.mark.asyncio
- async def test_falls_back_to_fresh_capture_when_no_stream(self):
- printer = MagicMock(
- external_camera_enabled=False,
- external_camera_url=None,
- ip_address="192.168.1.10",
- access_code="12345678",
- model="N6",
- )
- mock_session = MagicMock()
- mock_session.get = AsyncMock(return_value=printer)
- mock_ctx = MagicMock()
- mock_ctx.__aenter__ = AsyncMock(return_value=mock_session)
- mock_ctx.__aexit__ = AsyncMock(return_value=None)
- svc = ObicoDetectionService()
- with (
- patch("backend.app.services.obico_detection.async_session", return_value=mock_ctx),
- patch(
- "backend.app.api.routes.camera.is_stream_active",
- return_value=False,
- ),
- patch(
- "backend.app.services.camera.capture_camera_frame_bytes",
- new=AsyncMock(return_value=FAKE_JPEG),
- ) as mock_fresh,
- ):
- result = await svc._capture_frame(printer_id=1)
- assert result == FAKE_JPEG
- mock_fresh.assert_called_once()
- class TestFrameCache:
- """One-shot JPEG cache that lets us sidestep Obico's 5s read timeout.
- Obico's ML API fetches snapshots via `GET /p/?img=URL` with `timeout=(0.1, 5)`.
- Our /camera/snapshot can exceed that on cold calls (RTSP keyframe wait). So the
- detection loop captures locally, stashes the JPEG bytes under a nonce, then hands
- Obico a URL that returns those bytes instantly. The cache is single-use + TTLed
- so a leaked nonce can't be replayed.
- """
- def setup_method(self):
- _frame_cache.clear()
- @pytest.mark.asyncio
- async def test_stash_and_pop_roundtrip(self):
- nonce = await stash_frame(FAKE_JPEG)
- assert nonce # non-empty URL-safe token
- data = await pop_frame(nonce)
- assert data == FAKE_JPEG
- @pytest.mark.asyncio
- async def test_nonce_is_single_use(self):
- nonce = await stash_frame(FAKE_JPEG)
- assert await pop_frame(nonce) == FAKE_JPEG
- # Second pop returns None — caches replay protection
- assert await pop_frame(nonce) is None
- @pytest.mark.asyncio
- async def test_unknown_nonce_returns_none(self):
- assert await pop_frame("not-a-real-nonce") is None
- @pytest.mark.asyncio
- async def test_stash_produces_unique_nonces(self):
- nonces = {await stash_frame(FAKE_JPEG) for _ in range(10)}
- assert len(nonces) == 10
- @pytest.mark.asyncio
- async def test_expired_entries_are_pruned_on_stash(self):
- """New entries trigger pruning of TTL-expired ones — prevents unbounded growth."""
- # Manually seed an entry with a stale timestamp
- import time as time_module
- _frame_cache["stale-nonce"] = (FAKE_JPEG, time_module.monotonic() - FRAME_CACHE_TTL - 1)
- await stash_frame(FAKE_JPEG)
- # Stale entry was pruned
- assert "stale-nonce" not in _frame_cache
- @pytest.mark.asyncio
- async def test_pop_rejects_expired_nonce(self):
- """Even if the entry is still in the dict, an expired TTL returns None."""
- import time as time_module
- _frame_cache["aging-nonce"] = (FAKE_JPEG, time_module.monotonic() - FRAME_CACHE_TTL - 1)
- assert await pop_frame("aging-nonce") is None
- class TestCheckPrinterUsesCachedFrameUrl:
- """The URL sent to Obico must point at our nonce endpoint, not /camera/snapshot."""
- def setup_method(self):
- _frame_cache.clear()
- @pytest.mark.asyncio
- async def test_ml_api_called_with_cached_frame_url(self):
- svc = ObicoDetectionService()
- settings = {
- "enabled": True,
- "ml_url": "http://obico:3333",
- "sensitivity": "medium",
- "action": "notify",
- "poll_interval": 10,
- "enabled_printers": None,
- "external_url": "http://bambuddy:8000",
- }
- status = MagicMock(state="RUNNING", task_name="job", subtask_name="")
- mock_response = MagicMock()
- mock_response.json.return_value = {"detections": []}
- mock_response.raise_for_status = MagicMock()
- mock_client = MagicMock()
- mock_client.get = AsyncMock(return_value=mock_response)
- mock_client.__aenter__ = AsyncMock(return_value=mock_client)
- mock_client.__aexit__ = AsyncMock(return_value=False)
- with (
- patch("backend.app.services.obico_detection.httpx.AsyncClient", return_value=mock_client),
- patch.object(svc, "_capture_frame", new=AsyncMock(return_value=FAKE_JPEG)),
- ):
- await svc._check_printer(1, status, settings)
- # ML API was called via GET (Obico's /p/ is GET-only)
- mock_client.get.assert_called_once()
- _args, kwargs = mock_client.get.call_args
- assert _args[0] == "http://obico:3333/p/"
- img_url = kwargs["params"]["img"]
- assert img_url.startswith("http://bambuddy:8000/api/v1/obico/cached-frame/")
- # The path segment after /cached-frame/ is the nonce itself — that nonce must
- # resolve back to our stashed frame (single-use guarantees freshness).
- nonce = img_url.rsplit("/", 1)[-1]
- assert await pop_frame(nonce) == FAKE_JPEG
- @pytest.mark.asyncio
- async def test_capture_failure_skips_ml_call(self):
- """If we can't capture a frame, don't bother the ML API."""
- svc = ObicoDetectionService()
- settings = {
- "enabled": True,
- "ml_url": "http://obico:3333",
- "sensitivity": "medium",
- "action": "notify",
- "poll_interval": 10,
- "enabled_printers": None,
- "external_url": "http://bambuddy:8000",
- }
- status = MagicMock(state="RUNNING", task_name="job", subtask_name="")
- mock_client = MagicMock()
- mock_client.get = AsyncMock()
- mock_client.__aenter__ = AsyncMock(return_value=mock_client)
- mock_client.__aexit__ = AsyncMock(return_value=False)
- with (
- patch("backend.app.services.obico_detection.httpx.AsyncClient", return_value=mock_client),
- patch.object(svc, "_capture_frame", new=AsyncMock(return_value=None)),
- ):
- await svc._check_printer(1, status, settings)
- mock_client.get.assert_not_called()
- assert svc._last_error is not None
- assert "Failed to capture snapshot" in svc._last_error
- @pytest.mark.asyncio
- async def test_missing_external_url_skips_ml_call(self):
- """Without external_url, Obico can't reach our cached-frame endpoint."""
- svc = ObicoDetectionService()
- settings = {
- "enabled": True,
- "ml_url": "http://obico:3333",
- "sensitivity": "medium",
- "action": "notify",
- "poll_interval": 10,
- "enabled_printers": None,
- "external_url": "",
- }
- status = MagicMock(state="RUNNING", task_name="job", subtask_name="")
- mock_client = MagicMock()
- mock_client.get = AsyncMock()
- mock_client.__aenter__ = AsyncMock(return_value=mock_client)
- mock_client.__aexit__ = AsyncMock(return_value=False)
- with (
- patch("backend.app.services.obico_detection.httpx.AsyncClient", return_value=mock_client),
- patch.object(svc, "_capture_frame", new=AsyncMock(return_value=FAKE_JPEG)),
- ):
- await svc._check_printer(1, status, settings)
- mock_client.get.assert_not_called()
- assert svc._last_error is not None
- assert "external_url" in svc._last_error
- @pytest.mark.asyncio
- async def test_successful_cycle_clears_previous_error(self):
- """A cold-start RTSP timeout sets _last_error; the next successful poll must clear it.
- Regression for #172: the Status card banner ("Failed to capture snapshot for
- printer 1") stuck around after a one-off cold-start failure even though every
- subsequent poll captured + detected successfully.
- """
- svc = ObicoDetectionService()
- settings = {
- "enabled": True,
- "ml_url": "http://obico:3333",
- "sensitivity": "medium",
- "action": "notify",
- "poll_interval": 10,
- "enabled_printers": None,
- "external_url": "http://bambuddy:8000",
- }
- status = MagicMock(state="RUNNING", task_name="job", subtask_name="")
- # Seed a prior transient error, as would be left by a cold-start capture timeout.
- svc._last_error = "Failed to capture snapshot for printer 1"
- mock_response = MagicMock()
- mock_response.json.return_value = {"detections": []}
- mock_response.raise_for_status = MagicMock()
- mock_client = MagicMock()
- mock_client.get = AsyncMock(return_value=mock_response)
- mock_client.__aenter__ = AsyncMock(return_value=mock_client)
- mock_client.__aexit__ = AsyncMock(return_value=False)
- with (
- patch("backend.app.services.obico_detection.httpx.AsyncClient", return_value=mock_client),
- patch.object(svc, "_capture_frame", new=AsyncMock(return_value=FAKE_JPEG)),
- ):
- await svc._check_printer(1, status, settings)
- assert svc._last_error is None
|