obico_detection.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
  1. """Obico AI print-failure detection service.
  2. Polls a self-hosted Obico ML API with snapshots from each monitored printer
  3. while a print is running, smooths scores over time, and dispatches a configured
  4. action (notify / pause / pause_and_off) when a sustained failure is detected.
  5. See `obico_smoothing.py` for the per-print EWM + rolling-mean math.
  6. """
  7. import asyncio
  8. import json
  9. import logging
  10. import secrets
  11. import time
  12. from collections import deque
  13. from datetime import datetime, timezone
  14. import httpx
  15. from sqlalchemy import select
  16. from backend.app.core.database import async_session
  17. from backend.app.models.printer import Printer
  18. from backend.app.models.settings import Settings
  19. from backend.app.services.obico_smoothing import (
  20. PrintState,
  21. classify,
  22. score_from_detections,
  23. thresholds,
  24. )
  25. logger = logging.getLogger(__name__)
  26. HISTORY_MAX = 50
  27. HEALTH_TIMEOUT = 5.0
  28. DETECTION_TIMEOUT = 30.0
  29. SNAPSHOT_CAPTURE_TIMEOUT = 20 # seconds — we control this, not Obico
  30. FRAME_CACHE_TTL = 30.0 # seconds — Obico usually fetches within 1s of receiving the URL
  31. # Module-level one-shot frame cache. Obico's ML API is GET-only (/p/?img=URL) and
  32. # fetches the URL itself with a hardcoded 5s read timeout. We capture locally first,
  33. # stash the JPEG under a random nonce, and hand Obico a URL that serves the cached
  34. # bytes instantly — so the 5s ceiling never races RTSP keyframe wait.
  35. _frame_cache: dict[str, tuple[bytes, float]] = {}
  36. _frame_cache_lock = asyncio.Lock()
  37. def _prune_frame_cache() -> None:
  38. """Drop entries older than FRAME_CACHE_TTL. Called under the cache lock."""
  39. now = time.monotonic()
  40. expired = [k for k, (_b, ts) in _frame_cache.items() if now - ts > FRAME_CACHE_TTL]
  41. for k in expired:
  42. _frame_cache.pop(k, None)
  43. async def stash_frame(data: bytes) -> str:
  44. """Store JPEG bytes and return a URL-safe nonce that serves them once."""
  45. nonce = secrets.token_urlsafe(32)
  46. async with _frame_cache_lock:
  47. _prune_frame_cache()
  48. _frame_cache[nonce] = (data, time.monotonic())
  49. return nonce
  50. async def pop_frame(nonce: str) -> bytes | None:
  51. """Return and remove a cached frame by nonce; None if missing or expired."""
  52. async with _frame_cache_lock:
  53. _prune_frame_cache()
  54. entry = _frame_cache.pop(nonce, None)
  55. if entry is None:
  56. return None
  57. data, ts = entry
  58. if time.monotonic() - ts > FRAME_CACHE_TTL:
  59. return None
  60. return data
  61. class ObicoDetectionService:
  62. """Singleton service that polls the ML API and acts on sustained failures."""
  63. def __init__(self):
  64. self._task: asyncio.Task | None = None
  65. # printer_id -> PrintState (reset when a new print starts)
  66. self._states: dict[int, PrintState] = {}
  67. # printer_id -> task_name active when state was created (used to detect new prints)
  68. self._state_keys: dict[int, str] = {}
  69. # printer_id -> last classification ("safe"/"warning"/"failure")
  70. self._last_class: dict[int, str] = {}
  71. # printer_id -> whether an action has already been fired for the current print
  72. self._action_fired: dict[int, bool] = {}
  73. # Global detection event log (most-recent-first)
  74. self._history: deque = deque(maxlen=HISTORY_MAX)
  75. self._last_error: str | None = None
  76. # ---- lifecycle ----
  77. async def start(self):
  78. if self._task is not None:
  79. return
  80. logger.info("Starting Obico detection service")
  81. self._task = asyncio.create_task(self._loop())
  82. def stop(self):
  83. if self._task:
  84. self._task.cancel()
  85. self._task = None
  86. logger.info("Stopped Obico detection service")
  87. # ---- settings ----
  88. async def _load_settings(self) -> dict:
  89. keys = [
  90. "obico_enabled",
  91. "obico_ml_url",
  92. "obico_sensitivity",
  93. "obico_action",
  94. "obico_poll_interval",
  95. "obico_enabled_printers",
  96. "external_url",
  97. ]
  98. async with async_session() as db:
  99. result = await db.execute(select(Settings).where(Settings.key.in_(keys)))
  100. rows = {r.key: r.value for r in result.scalars().all()}
  101. enabled_printers_raw = rows.get("obico_enabled_printers", "")
  102. if enabled_printers_raw:
  103. try:
  104. enabled_printers = set(json.loads(enabled_printers_raw))
  105. except json.JSONDecodeError:
  106. enabled_printers = set()
  107. else:
  108. enabled_printers = None # None = all printers
  109. return {
  110. "enabled": rows.get("obico_enabled", "false").lower() == "true",
  111. "ml_url": (rows.get("obico_ml_url") or "").rstrip("/"),
  112. "sensitivity": rows.get("obico_sensitivity", "medium"),
  113. "action": rows.get("obico_action", "notify"),
  114. "poll_interval": int(rows.get("obico_poll_interval", "10")),
  115. "enabled_printers": enabled_printers,
  116. "external_url": (rows.get("external_url") or "").rstrip("/"),
  117. }
  118. # ---- main loop ----
  119. async def _loop(self):
  120. """Poll active printers while enabled. Adjusts interval from settings each cycle."""
  121. while True:
  122. try:
  123. settings = await self._load_settings()
  124. interval = max(5, settings.get("poll_interval", 10))
  125. if not settings["enabled"] or not settings["ml_url"]:
  126. await asyncio.sleep(interval)
  127. continue
  128. await self._poll_once(settings)
  129. await asyncio.sleep(interval)
  130. except asyncio.CancelledError:
  131. break
  132. except Exception as e:
  133. logger.error("Obico detection loop error: %s", e)
  134. self._last_error = str(e) or type(e).__name__
  135. await asyncio.sleep(30)
  136. async def _poll_once(self, settings: dict):
  137. # Late import to avoid cycles at module load time
  138. from backend.app.services.printer_manager import printer_manager
  139. statuses = printer_manager.get_all_statuses()
  140. for printer_id, status in list(statuses.items()):
  141. if settings["enabled_printers"] is not None and printer_id not in settings["enabled_printers"]:
  142. continue
  143. if not printer_manager.is_connected(printer_id):
  144. continue
  145. if not status or getattr(status, "state", None) != "RUNNING":
  146. # Reset state when not printing so the next print starts fresh
  147. self._states.pop(printer_id, None)
  148. self._state_keys.pop(printer_id, None)
  149. self._action_fired.pop(printer_id, None)
  150. continue
  151. await self._check_printer(printer_id, status, settings)
  152. async def _capture_frame(self, printer_id: int) -> bytes | None:
  153. """Capture one JPEG frame from the printer camera. Returns None on failure."""
  154. # Late import to avoid cycles at module load time
  155. from backend.app.services.camera import capture_camera_frame_bytes
  156. from backend.app.services.external_camera import capture_frame as capture_external_frame
  157. async with async_session() as db:
  158. printer = await db.get(Printer, printer_id)
  159. if printer is None:
  160. self._last_error = f"Printer {printer_id} not found"
  161. return None
  162. if printer.external_camera_enabled and printer.external_camera_url:
  163. return await capture_external_frame(
  164. printer.external_camera_url,
  165. printer.external_camera_type,
  166. timeout=SNAPSHOT_CAPTURE_TIMEOUT,
  167. snapshot_url=printer.external_camera_snapshot_url,
  168. )
  169. # Reuse the fan-out broadcaster's buffered frame when a viewer is
  170. # already watching — avoids opening a second concurrent RTSP socket
  171. # on printers that allow only one camera connection (e.g. X2D
  172. # firmware 01.01.00.00; see #1271). Buffered frame is <1s old while
  173. # a viewer is connected.
  174. #
  175. # When a viewer is attached but no frame is buffered yet (startup
  176. # race, mid-reconnect), we DELIBERATELY skip this poll cycle instead
  177. # of falling through to capture_camera_frame_bytes. Opening a fresh
  178. # RTSP/chamber socket would compete with the live viewer and kick
  179. # the fan-out connection on most firmwares — exactly the freeze
  180. # reported in #1348. The poll loop retries in ~10s.
  181. from backend.app.api.routes.camera import is_stream_active, try_get_active_buffered_frame
  182. if is_stream_active(printer_id):
  183. buffered = try_get_active_buffered_frame(printer_id)
  184. if buffered:
  185. return buffered
  186. logger.info(
  187. "Obico: viewer attached for printer %s but buffer empty; skipping this poll to avoid competing camera socket (#1348)",
  188. printer_id,
  189. )
  190. return None
  191. return await capture_camera_frame_bytes(
  192. ip_address=printer.ip_address,
  193. access_code=printer.access_code,
  194. model=printer.model,
  195. timeout=SNAPSHOT_CAPTURE_TIMEOUT,
  196. )
  197. async def _check_printer(self, printer_id: int, status, settings: dict):
  198. task_name = getattr(status, "task_name", None) or getattr(status, "subtask_name", "") or ""
  199. key = f"{task_name}"
  200. if self._state_keys.get(printer_id) != key:
  201. self._states[printer_id] = PrintState()
  202. self._state_keys[printer_id] = key
  203. self._action_fired[printer_id] = False
  204. # Capture locally first, then hand Obico a nonce URL that returns the
  205. # cached bytes instantly. Obico's ML API is GET-only (/p/?img=URL) with a
  206. # hardcoded 5s read timeout which would otherwise race our /camera/snapshot
  207. # keyframe wait.
  208. frame = await self._capture_frame(printer_id)
  209. if not frame:
  210. self._last_error = f"Failed to capture snapshot for printer {printer_id}"
  211. logger.warning(self._last_error)
  212. return
  213. external_url = settings.get("external_url") or ""
  214. if not external_url:
  215. self._last_error = (
  216. "external_url setting is empty — Obico's ML API needs a reachable URL to fetch the snapshot from. "
  217. "Set Settings → General → External URL."
  218. )
  219. logger.warning(self._last_error)
  220. return
  221. nonce = await stash_frame(frame)
  222. snapshot_url = f"{external_url}/api/v1/obico/cached-frame/{nonce}"
  223. ml_url = f"{settings['ml_url']}/p/"
  224. try:
  225. async with httpx.AsyncClient(timeout=DETECTION_TIMEOUT) as client:
  226. resp = await client.get(ml_url, params={"img": snapshot_url})
  227. resp.raise_for_status()
  228. payload = resp.json()
  229. except Exception as e:
  230. detail = str(e) or type(e).__name__
  231. self._last_error = f"ML API call failed for printer {printer_id}: {detail}"
  232. logger.warning(self._last_error)
  233. return
  234. detections = payload.get("detections", []) if isinstance(payload, dict) else []
  235. current_p = score_from_detections(detections)
  236. state = self._states[printer_id]
  237. score = state.update(current_p)
  238. verdict = classify(score, settings["sensitivity"])
  239. self._last_class[printer_id] = verdict
  240. # A successful capture + ML call clears any transient error from previous
  241. # polls (typical case: cold-start RTSP timeout on first frame after startup,
  242. # followed by healthy polls that otherwise leave the banner stuck in the UI).
  243. self._last_error = None
  244. # Log every non-safe sample — safe samples would flood history
  245. if verdict != "safe" or detections:
  246. self._history.appendleft(
  247. {
  248. "printer_id": printer_id,
  249. "task_name": task_name,
  250. "timestamp": datetime.now(timezone.utc).isoformat(),
  251. "current_p": round(current_p, 4),
  252. "score": round(score, 4),
  253. "class": verdict,
  254. "detections": len(detections),
  255. }
  256. )
  257. if verdict == "failure" and not self._action_fired.get(printer_id):
  258. self._action_fired[printer_id] = True
  259. await self._dispatch_action(printer_id, settings["action"], task_name, score)
  260. async def _dispatch_action(self, printer_id: int, action: str, task_name: str, score: float):
  261. from backend.app.services.obico_actions import execute_action
  262. logger.warning(
  263. "Obico: failure detected on printer %s (task=%r score=%.3f) — action=%s",
  264. printer_id,
  265. task_name,
  266. score,
  267. action,
  268. )
  269. try:
  270. await execute_action(printer_id, action, task_name, score)
  271. except Exception as e:
  272. self._last_error = f"Action dispatch failed: {e or type(e).__name__}"
  273. logger.error(self._last_error)
  274. # ---- queries ----
  275. def get_status(self, sensitivity: str = "medium") -> dict:
  276. # Report the thresholds for the configured sensitivity, not a hardcoded
  277. # "medium" — otherwise the Status panel always shows the medium row
  278. # regardless of the user's selection (#1469). thresholds() falls back
  279. # to the medium multiplier for any unrecognized value.
  280. low, high = thresholds(sensitivity)
  281. return {
  282. "is_running": self._task is not None and not self._task.done(),
  283. "last_error": self._last_error,
  284. "per_printer": {
  285. pid: {
  286. "class": self._last_class.get(pid, "safe"),
  287. "frame_count": state.frame_count,
  288. "score": round(state.ewm_mean, 4),
  289. }
  290. for pid, state in self._states.items()
  291. },
  292. "thresholds": {"low": low, "high": high},
  293. "history": list(self._history),
  294. }
  295. async def test_connection(self, url: str) -> dict:
  296. """Ping the ML API health endpoint. Returns {ok, status_code, body, error}."""
  297. target = f"{url.rstrip('/')}/hc/"
  298. try:
  299. async with httpx.AsyncClient(timeout=HEALTH_TIMEOUT) as client:
  300. resp = await client.get(target)
  301. body = resp.text.strip()
  302. return {
  303. "ok": resp.status_code == 200 and body.lower() == "ok",
  304. "status_code": resp.status_code,
  305. "body": body,
  306. "error": None,
  307. }
  308. except Exception as e:
  309. return {"ok": False, "status_code": None, "body": None, "error": str(e) or type(e).__name__}
# Module-level shared instance of the detection service (the class docstring
# describes it as a singleton). Presumably imported by app startup / API routes
# rather than constructed elsewhere — verify against callers.
obico_detection_service: ObicoDetectionService = ObicoDetectionService()