test_obico_detection.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432
  1. """Unit tests for Obico detection service (#172)."""
  2. from unittest.mock import AsyncMock, MagicMock, patch
  3. import pytest
  4. from backend.app.schemas.settings import AppSettingsUpdate
  5. from backend.app.services.obico_detection import (
  6. FRAME_CACHE_TTL,
  7. ObicoDetectionService,
  8. _frame_cache,
  9. pop_frame,
  10. stash_frame,
  11. )
  12. from backend.app.services.obico_smoothing import WARMUP_FRAMES
  # Tiny JPEG-shaped payload used wherever a test needs "a captured frame".
  # It opens with the SOI marker (FF D8) followed by a JFIF APP0 header and closes
  # with the EOI marker (FF D9). The tests in this file only compare it
  # byte-for-byte; nothing here decodes it as an image.
  FAKE_JPEG = b"\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00\xff\xd9"
  class TestSettingsSchemaValidators:
      """Guard rails on the new obico_* AppSettings fields.

      Every test builds an ``AppSettingsUpdate`` with a single obico_* field and
      checks that the value is either stored unchanged or rejected with a
      ``ValueError`` (matching a message pattern where the test pins one).

      Sweeps over several accepted values use ``pytest.mark.parametrize`` so each
      value is collected and reported as its own test case; with a plain loop the
      first failing value would hide the outcome for the remaining ones.
      """

      @pytest.mark.parametrize("value", ["low", "medium", "high"])
      def test_sensitivity_accepts_valid_values(self, value):
          """Each supported sensitivity level is stored unchanged."""
          update = AppSettingsUpdate(obico_sensitivity=value)
          assert update.obico_sensitivity == value

      def test_sensitivity_rejects_garbage(self):
          """An unknown sensitivity raises an error that names the field."""
          with pytest.raises(ValueError, match="obico_sensitivity"):
              AppSettingsUpdate(obico_sensitivity="extreme")

      @pytest.mark.parametrize("value", ["notify", "pause", "pause_and_off"])
      def test_action_accepts_valid_values(self, value):
          """Each supported failure action is stored unchanged."""
          assert AppSettingsUpdate(obico_action=value).obico_action == value

      def test_action_rejects_garbage(self):
          """An unknown action raises an error that names the field."""
          with pytest.raises(ValueError, match="obico_action"):
              AppSettingsUpdate(obico_action="explode")

      def test_enabled_printers_accepts_empty(self):
          """Both the empty string and ``None`` pass through untouched."""
          # The two cases assert differently (equality vs. identity), so they are
          # kept as explicit statements rather than parametrized.
          assert AppSettingsUpdate(obico_enabled_printers="").obico_enabled_printers == ""
          assert AppSettingsUpdate(obico_enabled_printers=None).obico_enabled_printers is None

      def test_enabled_printers_accepts_int_array(self):
          """A JSON array of integers is accepted and kept as the original string."""
          update = AppSettingsUpdate(obico_enabled_printers="[1, 2, 3]")
          assert update.obico_enabled_printers == "[1, 2, 3]"

      def test_enabled_printers_rejects_non_json(self):
          """A comma-separated list is not JSON and must be rejected as such."""
          with pytest.raises(ValueError, match="valid JSON"):
              AppSettingsUpdate(obico_enabled_printers="1,2,3")

      def test_enabled_printers_rejects_non_list(self):
          """Valid JSON that is an object rather than an array is rejected."""
          with pytest.raises(ValueError, match="JSON array"):
              AppSettingsUpdate(obico_enabled_printers='{"1": true}')

      def test_enabled_printers_rejects_non_int_elements(self):
          """An array containing a non-integer element is rejected."""
          with pytest.raises(ValueError, match="JSON array"):
              AppSettingsUpdate(obico_enabled_printers='[1, "two"]')

      def test_poll_interval_bounds(self):
          """4 and 121 are rejected as poll intervals, while 10 is accepted."""
          with pytest.raises(ValueError):
              AppSettingsUpdate(obico_poll_interval=4)
          with pytest.raises(ValueError):
              AppSettingsUpdate(obico_poll_interval=121)
          assert AppSettingsUpdate(obico_poll_interval=10).obico_poll_interval == 10
  class TestGetStatus:
      """Shape of the status snapshot reported by a service that was never started."""

      def test_empty_initial_status(self):
          """A fresh service is not running and has no per-printer data or history."""
          snapshot = ObicoDetectionService().get_status()

          assert snapshot["is_running"] is False
          assert snapshot["per_printer"] == {}
          assert snapshot["history"] == []

          # Both ends of the threshold table must be exposed to the caller.
          thresholds = snapshot["thresholds"]
          assert "low" in thresholds
          assert "high" in thresholds
  class TestTestConnection:
      """Result dictionary produced by ``ObicoDetectionService.test_connection``."""

      # Dotted path of the HTTP client class replaced in the mocked tests.
      _ASYNC_CLIENT = "backend.app.services.obico_detection.httpx.AsyncClient"

      @staticmethod
      def _client_returning(response):
          """Build a mock async-context-manager client whose ``get`` yields *response*."""
          client = MagicMock()
          client.get = AsyncMock(return_value=response)
          # ``async with AsyncClient(...) as c`` must hand back this same mock.
          client.__aenter__ = AsyncMock(return_value=client)
          client.__aexit__ = AsyncMock(return_value=False)
          return client

      @pytest.mark.asyncio
      async def test_empty_url_via_route(self):
          """Service does not special-case empty URL — the route does.

          What is exercised here is the service's own failure path: a host that
          is not expected to resolve must produce ``ok=False`` plus an error.
          """
          service = ObicoDetectionService()
          # Nothing is mocked: the request should fail at DNS/connect time.
          outcome = await service.test_connection("http://nonexistent-obico-host-xyz.invalid:3333")
          assert outcome["ok"] is False
          assert outcome["error"] is not None

      @pytest.mark.asyncio
      async def test_healthy_response_is_ok(self):
          """HTTP 200 with body ``ok`` is reported as a healthy connection."""
          service = ObicoDetectionService()
          client = self._client_returning(MagicMock(status_code=200, text="ok"))

          with patch(self._ASYNC_CLIENT, return_value=client):
              outcome = await service.test_connection("http://obico:3333")

          assert outcome["ok"] is True
          assert outcome["status_code"] == 200
          assert outcome["body"] == "ok"
          assert outcome["error"] is None

      @pytest.mark.asyncio
      async def test_non_ok_body_is_not_ok(self):
          """HTTP 200 alone is not enough; an unexpected body yields ``ok=False``."""
          service = ObicoDetectionService()
          client = self._client_returning(MagicMock(status_code=200, text="something else"))

          with patch(self._ASYNC_CLIENT, return_value=client):
              outcome = await service.test_connection("http://obico:3333/")

          assert outcome["ok"] is False
          assert outcome["body"] == "something else"
  class TestPollOneStateLifecycle:
      """Per-printer bookkeeping performed by ``_check_printer`` across polls.

      Covers the state reset when a new print starts, error recording when the
      ML API call raises, and the once-per-print action dispatch.
      """

      # Dotted path of the HTTP client class replaced in every test below.
      _ASYNC_CLIENT = "backend.app.services.obico_detection.httpx.AsyncClient"

      @staticmethod
      def _settings():
          """Return a fresh copy of the detection settings shared by these tests."""
          return {
              "enabled": True,
              "ml_url": "http://obico:3333",
              "sensitivity": "medium",
              "action": "notify",
              "poll_interval": 10,
              "enabled_printers": None,
              "external_url": "http://bambuddy:8000",
          }

      @staticmethod
      def _running(task_name):
          """Return a printer-status mock for a RUNNING print called *task_name*."""
          return MagicMock(state="RUNNING", task_name=task_name, subtask_name="")

      @staticmethod
      def _ml_response(detections):
          """Return a mock HTTP response whose JSON body carries *detections*."""
          response = MagicMock()
          response.json.return_value = {"detections": detections}
          response.raise_for_status = MagicMock()
          return response

      @staticmethod
      def _ml_client(**get_behaviour):
          """Build a mock async client; *get_behaviour* configures its ``get`` AsyncMock."""
          client = MagicMock()
          client.get = AsyncMock(**get_behaviour)
          # ``async with AsyncClient(...) as c`` must hand back this same mock.
          client.__aenter__ = AsyncMock(return_value=client)
          client.__aexit__ = AsyncMock(return_value=False)
          return client

      @staticmethod
      def _primed_state(extra_frames, score):
          """Return a PrintState fed ``WARMUP_FRAMES + extra_frames`` updates of *score*."""
          from backend.app.services.obico_smoothing import PrintState

          state = PrintState()
          for _ in range(WARMUP_FRAMES + extra_frames):
              state.update(score)
          return state

      async def _check_once(self, svc, client, status):
          """Run one ``_check_printer`` pass for printer 1 with HTTP and capture mocked."""
          with (
              patch(self._ASYNC_CLIENT, return_value=client),
              patch.object(svc, "_capture_frame", new=AsyncMock(return_value=FAKE_JPEG)),
          ):
              await svc._check_printer(1, status, self._settings())

      @pytest.mark.asyncio
      async def test_new_task_name_resets_state(self):
          """A changed task name discards the old smoothing state and the fired flag."""
          svc = ObicoDetectionService()
          # Pretend printer 1 has been mid-print for a while and already fired.
          svc._states[1] = self._primed_state(5, 0.5)
          svc._state_keys[1] = "old_task"
          svc._action_fired[1] = True

          client = self._ml_client(return_value=self._ml_response([]))
          await self._check_once(svc, client, self._running("new_task"))

          # Only this single poll is counted, so the seeded frames were dropped.
          assert svc._states[1].frame_count == 1
          assert svc._state_keys[1] == "new_task"
          assert svc._action_fired[1] is False

      @pytest.mark.asyncio
      async def test_ml_api_error_does_not_crash(self):
          """An exception from the ML call is swallowed and surfaced via ``_last_error``."""
          svc = ObicoDetectionService()
          client = self._ml_client(side_effect=RuntimeError("connection refused"))

          await self._check_once(svc, client, self._running("job"))

          assert svc._last_error is not None
          assert "connection refused" in svc._last_error

      @pytest.mark.asyncio
      async def test_ml_api_empty_exception_message_falls_back_to_type(self):
          """If str(exc) is empty, log the exception class name instead of a blank suffix."""

          class _SilentError(Exception):
              """Exception whose string form is deliberately empty."""

              def __str__(self) -> str:
                  return ""

          svc = ObicoDetectionService()
          client = self._ml_client(side_effect=_SilentError())

          await self._check_once(svc, client, self._running("job"))

          assert svc._last_error is not None
          assert "_SilentError" in svc._last_error
          # The message must not end in a dangling colon with nothing after it.
          assert not svc._last_error.rstrip().endswith(":")

      @pytest.mark.asyncio
      async def test_failure_fires_action_only_once(self):
          """Once a failure has fired for a print, subsequent failures should not re-fire."""
          svc = ObicoDetectionService()
          status = self._running("job")
          settings = self._settings()

          # Saturate the smoothing state with maximum-score frames so the very next
          # failure frame is expected to cross the HIGH threshold.
          svc._states[1] = self._primed_state(500, 1.0)
          svc._state_keys[1] = "job"
          svc._action_fired[1] = False

          failure = self._ml_response([["failure", 0.9, [0, 0, 1, 1]]])
          client = self._ml_client(return_value=failure)

          with (
              patch(self._ASYNC_CLIENT, return_value=client),
              patch("backend.app.services.obico_actions.execute_action", new=AsyncMock()) as dispatched,
              patch.object(svc, "_capture_frame", new=AsyncMock(return_value=FAKE_JPEG)),
          ):
              await svc._check_printer(1, status, settings)
              assert dispatched.call_count == 1

              await svc._check_printer(1, status, settings)
              # The second poll of the same print must stay silent.
              assert dispatched.call_count == 1
  class TestFrameCache:
      """One-shot JPEG cache that lets us sidestep Obico's 5s read timeout.

      Obico's ML API fetches snapshots via `GET /p/?img=URL` with `timeout=(0.1, 5)`.
      Our /camera/snapshot can exceed that on cold calls (RTSP keyframe wait). The
      detection loop therefore captures locally, stashes the JPEG bytes under a
      nonce, and hands Obico a URL that returns those bytes instantly. The cache is
      single-use and TTL-bound so a leaked nonce can't be replayed.
      """

      def setup_method(self):
          """Start every test from an empty module-level cache."""
          _frame_cache.clear()

      @staticmethod
      def _plant_expired(nonce):
          """Insert *nonce* directly with a timestamp one second older than the TTL.

          The timestamp is taken relative to ``time.monotonic()``.
          """
          import time as time_module

          _frame_cache[nonce] = (FAKE_JPEG, time_module.monotonic() - FRAME_CACHE_TTL - 1)

      @pytest.mark.asyncio
      async def test_stash_and_pop_roundtrip(self):
          """Bytes stashed under a nonce come back unchanged from the first pop."""
          token = await stash_frame(FAKE_JPEG)
          assert token  # must be a non-empty token
          assert await pop_frame(token) == FAKE_JPEG

      @pytest.mark.asyncio
      async def test_nonce_is_single_use(self):
          """A nonce is consumed by its first pop."""
          token = await stash_frame(FAKE_JPEG)
          first = await pop_frame(token)
          assert first == FAKE_JPEG
          # Replay protection: the cache has already forgotten this nonce.
          second = await pop_frame(token)
          assert second is None

      @pytest.mark.asyncio
      async def test_unknown_nonce_returns_none(self):
          """Popping a nonce that was never issued yields ``None``."""
          assert await pop_frame("not-a-real-nonce") is None

      @pytest.mark.asyncio
      async def test_stash_produces_unique_nonces(self):
          """Ten consecutive stashes hand out ten distinct nonces."""
          issued = set()
          for _ in range(10):
              issued.add(await stash_frame(FAKE_JPEG))
          assert len(issued) == 10

      @pytest.mark.asyncio
      async def test_expired_entries_are_pruned_on_stash(self):
          """New entries trigger pruning of TTL-expired ones — prevents unbounded growth."""
          self._plant_expired("stale-nonce")

          await stash_frame(FAKE_JPEG)

          # The stash call should have swept the stale entry out.
          assert "stale-nonce" not in _frame_cache

      @pytest.mark.asyncio
      async def test_pop_rejects_expired_nonce(self):
          """Even if the entry is still in the dict, an expired TTL returns None."""
          self._plant_expired("aging-nonce")
          assert await pop_frame("aging-nonce") is None
  class TestCheckPrinterUsesCachedFrameUrl:
      """The URL sent to Obico must point at our nonce endpoint, not /camera/snapshot."""

      # Dotted path of the HTTP client class replaced in every test below.
      _ASYNC_CLIENT = "backend.app.services.obico_detection.httpx.AsyncClient"

      def setup_method(self):
          """Start every test from an empty frame cache."""
          _frame_cache.clear()

      @staticmethod
      def _settings(external_url):
          """Return the shared detection settings with the given *external_url*."""
          return {
              "enabled": True,
              "ml_url": "http://obico:3333",
              "sensitivity": "medium",
              "action": "notify",
              "poll_interval": 10,
              "enabled_printers": None,
              "external_url": external_url,
          }

      @staticmethod
      def _ml_client(get_mock):
          """Wrap *get_mock* in a mock client usable as an async context manager."""
          client = MagicMock()
          client.get = get_mock
          # ``async with AsyncClient(...) as c`` must hand back this same mock.
          client.__aenter__ = AsyncMock(return_value=client)
          client.__aexit__ = AsyncMock(return_value=False)
          return client

      async def _check_once(self, svc, client, settings, frame):
          """Run one ``_check_printer`` pass for printer 1; the capture returns *frame*."""
          running = MagicMock(state="RUNNING", task_name="job", subtask_name="")
          with (
              patch(self._ASYNC_CLIENT, return_value=client),
              patch.object(svc, "_capture_frame", new=AsyncMock(return_value=frame)),
          ):
              await svc._check_printer(1, running, settings)

      @pytest.mark.asyncio
      async def test_ml_api_called_with_cached_frame_url(self):
          """The ML API receives a cached-frame URL whose nonce maps to the captured JPEG."""
          svc = ObicoDetectionService()
          empty_result = MagicMock()
          empty_result.json.return_value = {"detections": []}
          empty_result.raise_for_status = MagicMock()
          client = self._ml_client(AsyncMock(return_value=empty_result))

          await self._check_once(svc, client, self._settings("http://bambuddy:8000"), FAKE_JPEG)

          # ML API was called via GET (Obico's /p/ is GET-only)
          client.get.assert_called_once()
          positional, keyword = client.get.call_args
          assert positional[0] == "http://obico:3333/p/"

          img_url = keyword["params"]["img"]
          assert img_url.startswith("http://bambuddy:8000/api/v1/obico/cached-frame/")

          # The last path segment is the nonce itself. It has to resolve back to the
          # frame stashed for this poll (single-use guarantees freshness).
          nonce = img_url.rsplit("/", 1)[-1]
          assert await pop_frame(nonce) == FAKE_JPEG

      @pytest.mark.asyncio
      async def test_capture_failure_skips_ml_call(self):
          """If we can't capture a frame, don't bother the ML API."""
          svc = ObicoDetectionService()
          client = self._ml_client(AsyncMock())

          await self._check_once(svc, client, self._settings("http://bambuddy:8000"), None)

          client.get.assert_not_called()
          assert svc._last_error is not None
          assert "Failed to capture snapshot" in svc._last_error

      @pytest.mark.asyncio
      async def test_missing_external_url_skips_ml_call(self):
          """Without external_url, Obico can't reach our cached-frame endpoint."""
          svc = ObicoDetectionService()
          client = self._ml_client(AsyncMock())

          await self._check_once(svc, client, self._settings(""), FAKE_JPEG)

          client.get.assert_not_called()
          assert svc._last_error is not None
          assert "external_url" in svc._last_error