obico_actions.py 2.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. """Action dispatch for Obico failure detection.
  2. Separated from the detection loop so actions can be unit-tested and swapped.
  3. """
  4. import logging
  5. from sqlalchemy import select
  6. from backend.app.core.database import async_session
  7. from backend.app.models.printer import Printer
  8. logger = logging.getLogger(__name__)
  9. async def execute_action(printer_id: int, action: str, task_name: str, score: float) -> None:
  10. """Run the configured action for a detected print failure.
  11. action: 'notify' | 'pause' | 'pause_and_off'
  12. """
  13. printer_name = await _get_printer_name(printer_id)
  14. if action in ("pause", "pause_and_off"):
  15. _pause_print(printer_id)
  16. if action == "pause_and_off":
  17. await _turn_off_linked_plugs(printer_id)
  18. await _notify(printer_id, printer_name, task_name, score, action)
  19. async def _get_printer_name(printer_id: int) -> str:
  20. async with async_session() as db:
  21. result = await db.execute(select(Printer).where(Printer.id == printer_id))
  22. printer = result.scalar_one_or_none()
  23. return printer.name if printer else f"Printer {printer_id}"
  24. def _pause_print(printer_id: int) -> None:
  25. from backend.app.services.printer_manager import printer_manager
  26. client = printer_manager.get_client(printer_id)
  27. if not client:
  28. logger.warning("Obico pause: no MQTT client for printer %s", printer_id)
  29. return
  30. if not client.pause_print():
  31. logger.warning("Obico pause: pause_print() returned False for printer %s", printer_id)
  32. async def _turn_off_linked_plugs(printer_id: int) -> None:
  33. from backend.app.services.smart_plug_manager import smart_plug_manager
  34. async with async_session() as db:
  35. plugs = await smart_plug_manager._get_plugs_for_printer(printer_id, db)
  36. for plug in plugs:
  37. if not plug.enabled:
  38. continue
  39. try:
  40. service = await smart_plug_manager.get_service_for_plug(plug, db)
  41. await service.turn_off(plug)
  42. logger.info("Obico action: turned off plug %s for printer %s", plug.name, printer_id)
  43. except Exception as e:
  44. logger.error("Obico action: failed to turn off plug %s: %s", plug.name, e)
  45. async def _notify(printer_id: int, printer_name: str, task_name: str, score: float, action: str) -> None:
  46. from backend.app.services.notification_service import notification_service
  47. detail = (
  48. f"Possible print failure detected on '{task_name or 'current job'}' "
  49. f"(confidence {score:.2f}). Action taken: {action}."
  50. )
  51. async with async_session() as db:
  52. try:
  53. await notification_service.on_printer_error(
  54. printer_id=printer_id,
  55. printer_name=printer_name,
  56. error_type="ai_failure_detection",
  57. db=db,
  58. error_detail=detail,
  59. )
  60. except Exception as e:
  61. logger.error("Obico notify failed for printer %s: %s", printer_id, e)