| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
- """Action dispatch for Obico failure detection.
- Separated from the detection loop so actions can be unit-tested and swapped.
- """
- import logging
- from sqlalchemy import select
- from backend.app.core.database import async_session
- from backend.app.models.printer import Printer
- logger = logging.getLogger(__name__)
- async def execute_action(printer_id: int, action: str, task_name: str, score: float) -> None:
- """Run the configured action for a detected print failure.
- action: 'notify' | 'pause' | 'pause_and_off'
- """
- printer_name = await _get_printer_name(printer_id)
- if action in ("pause", "pause_and_off"):
- _pause_print(printer_id)
- if action == "pause_and_off":
- await _turn_off_linked_plugs(printer_id)
- await _notify(printer_id, printer_name, task_name, score, action)
- async def _get_printer_name(printer_id: int) -> str:
- async with async_session() as db:
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- return printer.name if printer else f"Printer {printer_id}"
- def _pause_print(printer_id: int) -> None:
- from backend.app.services.printer_manager import printer_manager
- client = printer_manager.get_client(printer_id)
- if not client:
- logger.warning("Obico pause: no MQTT client for printer %s", printer_id)
- return
- if not client.pause_print():
- logger.warning("Obico pause: pause_print() returned False for printer %s", printer_id)
- async def _turn_off_linked_plugs(printer_id: int) -> None:
- from backend.app.services.smart_plug_manager import smart_plug_manager
- async with async_session() as db:
- plugs = await smart_plug_manager._get_plugs_for_printer(printer_id, db)
- for plug in plugs:
- if not plug.enabled:
- continue
- try:
- service = await smart_plug_manager.get_service_for_plug(plug, db)
- await service.turn_off(plug)
- logger.info("Obico action: turned off plug %s for printer %s", plug.name, printer_id)
- except Exception as e:
- logger.error("Obico action: failed to turn off plug %s: %s", plug.name, e)
- async def _notify(printer_id: int, printer_name: str, task_name: str, score: float, action: str) -> None:
- from backend.app.services.notification_service import notification_service
- detail = (
- f"Possible print failure detected on '{task_name or 'current job'}' "
- f"(confidence {score:.2f}). Action taken: {action}."
- )
- async with async_session() as db:
- try:
- await notification_service.on_printer_error(
- printer_id=printer_id,
- printer_name=printer_name,
- error_type="ai_failure_detection",
- db=db,
- error_detail=detail,
- )
- except Exception as e:
- logger.error("Obico notify failed for printer %s: %s", printer_id, e)
|