obico.py 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647
  1. """API routes for Obico AI failure detection."""
  2. import logging
  3. from fastapi import APIRouter
  4. from pydantic import BaseModel
  5. from backend.app.core.auth import RequirePermissionIfAuthEnabled
  6. from backend.app.core.permissions import Permission
  7. from backend.app.models.user import User
  8. from backend.app.services.obico_detection import obico_detection_service
  9. logger = logging.getLogger(__name__)
  10. router = APIRouter(prefix="/obico", tags=["obico"])
  11. class TestConnectionRequest(BaseModel):
  12. url: str
  13. @router.get("/status")
  14. async def get_status(
  15. _: User | None = RequirePermissionIfAuthEnabled(Permission.SETTINGS_READ),
  16. ):
  17. """Scheduler status, per-printer classification, and recent detection history."""
  18. settings = await obico_detection_service._load_settings()
  19. status = obico_detection_service.get_status()
  20. return {
  21. **status,
  22. "enabled": settings["enabled"],
  23. "ml_url": settings["ml_url"],
  24. "sensitivity": settings["sensitivity"],
  25. "action": settings["action"],
  26. "poll_interval": settings["poll_interval"],
  27. }
  28. @router.post("/test-connection")
  29. async def test_connection(
  30. req: TestConnectionRequest,
  31. _: User | None = RequirePermissionIfAuthEnabled(Permission.SETTINGS_UPDATE),
  32. ):
  33. """Ping the Obico ML API `/hc/` health endpoint. Returns ok + raw body."""
  34. if not req.url:
  35. return {"ok": False, "status_code": None, "body": None, "error": "URL is empty"}
  36. return await obico_detection_service.test_connection(req.url)