| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361 |
- """Obico AI print-failure detection service.
- Polls a self-hosted Obico ML API with snapshots from each monitored printer
- while a print is running, smooths scores over time, and dispatches a configured
- action (notify / pause / pause_and_off) when a sustained failure is detected.
- See `obico_smoothing.py` for the per-print EWM + rolling-mean math.
- """
- import asyncio
- import json
- import logging
- import secrets
- import time
- from collections import deque
- from datetime import datetime, timezone
- import httpx
- from sqlalchemy import select
- from backend.app.core.database import async_session
- from backend.app.models.printer import Printer
- from backend.app.models.settings import Settings
- from backend.app.services.obico_smoothing import (
- PrintState,
- classify,
- score_from_detections,
- thresholds,
- )
logger = logging.getLogger(__name__)

# Upper bound on the number of entries kept in the detection history log.
HISTORY_MAX = 50

# HTTP timeouts (seconds): health-check ping and ML detection request.
HEALTH_TIMEOUT = 5.0
DETECTION_TIMEOUT = 30.0

# Seconds allowed for a local camera capture — we control this, not Obico.
SNAPSHOT_CAPTURE_TIMEOUT = 20

# Seconds a stashed frame stays retrievable — Obico usually fetches within 1s
# of receiving the URL.
FRAME_CACHE_TTL = 30.0

# Module-level one-shot frame cache.
#
# Obico's ML API is GET-only (/p/?img=URL) and fetches the URL itself with a
# hardcoded 5s read timeout. We capture locally first, stash the JPEG under a
# random nonce, and hand Obico a URL that serves the cached bytes instantly,
# so the 5s ceiling never races the RTSP keyframe wait.
#
# Maps nonce -> (jpeg bytes, time.monotonic() timestamp taken when stashed).
_frame_cache: dict[str, tuple[bytes, float]] = {}
# Guards every read/write of _frame_cache.
_frame_cache_lock = asyncio.Lock()
def _prune_frame_cache() -> None:
    """Evict every cached frame older than FRAME_CACHE_TTL seconds.

    Mutates the module-level ``_frame_cache`` in place; callers are expected
    to hold ``_frame_cache_lock`` while calling this.
    """
    current = time.monotonic()
    # Iterate over a snapshot so entries can be deleted while scanning.
    for nonce, (_payload, stored_at) in list(_frame_cache.items()):
        if current - stored_at > FRAME_CACHE_TTL:
            del _frame_cache[nonce]
async def stash_frame(data: bytes) -> str:
    """Cache ``data`` under a fresh random nonce and return that nonce.

    Expired entries are pruned on every call. The nonce is URL-safe, and
    ``pop_frame`` hands the bytes back at most once.
    """
    token = secrets.token_urlsafe(32)
    async with _frame_cache_lock:
        _prune_frame_cache()
        stored_at = time.monotonic()
        _frame_cache[token] = (data, stored_at)
    return token
async def pop_frame(nonce: str) -> bytes | None:
    """Take the frame stored under ``nonce`` out of the cache.

    Returns the cached bytes, or ``None`` when the nonce is unknown, was
    already consumed, or the entry is older than FRAME_CACHE_TTL seconds.
    The entry is removed from the cache in every case.
    """
    async with _frame_cache_lock:
        _prune_frame_cache()
        cached = _frame_cache.pop(nonce, None)

    if cached is not None:
        payload, stored_at = cached
        # Re-check the age after removal, independent of the prune pass above.
        age = time.monotonic() - stored_at
        if age <= FRAME_CACHE_TTL:
            return payload
    return None
- class ObicoDetectionService:
- """Singleton service that polls the ML API and acts on sustained failures."""
- def __init__(self):
- self._task: asyncio.Task | None = None
- # printer_id -> PrintState (reset when a new print starts)
- self._states: dict[int, PrintState] = {}
- # printer_id -> task_name active when state was created (used to detect new prints)
- self._state_keys: dict[int, str] = {}
- # printer_id -> last classification ("safe"/"warning"/"failure")
- self._last_class: dict[int, str] = {}
- # printer_id -> whether an action has already been fired for the current print
- self._action_fired: dict[int, bool] = {}
- # Global detection event log (most-recent-first)
- self._history: deque = deque(maxlen=HISTORY_MAX)
- self._last_error: str | None = None
- # ---- lifecycle ----
- async def start(self):
- if self._task is not None:
- return
- logger.info("Starting Obico detection service")
- self._task = asyncio.create_task(self._loop())
- def stop(self):
- if self._task:
- self._task.cancel()
- self._task = None
- logger.info("Stopped Obico detection service")
- # ---- settings ----
- async def _load_settings(self) -> dict:
- keys = [
- "obico_enabled",
- "obico_ml_url",
- "obico_sensitivity",
- "obico_action",
- "obico_poll_interval",
- "obico_enabled_printers",
- "external_url",
- ]
- async with async_session() as db:
- result = await db.execute(select(Settings).where(Settings.key.in_(keys)))
- rows = {r.key: r.value for r in result.scalars().all()}
- enabled_printers_raw = rows.get("obico_enabled_printers", "")
- if enabled_printers_raw:
- try:
- enabled_printers = set(json.loads(enabled_printers_raw))
- except json.JSONDecodeError:
- enabled_printers = set()
- else:
- enabled_printers = None # None = all printers
- return {
- "enabled": rows.get("obico_enabled", "false").lower() == "true",
- "ml_url": (rows.get("obico_ml_url") or "").rstrip("/"),
- "sensitivity": rows.get("obico_sensitivity", "medium"),
- "action": rows.get("obico_action", "notify"),
- "poll_interval": int(rows.get("obico_poll_interval", "10")),
- "enabled_printers": enabled_printers,
- "external_url": (rows.get("external_url") or "").rstrip("/"),
- }
- # ---- main loop ----
- async def _loop(self):
- """Poll active printers while enabled. Adjusts interval from settings each cycle."""
- while True:
- try:
- settings = await self._load_settings()
- interval = max(5, settings.get("poll_interval", 10))
- if not settings["enabled"] or not settings["ml_url"]:
- await asyncio.sleep(interval)
- continue
- await self._poll_once(settings)
- await asyncio.sleep(interval)
- except asyncio.CancelledError:
- break
- except Exception as e:
- logger.error("Obico detection loop error: %s", e)
- self._last_error = str(e) or type(e).__name__
- await asyncio.sleep(30)
- async def _poll_once(self, settings: dict):
- # Late import to avoid cycles at module load time
- from backend.app.services.printer_manager import printer_manager
- statuses = printer_manager.get_all_statuses()
- for printer_id, status in list(statuses.items()):
- if settings["enabled_printers"] is not None and printer_id not in settings["enabled_printers"]:
- continue
- if not printer_manager.is_connected(printer_id):
- continue
- if not status or getattr(status, "state", None) != "RUNNING":
- # Reset state when not printing so the next print starts fresh
- self._states.pop(printer_id, None)
- self._state_keys.pop(printer_id, None)
- self._action_fired.pop(printer_id, None)
- continue
- await self._check_printer(printer_id, status, settings)
- async def _capture_frame(self, printer_id: int) -> bytes | None:
- """Capture one JPEG frame from the printer camera. Returns None on failure."""
- # Late import to avoid cycles at module load time
- from backend.app.services.camera import capture_camera_frame_bytes
- from backend.app.services.external_camera import capture_frame as capture_external_frame
- async with async_session() as db:
- printer = await db.get(Printer, printer_id)
- if printer is None:
- self._last_error = f"Printer {printer_id} not found"
- return None
- if printer.external_camera_enabled and printer.external_camera_url:
- return await capture_external_frame(
- printer.external_camera_url,
- printer.external_camera_type,
- timeout=SNAPSHOT_CAPTURE_TIMEOUT,
- snapshot_url=printer.external_camera_snapshot_url,
- )
- # Reuse the fan-out broadcaster's buffered frame when a viewer is
- # already watching — avoids opening a second concurrent RTSP socket
- # on printers that allow only one camera connection (e.g. X2D
- # firmware 01.01.00.00; see #1271). Buffered frame is <1s old while
- # a viewer is connected.
- #
- # When a viewer is attached but no frame is buffered yet (startup
- # race, mid-reconnect), we DELIBERATELY skip this poll cycle instead
- # of falling through to capture_camera_frame_bytes. Opening a fresh
- # RTSP/chamber socket would compete with the live viewer and kick
- # the fan-out connection on most firmwares — exactly the freeze
- # reported in #1348. The poll loop retries in ~10s.
- from backend.app.api.routes.camera import is_stream_active, try_get_active_buffered_frame
- if is_stream_active(printer_id):
- buffered = try_get_active_buffered_frame(printer_id)
- if buffered:
- return buffered
- logger.info(
- "Obico: viewer attached for printer %s but buffer empty; skipping this poll to avoid competing camera socket (#1348)",
- printer_id,
- )
- return None
- return await capture_camera_frame_bytes(
- ip_address=printer.ip_address,
- access_code=printer.access_code,
- model=printer.model,
- timeout=SNAPSHOT_CAPTURE_TIMEOUT,
- )
- async def _check_printer(self, printer_id: int, status, settings: dict):
- task_name = getattr(status, "task_name", None) or getattr(status, "subtask_name", "") or ""
- key = f"{task_name}"
- if self._state_keys.get(printer_id) != key:
- self._states[printer_id] = PrintState()
- self._state_keys[printer_id] = key
- self._action_fired[printer_id] = False
- # Capture locally first, then hand Obico a nonce URL that returns the
- # cached bytes instantly. Obico's ML API is GET-only (/p/?img=URL) with a
- # hardcoded 5s read timeout which would otherwise race our /camera/snapshot
- # keyframe wait.
- frame = await self._capture_frame(printer_id)
- if not frame:
- self._last_error = f"Failed to capture snapshot for printer {printer_id}"
- logger.warning(self._last_error)
- return
- external_url = settings.get("external_url") or ""
- if not external_url:
- self._last_error = (
- "external_url setting is empty — Obico's ML API needs a reachable URL to fetch the snapshot from. "
- "Set Settings → General → External URL."
- )
- logger.warning(self._last_error)
- return
- nonce = await stash_frame(frame)
- snapshot_url = f"{external_url}/api/v1/obico/cached-frame/{nonce}"
- ml_url = f"{settings['ml_url']}/p/"
- try:
- async with httpx.AsyncClient(timeout=DETECTION_TIMEOUT) as client:
- resp = await client.get(ml_url, params={"img": snapshot_url})
- resp.raise_for_status()
- payload = resp.json()
- except Exception as e:
- detail = str(e) or type(e).__name__
- self._last_error = f"ML API call failed for printer {printer_id}: {detail}"
- logger.warning(self._last_error)
- return
- detections = payload.get("detections", []) if isinstance(payload, dict) else []
- current_p = score_from_detections(detections)
- state = self._states[printer_id]
- score = state.update(current_p)
- verdict = classify(score, settings["sensitivity"])
- self._last_class[printer_id] = verdict
- # A successful capture + ML call clears any transient error from previous
- # polls (typical case: cold-start RTSP timeout on first frame after startup,
- # followed by healthy polls that otherwise leave the banner stuck in the UI).
- self._last_error = None
- # Log every non-safe sample — safe samples would flood history
- if verdict != "safe" or detections:
- self._history.appendleft(
- {
- "printer_id": printer_id,
- "task_name": task_name,
- "timestamp": datetime.now(timezone.utc).isoformat(),
- "current_p": round(current_p, 4),
- "score": round(score, 4),
- "class": verdict,
- "detections": len(detections),
- }
- )
- if verdict == "failure" and not self._action_fired.get(printer_id):
- self._action_fired[printer_id] = True
- await self._dispatch_action(printer_id, settings["action"], task_name, score)
- async def _dispatch_action(self, printer_id: int, action: str, task_name: str, score: float):
- from backend.app.services.obico_actions import execute_action
- logger.warning(
- "Obico: failure detected on printer %s (task=%r score=%.3f) — action=%s",
- printer_id,
- task_name,
- score,
- action,
- )
- try:
- await execute_action(printer_id, action, task_name, score)
- except Exception as e:
- self._last_error = f"Action dispatch failed: {e or type(e).__name__}"
- logger.error(self._last_error)
- # ---- queries ----
- def get_status(self, sensitivity: str = "medium") -> dict:
- # Report the thresholds for the configured sensitivity, not a hardcoded
- # "medium" — otherwise the Status panel always shows the medium row
- # regardless of the user's selection (#1469). thresholds() falls back
- # to the medium multiplier for any unrecognized value.
- low, high = thresholds(sensitivity)
- return {
- "is_running": self._task is not None and not self._task.done(),
- "last_error": self._last_error,
- "per_printer": {
- pid: {
- "class": self._last_class.get(pid, "safe"),
- "frame_count": state.frame_count,
- "score": round(state.ewm_mean, 4),
- }
- for pid, state in self._states.items()
- },
- "thresholds": {"low": low, "high": high},
- "history": list(self._history),
- }
- async def test_connection(self, url: str) -> dict:
- """Ping the ML API health endpoint. Returns {ok, status_code, body, error}."""
- target = f"{url.rstrip('/')}/hc/"
- try:
- async with httpx.AsyncClient(timeout=HEALTH_TIMEOUT) as client:
- resp = await client.get(target)
- body = resp.text.strip()
- return {
- "ok": resp.status_code == 200 and body.lower() == "ok",
- "status_code": resp.status_code,
- "body": body,
- "error": None,
- }
- except Exception as e:
- return {"ok": False, "status_code": None, "body": None, "error": str(e) or type(e).__name__}
# Module-level singleton instance of the detection service.
obico_detection_service: ObicoDetectionService = ObicoDetectionService()
|