test_obico_detection.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. """Unit tests for Obico detection service (#172)."""
  2. from unittest.mock import AsyncMock, MagicMock, patch
  3. import pytest
  4. from backend.app.schemas.settings import AppSettingsUpdate
  5. from backend.app.services.obico_detection import (
  6. FRAME_CACHE_TTL,
  7. ObicoDetectionService,
  8. _frame_cache,
  9. pop_frame,
  10. stash_frame,
  11. )
  12. from backend.app.services.obico_smoothing import WARMUP_FRAMES
# Minimal JPEG-shaped payload: SOI marker (FF D8), a JFIF APP0 header, then the
# EOI marker (FF D9) with no image data in between. The tests below use it as
# the stand-in frame returned by the mocked capture and stored in the frame cache.
FAKE_JPEG = b"\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00\xff\xd9"
  14. class TestSettingsSchemaValidators:
  15. """Guard rails on the new obico_* AppSettings fields."""
  16. def test_sensitivity_accepts_valid_values(self):
  17. for value in ("low", "medium", "high"):
  18. u = AppSettingsUpdate(obico_sensitivity=value)
  19. assert u.obico_sensitivity == value
  20. def test_sensitivity_rejects_garbage(self):
  21. with pytest.raises(ValueError, match="obico_sensitivity"):
  22. AppSettingsUpdate(obico_sensitivity="extreme")
  23. def test_action_accepts_valid_values(self):
  24. for value in ("notify", "pause", "pause_and_off"):
  25. assert AppSettingsUpdate(obico_action=value).obico_action == value
  26. def test_action_rejects_garbage(self):
  27. with pytest.raises(ValueError, match="obico_action"):
  28. AppSettingsUpdate(obico_action="explode")
  29. def test_enabled_printers_accepts_empty(self):
  30. assert AppSettingsUpdate(obico_enabled_printers="").obico_enabled_printers == ""
  31. assert AppSettingsUpdate(obico_enabled_printers=None).obico_enabled_printers is None
  32. def test_enabled_printers_accepts_int_array(self):
  33. u = AppSettingsUpdate(obico_enabled_printers="[1, 2, 3]")
  34. assert u.obico_enabled_printers == "[1, 2, 3]"
  35. def test_enabled_printers_rejects_non_json(self):
  36. with pytest.raises(ValueError, match="valid JSON"):
  37. AppSettingsUpdate(obico_enabled_printers="1,2,3")
  38. def test_enabled_printers_rejects_non_list(self):
  39. with pytest.raises(ValueError, match="JSON array"):
  40. AppSettingsUpdate(obico_enabled_printers='{"1": true}')
  41. def test_enabled_printers_rejects_non_int_elements(self):
  42. with pytest.raises(ValueError, match="JSON array"):
  43. AppSettingsUpdate(obico_enabled_printers='[1, "two"]')
  44. def test_poll_interval_bounds(self):
  45. with pytest.raises(ValueError):
  46. AppSettingsUpdate(obico_poll_interval=4)
  47. with pytest.raises(ValueError):
  48. AppSettingsUpdate(obico_poll_interval=121)
  49. assert AppSettingsUpdate(obico_poll_interval=10).obico_poll_interval == 10
  50. class TestGetStatus:
  51. def test_empty_initial_status(self):
  52. svc = ObicoDetectionService()
  53. s = svc.get_status()
  54. assert s["is_running"] is False
  55. assert s["per_printer"] == {}
  56. assert s["history"] == []
  57. assert "low" in s["thresholds"] and "high" in s["thresholds"]
  58. class TestTestConnection:
  59. @pytest.mark.asyncio
  60. async def test_empty_url_via_route(self):
  61. """Service does not special-case empty URL — the route does."""
  62. svc = ObicoDetectionService()
  63. # This will fail DNS/connect, but should return ok=False
  64. result = await svc.test_connection("http://nonexistent-obico-host-xyz.invalid:3333")
  65. assert result["ok"] is False
  66. assert result["error"] is not None
  67. @pytest.mark.asyncio
  68. async def test_healthy_response_is_ok(self):
  69. svc = ObicoDetectionService()
  70. mock_response = MagicMock(status_code=200, text="ok")
  71. mock_client = MagicMock()
  72. mock_client.get = AsyncMock(return_value=mock_response)
  73. mock_client.__aenter__ = AsyncMock(return_value=mock_client)
  74. mock_client.__aexit__ = AsyncMock(return_value=False)
  75. with patch("backend.app.services.obico_detection.httpx.AsyncClient", return_value=mock_client):
  76. result = await svc.test_connection("http://obico:3333")
  77. assert result["ok"] is True
  78. assert result["status_code"] == 200
  79. assert result["body"] == "ok"
  80. assert result["error"] is None
  81. @pytest.mark.asyncio
  82. async def test_non_ok_body_is_not_ok(self):
  83. svc = ObicoDetectionService()
  84. mock_response = MagicMock(status_code=200, text="something else")
  85. mock_client = MagicMock()
  86. mock_client.get = AsyncMock(return_value=mock_response)
  87. mock_client.__aenter__ = AsyncMock(return_value=mock_client)
  88. mock_client.__aexit__ = AsyncMock(return_value=False)
  89. with patch("backend.app.services.obico_detection.httpx.AsyncClient", return_value=mock_client):
  90. result = await svc.test_connection("http://obico:3333/")
  91. assert result["ok"] is False
  92. assert result["body"] == "something else"
  93. class TestPollOneStateLifecycle:
  94. """Confirms per-printer state is reset when a new print starts."""
  95. @pytest.mark.asyncio
  96. async def test_new_task_name_resets_state(self):
  97. svc = ObicoDetectionService()
  98. # Seed a state that has been running for a while
  99. from backend.app.services.obico_smoothing import PrintState
  100. seeded = PrintState()
  101. for _ in range(WARMUP_FRAMES + 5):
  102. seeded.update(0.5)
  103. svc._states[1] = seeded
  104. svc._state_keys[1] = "old_task"
  105. svc._action_fired[1] = True
  106. settings = {
  107. "enabled": True,
  108. "ml_url": "http://obico:3333",
  109. "sensitivity": "medium",
  110. "action": "notify",
  111. "poll_interval": 10,
  112. "enabled_printers": None,
  113. "external_url": "http://bambuddy:8000",
  114. }
  115. status = MagicMock(state="RUNNING", task_name="new_task", subtask_name="")
  116. mock_response = MagicMock()
  117. mock_response.json.return_value = {"detections": []}
  118. mock_response.raise_for_status = MagicMock()
  119. mock_client = MagicMock()
  120. mock_client.get = AsyncMock(return_value=mock_response)
  121. mock_client.__aenter__ = AsyncMock(return_value=mock_client)
  122. mock_client.__aexit__ = AsyncMock(return_value=False)
  123. with (
  124. patch("backend.app.services.obico_detection.httpx.AsyncClient", return_value=mock_client),
  125. patch.object(svc, "_capture_frame", new=AsyncMock(return_value=FAKE_JPEG)),
  126. ):
  127. await svc._check_printer(1, status, settings)
  128. # State was reset (frame_count is 1 after the single update, not 36)
  129. assert svc._states[1].frame_count == 1
  130. assert svc._state_keys[1] == "new_task"
  131. assert svc._action_fired[1] is False
  132. @pytest.mark.asyncio
  133. async def test_ml_api_error_does_not_crash(self):
  134. svc = ObicoDetectionService()
  135. settings = {
  136. "enabled": True,
  137. "ml_url": "http://obico:3333",
  138. "sensitivity": "medium",
  139. "action": "notify",
  140. "poll_interval": 10,
  141. "enabled_printers": None,
  142. "external_url": "http://bambuddy:8000",
  143. }
  144. status = MagicMock(state="RUNNING", task_name="job", subtask_name="")
  145. mock_client = MagicMock()
  146. mock_client.get = AsyncMock(side_effect=RuntimeError("connection refused"))
  147. mock_client.__aenter__ = AsyncMock(return_value=mock_client)
  148. mock_client.__aexit__ = AsyncMock(return_value=False)
  149. with (
  150. patch("backend.app.services.obico_detection.httpx.AsyncClient", return_value=mock_client),
  151. patch.object(svc, "_capture_frame", new=AsyncMock(return_value=FAKE_JPEG)),
  152. ):
  153. await svc._check_printer(1, status, settings)
  154. assert svc._last_error is not None
  155. assert "connection refused" in svc._last_error
  156. @pytest.mark.asyncio
  157. async def test_failure_fires_action_only_once(self):
  158. """Once a failure has fired for a print, subsequent failures should not re-fire."""
  159. svc = ObicoDetectionService()
  160. settings = {
  161. "enabled": True,
  162. "ml_url": "http://obico:3333",
  163. "sensitivity": "medium",
  164. "action": "notify",
  165. "poll_interval": 10,
  166. "enabled_printers": None,
  167. "external_url": "http://bambuddy:8000",
  168. }
  169. status = MagicMock(state="RUNNING", task_name="job", subtask_name="")
  170. # Seed state so the next frame crosses HIGH immediately
  171. from backend.app.services.obico_smoothing import PrintState
  172. seeded = PrintState()
  173. for _ in range(WARMUP_FRAMES + 500):
  174. seeded.update(1.0)
  175. svc._states[1] = seeded
  176. svc._state_keys[1] = "job"
  177. svc._action_fired[1] = False
  178. mock_response = MagicMock()
  179. mock_response.json.return_value = {"detections": [["failure", 0.9, [0, 0, 1, 1]]]}
  180. mock_response.raise_for_status = MagicMock()
  181. mock_client = MagicMock()
  182. mock_client.get = AsyncMock(return_value=mock_response)
  183. mock_client.__aenter__ = AsyncMock(return_value=mock_client)
  184. mock_client.__aexit__ = AsyncMock(return_value=False)
  185. with (
  186. patch("backend.app.services.obico_detection.httpx.AsyncClient", return_value=mock_client),
  187. patch("backend.app.services.obico_actions.execute_action", new=AsyncMock()) as mock_action,
  188. patch.object(svc, "_capture_frame", new=AsyncMock(return_value=FAKE_JPEG)),
  189. ):
  190. await svc._check_printer(1, status, settings)
  191. assert mock_action.call_count == 1
  192. await svc._check_printer(1, status, settings)
  193. # Second call must not dispatch again
  194. assert mock_action.call_count == 1
  195. class TestFrameCache:
  196. """One-shot JPEG cache that lets us sidestep Obico's 5s read timeout.
  197. Obico's ML API fetches snapshots via `GET /p/?img=URL` with `timeout=(0.1, 5)`.
  198. Our /camera/snapshot can exceed that on cold calls (RTSP keyframe wait). So the
  199. detection loop captures locally, stashes the JPEG bytes under a nonce, then hands
  200. Obico a URL that returns those bytes instantly. The cache is single-use + TTLed
  201. so a leaked nonce can't be replayed.
  202. """
  203. def setup_method(self):
  204. _frame_cache.clear()
  205. @pytest.mark.asyncio
  206. async def test_stash_and_pop_roundtrip(self):
  207. nonce = await stash_frame(FAKE_JPEG)
  208. assert nonce # non-empty URL-safe token
  209. data = await pop_frame(nonce)
  210. assert data == FAKE_JPEG
  211. @pytest.mark.asyncio
  212. async def test_nonce_is_single_use(self):
  213. nonce = await stash_frame(FAKE_JPEG)
  214. assert await pop_frame(nonce) == FAKE_JPEG
  215. # Second pop returns None — caches replay protection
  216. assert await pop_frame(nonce) is None
  217. @pytest.mark.asyncio
  218. async def test_unknown_nonce_returns_none(self):
  219. assert await pop_frame("not-a-real-nonce") is None
  220. @pytest.mark.asyncio
  221. async def test_stash_produces_unique_nonces(self):
  222. nonces = {await stash_frame(FAKE_JPEG) for _ in range(10)}
  223. assert len(nonces) == 10
  224. @pytest.mark.asyncio
  225. async def test_expired_entries_are_pruned_on_stash(self):
  226. """New entries trigger pruning of TTL-expired ones — prevents unbounded growth."""
  227. # Manually seed an entry with a stale timestamp
  228. import time as time_module
  229. _frame_cache["stale-nonce"] = (FAKE_JPEG, time_module.monotonic() - FRAME_CACHE_TTL - 1)
  230. await stash_frame(FAKE_JPEG)
  231. # Stale entry was pruned
  232. assert "stale-nonce" not in _frame_cache
  233. @pytest.mark.asyncio
  234. async def test_pop_rejects_expired_nonce(self):
  235. """Even if the entry is still in the dict, an expired TTL returns None."""
  236. import time as time_module
  237. _frame_cache["aging-nonce"] = (FAKE_JPEG, time_module.monotonic() - FRAME_CACHE_TTL - 1)
  238. assert await pop_frame("aging-nonce") is None
  239. class TestCheckPrinterUsesCachedFrameUrl:
  240. """The URL sent to Obico must point at our nonce endpoint, not /camera/snapshot."""
  241. def setup_method(self):
  242. _frame_cache.clear()
  243. @pytest.mark.asyncio
  244. async def test_ml_api_called_with_cached_frame_url(self):
  245. svc = ObicoDetectionService()
  246. settings = {
  247. "enabled": True,
  248. "ml_url": "http://obico:3333",
  249. "sensitivity": "medium",
  250. "action": "notify",
  251. "poll_interval": 10,
  252. "enabled_printers": None,
  253. "external_url": "http://bambuddy:8000",
  254. }
  255. status = MagicMock(state="RUNNING", task_name="job", subtask_name="")
  256. mock_response = MagicMock()
  257. mock_response.json.return_value = {"detections": []}
  258. mock_response.raise_for_status = MagicMock()
  259. mock_client = MagicMock()
  260. mock_client.get = AsyncMock(return_value=mock_response)
  261. mock_client.__aenter__ = AsyncMock(return_value=mock_client)
  262. mock_client.__aexit__ = AsyncMock(return_value=False)
  263. with (
  264. patch("backend.app.services.obico_detection.httpx.AsyncClient", return_value=mock_client),
  265. patch.object(svc, "_capture_frame", new=AsyncMock(return_value=FAKE_JPEG)),
  266. ):
  267. await svc._check_printer(1, status, settings)
  268. # Extract the img URL handed to the ML API
  269. _args, kwargs = mock_client.get.call_args
  270. img_url = kwargs["params"]["img"]
  271. assert img_url.startswith("http://bambuddy:8000/api/v1/obico/cached-frame/")
  272. # The path segment after /cached-frame/ is the nonce itself — that nonce must
  273. # resolve back to our stashed frame (single-use guarantees freshness).
  274. nonce = img_url.rsplit("/", 1)[-1]
  275. assert await pop_frame(nonce) == FAKE_JPEG
  276. @pytest.mark.asyncio
  277. async def test_capture_failure_skips_ml_call(self):
  278. """If we can't capture a frame, don't bother the ML API."""
  279. svc = ObicoDetectionService()
  280. settings = {
  281. "enabled": True,
  282. "ml_url": "http://obico:3333",
  283. "sensitivity": "medium",
  284. "action": "notify",
  285. "poll_interval": 10,
  286. "enabled_printers": None,
  287. "external_url": "http://bambuddy:8000",
  288. }
  289. status = MagicMock(state="RUNNING", task_name="job", subtask_name="")
  290. mock_client = MagicMock()
  291. mock_client.get = AsyncMock()
  292. mock_client.__aenter__ = AsyncMock(return_value=mock_client)
  293. mock_client.__aexit__ = AsyncMock(return_value=False)
  294. with (
  295. patch("backend.app.services.obico_detection.httpx.AsyncClient", return_value=mock_client),
  296. patch.object(svc, "_capture_frame", new=AsyncMock(return_value=None)),
  297. ):
  298. await svc._check_printer(1, status, settings)
  299. mock_client.get.assert_not_called()
  300. assert svc._last_error is not None
  301. assert "Failed to capture snapshot" in svc._last_error