obico_detection.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. """Obico AI print-failure detection service.
  2. Polls a self-hosted Obico ML API with snapshots from each monitored printer
  3. while a print is running, smooths scores over time, and dispatches a configured
  4. action (notify / pause / pause_and_off) when a sustained failure is detected.
  5. See `obico_smoothing.py` for the per-print EWM + rolling-mean math.
  6. """
  7. import asyncio
  8. import json
  9. import logging
  10. from collections import deque
  11. from datetime import datetime, timezone
  12. import httpx
  13. from sqlalchemy import select
  14. from backend.app.core.database import async_session
  15. from backend.app.models.printer import Printer
  16. from backend.app.models.settings import Settings
  17. from backend.app.services.obico_smoothing import (
  18. PrintState,
  19. classify,
  20. score_from_detections,
  21. thresholds,
  22. )
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Maximum number of entries kept in the service's in-memory detection history deque.
HISTORY_MAX = 50
# Timeout passed to httpx.AsyncClient for the ML API health check (`test_connection`).
HEALTH_TIMEOUT = 5.0
# Timeout passed to httpx.AsyncClient when posting a snapshot to the ML API.
DETECTION_TIMEOUT = 30.0
# Timeout handed to the camera capture helpers when grabbing a frame.
SNAPSHOT_CAPTURE_TIMEOUT = 20 # seconds — we control this, not Obico
  28. class ObicoDetectionService:
  29. """Singleton service that polls the ML API and acts on sustained failures."""
  30. def __init__(self):
  31. self._task: asyncio.Task | None = None
  32. # printer_id -> PrintState (reset when a new print starts)
  33. self._states: dict[int, PrintState] = {}
  34. # printer_id -> task_name active when state was created (used to detect new prints)
  35. self._state_keys: dict[int, str] = {}
  36. # printer_id -> last classification ("safe"/"warning"/"failure")
  37. self._last_class: dict[int, str] = {}
  38. # printer_id -> whether an action has already been fired for the current print
  39. self._action_fired: dict[int, bool] = {}
  40. # Global detection event log (most-recent-first)
  41. self._history: deque = deque(maxlen=HISTORY_MAX)
  42. self._last_error: str | None = None
  43. # ---- lifecycle ----
  44. async def start(self):
  45. if self._task is not None:
  46. return
  47. logger.info("Starting Obico detection service")
  48. self._task = asyncio.create_task(self._loop())
  49. def stop(self):
  50. if self._task:
  51. self._task.cancel()
  52. self._task = None
  53. logger.info("Stopped Obico detection service")
  54. # ---- settings ----
  55. async def _load_settings(self) -> dict:
  56. keys = [
  57. "obico_enabled",
  58. "obico_ml_url",
  59. "obico_sensitivity",
  60. "obico_action",
  61. "obico_poll_interval",
  62. "obico_enabled_printers",
  63. ]
  64. async with async_session() as db:
  65. result = await db.execute(select(Settings).where(Settings.key.in_(keys)))
  66. rows = {r.key: r.value for r in result.scalars().all()}
  67. enabled_printers_raw = rows.get("obico_enabled_printers", "")
  68. if enabled_printers_raw:
  69. try:
  70. enabled_printers = set(json.loads(enabled_printers_raw))
  71. except json.JSONDecodeError:
  72. enabled_printers = set()
  73. else:
  74. enabled_printers = None # None = all printers
  75. return {
  76. "enabled": rows.get("obico_enabled", "false").lower() == "true",
  77. "ml_url": (rows.get("obico_ml_url") or "").rstrip("/"),
  78. "sensitivity": rows.get("obico_sensitivity", "medium"),
  79. "action": rows.get("obico_action", "notify"),
  80. "poll_interval": int(rows.get("obico_poll_interval", "10")),
  81. "enabled_printers": enabled_printers,
  82. }
  83. # ---- main loop ----
  84. async def _loop(self):
  85. """Poll active printers while enabled. Adjusts interval from settings each cycle."""
  86. while True:
  87. try:
  88. settings = await self._load_settings()
  89. interval = max(5, settings.get("poll_interval", 10))
  90. if not settings["enabled"] or not settings["ml_url"]:
  91. await asyncio.sleep(interval)
  92. continue
  93. await self._poll_once(settings)
  94. await asyncio.sleep(interval)
  95. except asyncio.CancelledError:
  96. break
  97. except Exception as e:
  98. logger.error("Obico detection loop error: %s", e)
  99. self._last_error = str(e)
  100. await asyncio.sleep(30)
  101. async def _poll_once(self, settings: dict):
  102. # Late import to avoid cycles at module load time
  103. from backend.app.services.printer_manager import printer_manager
  104. statuses = printer_manager.get_all_statuses()
  105. for printer_id, status in list(statuses.items()):
  106. if settings["enabled_printers"] is not None and printer_id not in settings["enabled_printers"]:
  107. continue
  108. if not printer_manager.is_connected(printer_id):
  109. continue
  110. if not status or getattr(status, "state", None) != "RUNNING":
  111. # Reset state when not printing so the next print starts fresh
  112. self._states.pop(printer_id, None)
  113. self._state_keys.pop(printer_id, None)
  114. self._action_fired.pop(printer_id, None)
  115. continue
  116. await self._check_printer(printer_id, status, settings)
  117. async def _capture_frame(self, printer_id: int) -> bytes | None:
  118. """Capture one JPEG frame from the printer camera. Returns None on failure."""
  119. # Late import to avoid cycles at module load time
  120. from backend.app.services.camera import capture_camera_frame_bytes
  121. from backend.app.services.external_camera import capture_frame as capture_external_frame
  122. async with async_session() as db:
  123. printer = await db.get(Printer, printer_id)
  124. if printer is None:
  125. self._last_error = f"Printer {printer_id} not found"
  126. return None
  127. if printer.external_camera_enabled and printer.external_camera_url:
  128. return await capture_external_frame(
  129. printer.external_camera_url,
  130. printer.external_camera_type,
  131. timeout=SNAPSHOT_CAPTURE_TIMEOUT,
  132. )
  133. return await capture_camera_frame_bytes(
  134. ip_address=printer.ip_address,
  135. access_code=printer.access_code,
  136. model=printer.model,
  137. timeout=SNAPSHOT_CAPTURE_TIMEOUT,
  138. )
  139. async def _check_printer(self, printer_id: int, status, settings: dict):
  140. task_name = getattr(status, "task_name", None) or getattr(status, "subtask_name", "") or ""
  141. key = f"{task_name}"
  142. if self._state_keys.get(printer_id) != key:
  143. self._states[printer_id] = PrintState()
  144. self._state_keys[printer_id] = key
  145. self._action_fired[printer_id] = False
  146. # Capture locally, then POST the JPEG bytes directly to the ML API.
  147. # This avoids the entire class of URL-reachability problems — the ML API
  148. # never needs to call back into Bambuddy, so reverse proxies, external
  149. # auth layers, and Docker networking are all irrelevant.
  150. frame = await self._capture_frame(printer_id)
  151. if not frame:
  152. self._last_error = f"Failed to capture snapshot for printer {printer_id}"
  153. logger.warning(self._last_error)
  154. return
  155. ml_url = f"{settings['ml_url']}/p/"
  156. try:
  157. async with httpx.AsyncClient(timeout=DETECTION_TIMEOUT) as client:
  158. resp = await client.post(
  159. ml_url,
  160. files={"img": ("snapshot.jpg", frame, "image/jpeg")},
  161. )
  162. resp.raise_for_status()
  163. payload = resp.json()
  164. except Exception as e:
  165. self._last_error = f"ML API call failed for printer {printer_id}: {e}"
  166. logger.warning(self._last_error)
  167. return
  168. detections = payload.get("detections", []) if isinstance(payload, dict) else []
  169. current_p = score_from_detections(detections)
  170. state = self._states[printer_id]
  171. score = state.update(current_p)
  172. verdict = classify(score, settings["sensitivity"])
  173. self._last_class[printer_id] = verdict
  174. # Log every non-safe sample — safe samples would flood history
  175. if verdict != "safe" or detections:
  176. self._history.appendleft(
  177. {
  178. "printer_id": printer_id,
  179. "task_name": task_name,
  180. "timestamp": datetime.now(timezone.utc).isoformat(),
  181. "current_p": round(current_p, 4),
  182. "score": round(score, 4),
  183. "class": verdict,
  184. "detections": len(detections),
  185. }
  186. )
  187. if verdict == "failure" and not self._action_fired.get(printer_id):
  188. self._action_fired[printer_id] = True
  189. await self._dispatch_action(printer_id, settings["action"], task_name, score)
  190. async def _dispatch_action(self, printer_id: int, action: str, task_name: str, score: float):
  191. from backend.app.services.obico_actions import execute_action
  192. logger.warning(
  193. "Obico: failure detected on printer %s (task=%r score=%.3f) — action=%s",
  194. printer_id,
  195. task_name,
  196. score,
  197. action,
  198. )
  199. try:
  200. await execute_action(printer_id, action, task_name, score)
  201. except Exception as e:
  202. self._last_error = f"Action dispatch failed: {e}"
  203. logger.error(self._last_error)
  204. # ---- queries ----
  205. def get_status(self) -> dict:
  206. low, high = thresholds("medium")
  207. return {
  208. "is_running": self._task is not None and not self._task.done(),
  209. "last_error": self._last_error,
  210. "per_printer": {
  211. pid: {
  212. "class": self._last_class.get(pid, "safe"),
  213. "frame_count": state.frame_count,
  214. "score": round(state.ewm_mean, 4),
  215. }
  216. for pid, state in self._states.items()
  217. },
  218. "thresholds": {"low": low, "high": high},
  219. "history": list(self._history),
  220. }
  221. async def test_connection(self, url: str) -> dict:
  222. """Ping the ML API health endpoint. Returns {ok, status_code, body, error}."""
  223. target = f"{url.rstrip('/')}/hc/"
  224. try:
  225. async with httpx.AsyncClient(timeout=HEALTH_TIMEOUT) as client:
  226. resp = await client.get(target)
  227. body = resp.text.strip()
  228. return {
  229. "ok": resp.status_code == 200 and body.lower() == "ok",
  230. "status_code": resp.status_code,
  231. "body": body,
  232. "error": None,
  233. }
  234. except Exception as e:
  235. return {"ok": False, "status_code": None, "body": None, "error": str(e)}
# Module-level instance of the service (the class is documented as a singleton).
obico_detection_service = ObicoDetectionService()