|
|
@@ -4,7 +4,7 @@ import asyncio
|
|
|
import logging
|
|
|
from datetime import datetime
|
|
|
|
|
|
-from sqlalchemy import select
|
|
|
+from sqlalchemy import func, select
|
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
|
|
from backend.app.core.config import settings
|
|
|
@@ -15,8 +15,10 @@ from backend.app.models.print_queue import PrintQueueItem
|
|
|
from backend.app.models.printer import Printer
|
|
|
from backend.app.models.smart_plug import SmartPlug
|
|
|
from backend.app.services.bambu_ftp import delete_file_async, get_ftp_retry_settings, upload_file_async, with_ftp_retry
|
|
|
+from backend.app.services.notification_service import notification_service
|
|
|
from backend.app.services.printer_manager import printer_manager
|
|
|
from backend.app.services.tasmota import tasmota_service
|
|
|
+from backend.app.utils.printer_models import normalize_printer_model
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@@ -62,13 +64,10 @@ class PrintScheduler:
|
|
|
if not items:
|
|
|
return
|
|
|
|
|
|
- # Group by printer - only process first item per printer
|
|
|
- processed_printers = set()
|
|
|
+ # Track busy printers to avoid assigning multiple items to same printer
|
|
|
+ busy_printers: set[int] = set()
|
|
|
|
|
|
for item in items:
|
|
|
- if item.printer_id in processed_printers:
|
|
|
- continue
|
|
|
-
|
|
|
# Check scheduled time first (scheduled_time is stored in UTC from ISO string)
|
|
|
if item.scheduled_time and item.scheduled_time > datetime.utcnow():
|
|
|
continue
|
|
|
@@ -77,46 +76,254 @@ class PrintScheduler:
|
|
|
if item.manual_start:
|
|
|
continue
|
|
|
|
|
|
- # Check if printer is idle
|
|
|
- printer_idle = self._is_printer_idle(item.printer_id)
|
|
|
- printer_connected = printer_manager.is_connected(item.printer_id)
|
|
|
-
|
|
|
- # If printer not connected, try to power on via smart plug
|
|
|
- if not printer_connected:
|
|
|
- plug = await self._get_smart_plug(db, item.printer_id)
|
|
|
- if plug and plug.auto_on and plug.enabled:
|
|
|
- logger.info(f"Printer {item.printer_id} offline, attempting to power on via smart plug")
|
|
|
- powered_on = await self._power_on_and_wait(plug, item.printer_id, db)
|
|
|
- if powered_on:
|
|
|
- printer_connected = True
|
|
|
- printer_idle = self._is_printer_idle(item.printer_id)
|
|
|
+ if item.printer_id:
|
|
|
+ # Specific printer assignment (existing behavior)
|
|
|
+ if item.printer_id in busy_printers:
|
|
|
+ continue
|
|
|
+
|
|
|
+ # Check if printer is idle
|
|
|
+ printer_idle = self._is_printer_idle(item.printer_id)
|
|
|
+ printer_connected = printer_manager.is_connected(item.printer_id)
|
|
|
+
|
|
|
+ # If printer not connected, try to power on via smart plug
|
|
|
+ if not printer_connected:
|
|
|
+ plug = await self._get_smart_plug(db, item.printer_id)
|
|
|
+ if plug and plug.auto_on and plug.enabled:
|
|
|
+ logger.info(f"Printer {item.printer_id} offline, attempting to power on via smart plug")
|
|
|
+ powered_on = await self._power_on_and_wait(plug, item.printer_id, db)
|
|
|
+ if powered_on:
|
|
|
+ printer_connected = True
|
|
|
+ printer_idle = self._is_printer_idle(item.printer_id)
|
|
|
+ else:
|
|
|
+ logger.warning(f"Could not power on printer {item.printer_id} via smart plug")
|
|
|
+ busy_printers.add(item.printer_id)
|
|
|
+ continue
|
|
|
else:
|
|
|
- logger.warning(f"Could not power on printer {item.printer_id} via smart plug")
|
|
|
- processed_printers.add(item.printer_id)
|
|
|
+ # No plug or auto_on disabled
|
|
|
+ busy_printers.add(item.printer_id)
|
|
|
continue
|
|
|
- else:
|
|
|
- # No plug or auto_on disabled
|
|
|
- processed_printers.add(item.printer_id)
|
|
|
+
|
|
|
+ # Check if printer is idle (busy with another print)
|
|
|
+ if not printer_idle:
|
|
|
+ busy_printers.add(item.printer_id)
|
|
|
continue
|
|
|
|
|
|
- # Check if printer is idle (busy with another print)
|
|
|
- if not printer_idle:
|
|
|
- processed_printers.add(item.printer_id)
|
|
|
- continue
|
|
|
+ # Check condition (previous print success)
|
|
|
+ if item.require_previous_success:
|
|
|
+ if not await self._check_previous_success(db, item):
|
|
|
+ item.status = "skipped"
|
|
|
+ item.error_message = "Previous print failed or was aborted"
|
|
|
+ item.completed_at = datetime.now()
|
|
|
+ await db.commit()
|
|
|
+ logger.info(f"Skipped queue item {item.id} - previous print failed")
|
|
|
+
|
|
|
+ # Send notification
|
|
|
+ job_name = await self._get_job_name(db, item)
|
|
|
+ printer = await self._get_printer(db, item.printer_id)
|
|
|
+ await notification_service.on_queue_job_skipped(
|
|
|
+ job_name=job_name,
|
|
|
+ printer_id=item.printer_id,
|
|
|
+ printer_name=printer.name if printer else "Unknown",
|
|
|
+ reason="Previous print failed or was aborted",
|
|
|
+ db=db,
|
|
|
+ )
|
|
|
+ continue
|
|
|
|
|
|
- # Check condition (previous print success)
|
|
|
- if item.require_previous_success:
|
|
|
- if not await self._check_previous_success(db, item):
|
|
|
- item.status = "skipped"
|
|
|
- item.error_message = "Previous print failed or was aborted"
|
|
|
- item.completed_at = datetime.now()
|
|
|
+ # Start the print
|
|
|
+ await self._start_print(db, item)
|
|
|
+ busy_printers.add(item.printer_id)
|
|
|
+
|
|
|
+ elif item.target_model:
|
|
|
+ # Model-based assignment - find any idle printer of matching model
|
|
|
+ # Parse required filament types if present
|
|
|
+ required_types = None
|
|
|
+ if item.required_filament_types:
|
|
|
+ try:
|
|
|
+ import json
|
|
|
+
|
|
|
+ required_types = json.loads(item.required_filament_types)
|
|
|
+ except json.JSONDecodeError:
|
|
|
+ pass
|
|
|
+
|
|
|
+ printer_id, waiting_reason = await self._find_idle_printer_for_model(
|
|
|
+ db, item.target_model, busy_printers, required_types
|
|
|
+ )
|
|
|
+
|
|
|
+ # Update waiting_reason if changed and send notification when first waiting
|
|
|
+ if item.waiting_reason != waiting_reason:
|
|
|
+ was_waiting = item.waiting_reason is not None
|
|
|
+ item.waiting_reason = waiting_reason
|
|
|
await db.commit()
|
|
|
- logger.info(f"Skipped queue item {item.id} - previous print failed")
|
|
|
- continue
|
|
|
|
|
|
- # Start the print
|
|
|
- await self._start_print(db, item)
|
|
|
- processed_printers.add(item.printer_id)
|
|
|
+ # Send waiting notification only when transitioning to waiting state
|
|
|
+ if waiting_reason and not was_waiting:
|
|
|
+ job_name = await self._get_job_name(db, item)
|
|
|
+ await notification_service.on_queue_job_waiting(
|
|
|
+ job_name=job_name,
|
|
|
+ target_model=item.target_model,
|
|
|
+ waiting_reason=waiting_reason,
|
|
|
+ db=db,
|
|
|
+ )
|
|
|
+
|
|
|
+ if printer_id:
|
|
|
+ # Check condition (previous print success) before assigning
|
|
|
+ if item.require_previous_success:
|
|
|
+ if not await self._check_previous_success(db, item):
|
|
|
+ item.status = "skipped"
|
|
|
+ item.error_message = "Previous print failed or was aborted"
|
|
|
+ item.completed_at = datetime.now()
|
|
|
+ await db.commit()
|
|
|
+ logger.info(f"Skipped queue item {item.id} - previous print failed")
|
|
|
+
|
|
|
+ # Send notification
|
|
|
+ job_name = await self._get_job_name(db, item)
|
|
|
+ printer = await self._get_printer(db, printer_id)
|
|
|
+ await notification_service.on_queue_job_skipped(
|
|
|
+ job_name=job_name,
|
|
|
+ printer_id=printer_id,
|
|
|
+ printer_name=printer.name if printer else "Unknown",
|
|
|
+ reason="Previous print failed or was aborted",
|
|
|
+ db=db,
|
|
|
+ )
|
|
|
+ continue
|
|
|
+
|
|
|
+ # Assign printer and start - clear waiting reason
|
|
|
+ item.printer_id = printer_id
|
|
|
+ item.waiting_reason = None
|
|
|
+ logger.info(f"Model-based assignment: queue item {item.id} assigned to printer {printer_id}")
|
|
|
+
|
|
|
+ # Send assignment notification
|
|
|
+ job_name = await self._get_job_name(db, item)
|
|
|
+ printer = await self._get_printer(db, printer_id)
|
|
|
+ await notification_service.on_queue_job_assigned(
|
|
|
+ job_name=job_name,
|
|
|
+ printer_id=printer_id,
|
|
|
+ printer_name=printer.name if printer else "Unknown",
|
|
|
+ target_model=item.target_model,
|
|
|
+ db=db,
|
|
|
+ )
|
|
|
+
|
|
|
+ await self._start_print(db, item)
|
|
|
+ busy_printers.add(printer_id)
|
|
|
+
|
|
|
+ async def _find_idle_printer_for_model(
|
|
|
+ self,
|
|
|
+ db: AsyncSession,
|
|
|
+ model: str,
|
|
|
+ exclude_ids: set[int],
|
|
|
+ required_filament_types: list[str] | None = None,
|
|
|
+ ) -> tuple[int | None, str | None]:
|
|
|
+ """Find an idle, connected printer matching the model with compatible filaments.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ db: Database session
|
|
|
+ model: Printer model to match (e.g., "X1C", "P1S")
|
|
|
+ exclude_ids: Printer IDs to exclude (already busy)
|
|
|
+ required_filament_types: Optional list of filament types needed (e.g., ["PLA", "PETG"])
|
|
|
+ If provided, only printers with all required types loaded will match.
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ Tuple of (printer_id, waiting_reason):
|
|
|
+ - (printer_id, None) if a matching printer was found
|
|
|
+ - (None, reason) if no printer is available, with explanation
|
|
|
+ """
|
|
|
+ # Normalize model name and use case-insensitive matching
|
|
|
+ normalized_model = normalize_printer_model(model) or model
|
|
|
+ result = await db.execute(
|
|
|
+ select(Printer)
|
|
|
+ .where(func.lower(Printer.model) == normalized_model.lower())
|
|
|
+ .where(Printer.is_active == True) # noqa: E712
|
|
|
+ )
|
|
|
+ printers = list(result.scalars().all())
|
|
|
+
|
|
|
+ if not printers:
|
|
|
+ return None, f"No active {normalized_model} printers configured"
|
|
|
+
|
|
|
+ # Track reasons for skipping printers
|
|
|
+ printers_busy = []
|
|
|
+ printers_offline = []
|
|
|
+ printers_missing_filament = []
|
|
|
+
|
|
|
+ for printer in printers:
|
|
|
+ if printer.id in exclude_ids:
|
|
|
+ printers_busy.append(printer.name)
|
|
|
+ continue
|
|
|
+
|
|
|
+ is_connected = printer_manager.is_connected(printer.id)
|
|
|
+ is_idle = self._is_printer_idle(printer.id) if is_connected else False
|
|
|
+
|
|
|
+ if not is_connected:
|
|
|
+ printers_offline.append(printer.name)
|
|
|
+ continue
|
|
|
+
|
|
|
+ if not is_idle:
|
|
|
+ printers_busy.append(printer.name)
|
|
|
+ continue
|
|
|
+
|
|
|
+ # Validate filament compatibility if required types are specified
|
|
|
+ if required_filament_types:
|
|
|
+ missing = self._get_missing_filament_types(printer.id, required_filament_types)
|
|
|
+ if missing:
|
|
|
+ printers_missing_filament.append((printer.name, missing))
|
|
|
+ logger.debug(f"Skipping printer {printer.id} ({printer.name}) - missing filaments: {missing}")
|
|
|
+ continue
|
|
|
+
|
|
|
+ # Found a matching printer - clear waiting reason
|
|
|
+ return printer.id, None
|
|
|
+
|
|
|
+ # Build waiting reason from what we found
|
|
|
+ reasons = []
|
|
|
+ if printers_missing_filament:
|
|
|
+ # Filament mismatch is most actionable - show first
|
|
|
+ names_and_missing = [f"{name} (needs {', '.join(missing)})" for name, missing in printers_missing_filament]
|
|
|
+ reasons.append(f"Waiting for filament: {'; '.join(names_and_missing)}")
|
|
|
+ if printers_busy:
|
|
|
+ reasons.append(f"Busy: {', '.join(printers_busy)}")
|
|
|
+ if printers_offline:
|
|
|
+ reasons.append(f"Offline: {', '.join(printers_offline)}")
|
|
|
+
|
|
|
+ return None, " | ".join(reasons) if reasons else f"No available {model} printers"
|
|
|
+
|
|
|
+ def _get_missing_filament_types(self, printer_id: int, required_types: list[str]) -> list[str]:
|
|
|
+ """Get the list of required filament types that are not loaded on the printer.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ printer_id: The printer ID
|
|
|
+ required_types: List of filament types needed (e.g., ["PLA", "PETG"])
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ List of missing filament types (empty if all are loaded)
|
|
|
+ """
|
|
|
+ status = printer_manager.get_status(printer_id)
|
|
|
+ if not status:
|
|
|
+ return required_types # Can't determine, assume all missing
|
|
|
+
|
|
|
+ # Collect all filament types loaded on this printer (AMS units + external spool)
|
|
|
+ loaded_types: set[str] = set()
|
|
|
+
|
|
|
+ # Check AMS units (stored in raw_data["ams"])
|
|
|
+ ams_data = status.raw_data.get("ams", [])
|
|
|
+ if ams_data:
|
|
|
+ for ams_unit in ams_data:
|
|
|
+ for tray in ams_unit.get("tray", []):
|
|
|
+ tray_type = tray.get("tray_type")
|
|
|
+ if tray_type:
|
|
|
+ loaded_types.add(tray_type.upper())
|
|
|
+
|
|
|
+ # Check external spool (virtual tray, stored in raw_data["vt_tray"])
|
|
|
+ vt_tray = status.raw_data.get("vt_tray")
|
|
|
+ if vt_tray:
|
|
|
+ vt_type = vt_tray.get("tray_type")
|
|
|
+ if vt_type:
|
|
|
+ loaded_types.add(vt_type.upper())
|
|
|
+
|
|
|
+ # Find which required types are missing (case-insensitive comparison)
|
|
|
+ missing = []
|
|
|
+ for req_type in required_types:
|
|
|
+ if req_type.upper() not in loaded_types:
|
|
|
+ missing.append(req_type)
|
|
|
+
|
|
|
+ return missing
|
|
|
|
|
|
def _is_printer_idle(self, printer_id: int) -> bool:
|
|
|
"""Check if a printer is connected and idle."""
|
|
|
@@ -220,6 +427,25 @@ class PrintScheduler:
|
|
|
logger.info(f"Auto-off: Powering off printer {item.printer_id}")
|
|
|
await tasmota_service.turn_off(plug)
|
|
|
|
|
|
+ async def _get_job_name(self, db: AsyncSession, item: PrintQueueItem) -> str:
|
|
|
+ """Get a human-readable name for a queue item."""
|
|
|
+ if item.archive_id:
|
|
|
+ result = await db.execute(select(PrintArchive).where(PrintArchive.id == item.archive_id))
|
|
|
+ archive = result.scalar_one_or_none()
|
|
|
+ if archive:
|
|
|
+ return archive.filename.replace(".gcode.3mf", "").replace(".3mf", "")
|
|
|
+ if item.library_file_id:
|
|
|
+ result = await db.execute(select(LibraryFile).where(LibraryFile.id == item.library_file_id))
|
|
|
+ library_file = result.scalar_one_or_none()
|
|
|
+ if library_file:
|
|
|
+ return library_file.filename.replace(".gcode.3mf", "").replace(".3mf", "")
|
|
|
+ return f"Job #{item.id}"
|
|
|
+
|
|
|
+ async def _get_printer(self, db: AsyncSession, printer_id: int) -> Printer | None:
|
|
|
+ """Get printer by ID."""
|
|
|
+ result = await db.execute(select(Printer).where(Printer.id == printer_id))
|
|
|
+ return result.scalar_one_or_none()
|
|
|
+
|
|
|
async def _start_print(self, db: AsyncSession, item: PrintQueueItem):
|
|
|
"""Upload file and start print for a queue item.
|
|
|
|
|
|
@@ -371,6 +597,16 @@ class PrintScheduler:
|
|
|
item.completed_at = datetime.utcnow()
|
|
|
await db.commit()
|
|
|
logger.error(f"Queue item {item.id}: FTP upload failed")
|
|
|
+
|
|
|
+ # Send failure notification
|
|
|
+ await notification_service.on_queue_job_failed(
|
|
|
+ job_name=filename.replace(".gcode.3mf", "").replace(".3mf", ""),
|
|
|
+ printer_id=printer.id,
|
|
|
+ printer_name=printer.name,
|
|
|
+ reason="Failed to upload file to printer",
|
|
|
+ db=db,
|
|
|
+ )
|
|
|
+
|
|
|
await self._power_off_if_needed(db, item)
|
|
|
return
|
|
|
|
|
|
@@ -411,6 +647,22 @@ class PrintScheduler:
|
|
|
await db.commit()
|
|
|
logger.info(f"Queue item {item.id}: Print started - {filename}")
|
|
|
|
|
|
+ # Get estimated time for notification
|
|
|
+ estimated_time = None
|
|
|
+ if archive and archive.print_time_seconds:
|
|
|
+ estimated_time = archive.print_time_seconds
|
|
|
+ elif library_file and library_file.print_time_seconds:
|
|
|
+ estimated_time = library_file.print_time_seconds
|
|
|
+
|
|
|
+ # Send job started notification
|
|
|
+ await notification_service.on_queue_job_started(
|
|
|
+ job_name=filename.replace(".gcode.3mf", "").replace(".3mf", ""),
|
|
|
+ printer_id=printer.id,
|
|
|
+ printer_name=printer.name,
|
|
|
+ db=db,
|
|
|
+ estimated_time=estimated_time,
|
|
|
+ )
|
|
|
+
|
|
|
# MQTT relay - publish queue job started
|
|
|
try:
|
|
|
from backend.app.services.mqtt_relay import mqtt_relay
|
|
|
@@ -430,6 +682,16 @@ class PrintScheduler:
|
|
|
item.completed_at = datetime.utcnow()
|
|
|
await db.commit()
|
|
|
logger.error(f"Queue item {item.id}: Failed to start print")
|
|
|
+
|
|
|
+ # Send failure notification
|
|
|
+ await notification_service.on_queue_job_failed(
|
|
|
+ job_name=filename.replace(".gcode.3mf", "").replace(".3mf", ""),
|
|
|
+ printer_id=printer.id,
|
|
|
+ printer_name=printer.name,
|
|
|
+ reason="Failed to send print command",
|
|
|
+ db=db,
|
|
|
+ )
|
|
|
+
|
|
|
await self._power_off_if_needed(db, item)
|
|
|
|
|
|
|