# obico_detection.py
"""Obico AI print-failure detection service.

Polls a self-hosted Obico ML API while a print is running: for each monitored
printer the ML API is given the URL of a camera snapshot to analyse, the
resulting scores are smoothed over time, and the configured action
(notify / pause / pause_and_off) is dispatched at most once per print when a
sustained failure is detected.

See `obico_smoothing.py` for the per-print EWM + rolling-mean math.
"""
  7. import asyncio
  8. import json
  9. import logging
  10. from collections import deque
  11. from datetime import datetime, timezone
  12. import httpx
  13. from sqlalchemy import select
  14. from backend.app.core.database import async_session
  15. from backend.app.models.settings import Settings
  16. from backend.app.services.obico_smoothing import (
  17. PrintState,
  18. classify,
  19. score_from_detections,
  20. thresholds,
  21. )
  22. logger = logging.getLogger(__name__)
  23. HISTORY_MAX = 50
  24. HEALTH_TIMEOUT = 5.0
  25. DETECTION_TIMEOUT = 30.0
  26. class ObicoDetectionService:
  27. """Singleton service that polls the ML API and acts on sustained failures."""
  28. def __init__(self):
  29. self._task: asyncio.Task | None = None
  30. # printer_id -> PrintState (reset when a new print starts)
  31. self._states: dict[int, PrintState] = {}
  32. # printer_id -> task_name active when state was created (used to detect new prints)
  33. self._state_keys: dict[int, str] = {}
  34. # printer_id -> last classification ("safe"/"warning"/"failure")
  35. self._last_class: dict[int, str] = {}
  36. # printer_id -> whether an action has already been fired for the current print
  37. self._action_fired: dict[int, bool] = {}
  38. # Global detection event log (most-recent-first)
  39. self._history: deque = deque(maxlen=HISTORY_MAX)
  40. self._last_error: str | None = None
  41. # ---- lifecycle ----
  42. async def start(self):
  43. if self._task is not None:
  44. return
  45. logger.info("Starting Obico detection service")
  46. self._task = asyncio.create_task(self._loop())
  47. def stop(self):
  48. if self._task:
  49. self._task.cancel()
  50. self._task = None
  51. logger.info("Stopped Obico detection service")
  52. # ---- settings ----
  53. async def _load_settings(self) -> dict:
  54. keys = [
  55. "obico_enabled",
  56. "obico_ml_url",
  57. "obico_sensitivity",
  58. "obico_action",
  59. "obico_poll_interval",
  60. "obico_enabled_printers",
  61. "external_url",
  62. ]
  63. async with async_session() as db:
  64. result = await db.execute(select(Settings).where(Settings.key.in_(keys)))
  65. rows = {r.key: r.value for r in result.scalars().all()}
  66. enabled_printers_raw = rows.get("obico_enabled_printers", "")
  67. if enabled_printers_raw:
  68. try:
  69. enabled_printers = set(json.loads(enabled_printers_raw))
  70. except json.JSONDecodeError:
  71. enabled_printers = set()
  72. else:
  73. enabled_printers = None # None = all printers
  74. return {
  75. "enabled": rows.get("obico_enabled", "false").lower() == "true",
  76. "ml_url": (rows.get("obico_ml_url") or "").rstrip("/"),
  77. "sensitivity": rows.get("obico_sensitivity", "medium"),
  78. "action": rows.get("obico_action", "notify"),
  79. "poll_interval": int(rows.get("obico_poll_interval", "10")),
  80. "enabled_printers": enabled_printers,
  81. "external_url": (rows.get("external_url") or "").rstrip("/"),
  82. }
  83. # ---- main loop ----
  84. async def _loop(self):
  85. """Poll active printers while enabled. Adjusts interval from settings each cycle."""
  86. while True:
  87. try:
  88. settings = await self._load_settings()
  89. interval = max(5, settings.get("poll_interval", 10))
  90. if not settings["enabled"] or not settings["ml_url"]:
  91. await asyncio.sleep(interval)
  92. continue
  93. if not settings["external_url"]:
  94. # Without a reachable base URL, the ML API can't fetch snapshots.
  95. self._last_error = "external_url not set — ML API cannot reach snapshot endpoint"
  96. await asyncio.sleep(interval)
  97. continue
  98. await self._poll_once(settings)
  99. await asyncio.sleep(interval)
  100. except asyncio.CancelledError:
  101. break
  102. except Exception as e:
  103. logger.error("Obico detection loop error: %s", e)
  104. self._last_error = str(e)
  105. await asyncio.sleep(30)
  106. async def _poll_once(self, settings: dict):
  107. # Late import to avoid cycles at module load time
  108. from backend.app.services.printer_manager import printer_manager
  109. statuses = printer_manager.get_all_statuses()
  110. for printer_id, status in list(statuses.items()):
  111. if settings["enabled_printers"] is not None and printer_id not in settings["enabled_printers"]:
  112. continue
  113. if not printer_manager.is_connected(printer_id):
  114. continue
  115. if not status or getattr(status, "state", None) != "RUNNING":
  116. # Reset state when not printing so the next print starts fresh
  117. self._states.pop(printer_id, None)
  118. self._state_keys.pop(printer_id, None)
  119. self._action_fired.pop(printer_id, None)
  120. continue
  121. await self._check_printer(printer_id, status, settings)
  122. async def _check_printer(self, printer_id: int, status, settings: dict):
  123. task_name = getattr(status, "task_name", None) or getattr(status, "subtask_name", "") or ""
  124. key = f"{task_name}"
  125. if self._state_keys.get(printer_id) != key:
  126. self._states[printer_id] = PrintState()
  127. self._state_keys[printer_id] = key
  128. self._action_fired[printer_id] = False
  129. snapshot_url = f"{settings['external_url']}/api/v1/printers/{printer_id}/camera/snapshot"
  130. ml_url = f"{settings['ml_url']}/p/"
  131. try:
  132. async with httpx.AsyncClient(timeout=DETECTION_TIMEOUT) as client:
  133. resp = await client.get(ml_url, params={"img": snapshot_url})
  134. resp.raise_for_status()
  135. payload = resp.json()
  136. except Exception as e:
  137. self._last_error = f"ML API call failed for printer {printer_id}: {e}"
  138. logger.warning(self._last_error)
  139. return
  140. detections = payload.get("detections", []) if isinstance(payload, dict) else []
  141. current_p = score_from_detections(detections)
  142. state = self._states[printer_id]
  143. score = state.update(current_p)
  144. verdict = classify(score, settings["sensitivity"])
  145. self._last_class[printer_id] = verdict
  146. # Log every non-safe sample — safe samples would flood history
  147. if verdict != "safe" or detections:
  148. self._history.appendleft(
  149. {
  150. "printer_id": printer_id,
  151. "task_name": task_name,
  152. "timestamp": datetime.now(timezone.utc).isoformat(),
  153. "current_p": round(current_p, 4),
  154. "score": round(score, 4),
  155. "class": verdict,
  156. "detections": len(detections),
  157. }
  158. )
  159. if verdict == "failure" and not self._action_fired.get(printer_id):
  160. self._action_fired[printer_id] = True
  161. await self._dispatch_action(printer_id, settings["action"], task_name, score)
  162. async def _dispatch_action(self, printer_id: int, action: str, task_name: str, score: float):
  163. from backend.app.services.obico_actions import execute_action
  164. logger.warning(
  165. "Obico: failure detected on printer %s (task=%r score=%.3f) — action=%s",
  166. printer_id,
  167. task_name,
  168. score,
  169. action,
  170. )
  171. try:
  172. await execute_action(printer_id, action, task_name, score)
  173. except Exception as e:
  174. self._last_error = f"Action dispatch failed: {e}"
  175. logger.error(self._last_error)
  176. # ---- queries ----
  177. def get_status(self) -> dict:
  178. low, high = thresholds("medium")
  179. return {
  180. "is_running": self._task is not None and not self._task.done(),
  181. "last_error": self._last_error,
  182. "per_printer": {
  183. pid: {
  184. "class": self._last_class.get(pid, "safe"),
  185. "frame_count": state.frame_count,
  186. "score": round(state.ewm_mean, 4),
  187. }
  188. for pid, state in self._states.items()
  189. },
  190. "thresholds": {"low": low, "high": high},
  191. "history": list(self._history),
  192. }
  193. async def test_connection(self, url: str) -> dict:
  194. """Ping the ML API health endpoint. Returns {ok, status_code, body, error}."""
  195. target = f"{url.rstrip('/')}/hc/"
  196. try:
  197. async with httpx.AsyncClient(timeout=HEALTH_TIMEOUT) as client:
  198. resp = await client.get(target)
  199. body = resp.text.strip()
  200. return {
  201. "ok": resp.status_code == 200 and body.lower() == "ok",
  202. "status_code": resp.status_code,
  203. "body": body,
  204. "error": None,
  205. }
  206. except Exception as e:
  207. return {"ok": False, "status_code": None, "body": None, "error": str(e)}
# Module-level instance of the service (the class is documented as a singleton).
obico_detection_service = ObicoDetectionService()