"""Background dispatch for print/reprint jobs. This service is separate from the app's print queue feature. It exists only to decouple "send/start print" operations (FTP upload + start command) from API request latency so the UI can continue immediately after dispatch. """ from __future__ import annotations import asyncio import logging import time import zipfile from collections import deque from dataclasses import dataclass, field from pathlib import Path from typing import Any, Literal from sqlalchemy import select from backend.app.core.config import settings from backend.app.core.database import async_session from backend.app.core.websocket import ws_manager from backend.app.models.library import LibraryFile from backend.app.models.printer import Printer from backend.app.services.archive import ArchiveService from backend.app.services.bambu_ftp import ( cache_3mf_download, delete_file_async, get_ftp_retry_settings, upload_file_async, with_ftp_retry, ) from backend.app.services.printer_manager import printer_manager logger = logging.getLogger(__name__) # Bambu firmware states that mean the project_file has actually been accepted # and the printer is now processing / running / paused mid-print. Used by the # direct-dispatch verifier (#1370): a transition into one of these states means # the print landed, anything else (e.g. FINISH -> IDLE after the user dismisses # a post-print prompt) is NOT a valid "command landed" signal even though the # state value did change. Mirrors the same constant in print_scheduler.py — # kept duplicated rather than imported to avoid coupling the two services and # to keep the value at the point of use. _ACTIVE_PRINT_STATES: frozenset[str] = frozenset({"PREPARE", "SLICING", "RUNNING", "PAUSE"}) class DispatchJobCancelled(Exception): """Raised when a dispatch job is cancelled by the user.""" class DispatchEnqueueRejected(Exception): """Raised when a dispatch job should not be accepted.""" @dataclass(slots=True) class PrintDispatchJob: id: int kind: Literal["reprint_archive", "print_library_file"] source_id: int source_name: str printer_id: int printer_name: str options: dict[str, Any] = field(default_factory=dict) requested_by_user_id: int | None = None requested_by_username: str | None = None project_id: int | None = None cleanup_library_after_dispatch: bool = False @dataclass(slots=True) class ActiveDispatchState: job: PrintDispatchJob message: str upload_bytes: int | None = None upload_total_bytes: int | None = None class BackgroundDispatchService: def __init__(self): self._queued_jobs: deque[PrintDispatchJob] = deque() self._dispatcher_task: asyncio.Task | None = None self._running_tasks: dict[int, asyncio.Task] = {} self._lock = asyncio.Lock() self._job_event = asyncio.Event() self._next_job_id = 1 self._active_jobs: dict[int, ActiveDispatchState] = {} self._cancel_requested_job_ids: set[int] = set() # Progress for the current "batch" (since queue became non-empty) self._batch_total = 0 self._batch_completed = 0 self._batch_failed = 0 @staticmethod def _printer_is_busy_printing(printer_id: int) -> bool: state = printer_manager.get_status(printer_id) if not state: return False return state.state in ("RUNNING", "PAUSE", "PAUSED") and bool(state.gcode_file) async def start(self): async with self._lock: if self._dispatcher_task and not self._dispatcher_task.done(): return self._dispatcher_task = asyncio.create_task(self._dispatcher_loop(), name="background-dispatch-dispatcher") logger.info("Background dispatch dispatcher started") async def stop(self): dispatcher: asyncio.Task | None = None running_tasks: list[asyncio.Task] = [] async with self._lock: dispatcher = self._dispatcher_task self._dispatcher_task = None running_tasks = list(self._running_tasks.values()) self._running_tasks.clear() self._active_jobs.clear() self._queued_jobs.clear() self._cancel_requested_job_ids.clear() self._job_event.set() if dispatcher: dispatcher.cancel() for task in running_tasks: task.cancel() if dispatcher: try: await dispatcher except asyncio.CancelledError: pass if running_tasks: await asyncio.gather(*running_tasks, return_exceptions=True) logger.info("Background dispatch dispatcher stopped") async def dispatch_reprint_archive( self, *, archive_id: int, archive_name: str, printer_id: int, printer_name: str, options: dict[str, Any], requested_by_user_id: int | None, requested_by_username: str | None, ) -> dict[str, Any]: return await self._dispatch( kind="reprint_archive", source_id=archive_id, source_name=archive_name, printer_id=printer_id, printer_name=printer_name, options=options, requested_by_user_id=requested_by_user_id, requested_by_username=requested_by_username, ) async def get_state(self) -> dict[str, Any]: """Get current dispatch queue state snapshot for newly connected clients.""" async with self._lock: return self._build_state_payload_unlocked() async def dispatch_print_library_file( self, *, file_id: int, filename: str, printer_id: int, printer_name: str, options: dict[str, Any], requested_by_user_id: int | None, requested_by_username: str | None, project_id: int | None = None, cleanup_library_after_dispatch: bool = False, ) -> dict[str, Any]: return await self._dispatch( kind="print_library_file", source_id=file_id, source_name=filename, printer_id=printer_id, printer_name=printer_name, options=options, requested_by_user_id=requested_by_user_id, requested_by_username=requested_by_username, project_id=project_id, cleanup_library_after_dispatch=cleanup_library_after_dispatch, ) async def cancel_job(self, job_id: int) -> dict[str, Any]: """Cancel a queued dispatch job. Queued jobs are removed immediately. Active jobs are cancelled cooperatively and will stop at the next cancellation checkpoint. """ async with self._lock: # Check active jobs first active_state = self._active_jobs.get(job_id) if active_state is not None: logger.info("Cancel requested for active dispatch job %s", job_id) self._cancel_requested_job_ids.add(job_id) active_job = active_state.job payload = self._build_state_payload_unlocked( recent_event={ "status": "cancelling", "job_id": active_job.id, "source_name": active_job.source_name, "printer_id": active_job.printer_id, "printer_name": active_job.printer_name, "message": "Cancelling current dispatch...", } ) result = { "cancelled": True, "pending": True, "job_id": active_job.id, "source_name": active_job.source_name, "printer_id": active_job.printer_id, "printer_name": active_job.printer_name, } await ws_manager.broadcast({"type": "background_dispatch", "data": payload}) return result # Check queued jobs cancelled_job: PrintDispatchJob | None = None for job in self._queued_jobs: if job.id == job_id: cancelled_job = job break if not cancelled_job: logger.info("Cancel requested for unknown dispatch job %s", job_id) return {"cancelled": False, "reason": "not_found"} self._queued_jobs.remove(cancelled_job) logger.info("Cancelled queued dispatch job %s", cancelled_job.id) self._batch_total = max(0, self._batch_total - 1) if self._batch_total == 0 and len(self._queued_jobs) == 0 and len(self._active_jobs) == 0: self._batch_completed = 0 self._batch_failed = 0 payload = self._build_state_payload_unlocked( recent_event={ "status": "cancelled", "job_id": cancelled_job.id, "source_name": cancelled_job.source_name, "printer_id": cancelled_job.printer_id, "printer_name": cancelled_job.printer_name, "message": "Cancelled from queue", } ) await ws_manager.broadcast({"type": "background_dispatch", "data": payload}) return { "cancelled": True, "pending": False, "job_id": cancelled_job.id, "source_name": cancelled_job.source_name, "printer_id": cancelled_job.printer_id, "printer_name": cancelled_job.printer_name, } async def _dispatch( self, *, kind: Literal["reprint_archive", "print_library_file"], source_id: int, source_name: str, printer_id: int, printer_name: str, options: dict[str, Any], requested_by_user_id: int | None, requested_by_username: str | None, project_id: int | None = None, cleanup_library_after_dispatch: bool = False, ) -> dict[str, Any]: async with self._lock: has_pending_for_printer = any(job.printer_id == printer_id for job in self._queued_jobs) has_active_for_printer = any(active.job.printer_id == printer_id for active in self._active_jobs.values()) if has_pending_for_printer or has_active_for_printer: raise DispatchEnqueueRejected(f"Printer {printer_name} already has a background dispatch in progress") if self._printer_is_busy_printing(printer_id): raise DispatchEnqueueRejected(f"Printer {printer_name} is currently busy printing") dispatch_position = len(self._queued_jobs) + len(self._active_jobs) + 1 job = PrintDispatchJob( id=self._next_job_id, kind=kind, source_id=source_id, source_name=source_name, printer_id=printer_id, printer_name=printer_name, options=options, requested_by_user_id=requested_by_user_id, requested_by_username=requested_by_username, project_id=project_id, cleanup_library_after_dispatch=cleanup_library_after_dispatch, ) self._next_job_id += 1 self._batch_total += 1 self._queued_jobs.append(job) self._job_event.set() payload = self._build_state_payload_unlocked( recent_event={ "status": "dispatched", "job_id": job.id, "source_name": source_name, "printer_id": printer_id, "printer_name": printer_name, "message": f"Dispatched to {printer_name}", } ) await ws_manager.broadcast({"type": "background_dispatch", "data": payload}) return { "dispatch_job_id": job.id, "dispatch_position": dispatch_position, "status": "dispatched", "printer_id": printer_id, "source_id": source_id, "source_name": source_name, } async def _dispatcher_loop(self): while True: await self._job_event.wait() self._job_event.clear() while True: payload: dict[str, Any] | None = None job_to_start: PrintDispatchJob | None = None async with self._lock: busy_printer_ids = {state.job.printer_id for state in self._active_jobs.values()} start_index = next( ( idx for idx, queued_job in enumerate(self._queued_jobs) if queued_job.printer_id not in busy_printer_ids ), None, ) if start_index is None: break job_to_start = self._queued_jobs[start_index] del self._queued_jobs[start_index] self._active_jobs[job_to_start.id] = ActiveDispatchState( job=job_to_start, message="Preparing background dispatch...", ) task = asyncio.create_task( self._run_active_job(job_to_start), name=f"background-dispatch-job-{job_to_start.id}" ) self._running_tasks[job_to_start.id] = task payload = self._build_state_payload_unlocked( recent_event={ "status": "processing", "job_id": job_to_start.id, "source_name": job_to_start.source_name, "printer_id": job_to_start.printer_id, "printer_name": job_to_start.printer_name, "message": "Preparing background dispatch...", } ) if payload: await ws_manager.broadcast({"type": "background_dispatch", "data": payload}) async def _run_active_job(self, job: PrintDispatchJob): try: await self._process_job(job) await self._mark_job_finished(job, failed=False, message="Background dispatch complete") except DispatchJobCancelled: await self._mark_job_cancelled(job) except asyncio.CancelledError: raise except Exception as e: logger.error("Background dispatch job %s failed: %s", job.id, e, exc_info=True) await self._mark_job_finished(job, failed=True, message=str(e)) finally: self._job_event.set() async def _set_active_message(self, job: PrintDispatchJob, message: str): async with self._lock: active = self._active_jobs.get(job.id) if not active: return active.message = message payload = self._build_state_payload_unlocked( recent_event={ "status": "processing", "job_id": active.job.id, "source_name": active.job.source_name, "printer_id": active.job.printer_id, "printer_name": active.job.printer_name, "message": message, } ) await ws_manager.broadcast({"type": "background_dispatch", "data": payload}) async def _set_active_upload_progress(self, job: PrintDispatchJob, uploaded: int, total: int): async with self._lock: active = self._active_jobs.get(job.id) if not active: return active.upload_bytes = max(0, int(uploaded)) active.upload_total_bytes = max(0, int(total)) payload = self._build_state_payload_unlocked( recent_event={ "status": "processing", "job_id": active.job.id, "source_name": active.job.source_name, "printer_id": active.job.printer_id, "printer_name": active.job.printer_name, "message": active.message, } ) await ws_manager.broadcast({"type": "background_dispatch", "data": payload}) async def _mark_job_finished(self, job: PrintDispatchJob, *, failed: bool, message: str): async with self._lock: if failed: self._batch_failed += 1 else: self._batch_completed += 1 self._active_jobs.pop(job.id, None) self._running_tasks.pop(job.id, None) self._cancel_requested_job_ids.discard(job.id) payload = self._build_state_payload_unlocked( recent_event={ "status": "failed" if failed else "completed", "job_id": job.id, "source_name": job.source_name, "printer_id": job.printer_id, "printer_name": job.printer_name, "message": message, } ) should_reset_batch = len(self._queued_jobs) == 0 and len(self._active_jobs) == 0 await ws_manager.broadcast({"type": "background_dispatch", "data": payload}) if should_reset_batch: async with self._lock: if len(self._queued_jobs) == 0 and len(self._active_jobs) == 0: self._batch_total = 0 self._batch_completed = 0 self._batch_failed = 0 async def _mark_job_cancelled(self, job: PrintDispatchJob): async with self._lock: self._active_jobs.pop(job.id, None) self._running_tasks.pop(job.id, None) self._cancel_requested_job_ids.discard(job.id) self._batch_total = max(0, self._batch_total - 1) if self._batch_total == 0 and len(self._queued_jobs) == 0 and len(self._active_jobs) == 0: self._batch_completed = 0 self._batch_failed = 0 payload = self._build_state_payload_unlocked( recent_event={ "status": "cancelled", "job_id": job.id, "source_name": job.source_name, "printer_id": job.printer_id, "printer_name": job.printer_name, "message": "Cancelled during dispatch", } ) await ws_manager.broadcast({"type": "background_dispatch", "data": payload}) def _is_cancel_requested(self, job_id: int) -> bool: return job_id in self._cancel_requested_job_ids def _raise_if_cancel_requested(self, job: PrintDispatchJob): if self._is_cancel_requested(job.id): raise DispatchJobCancelled(f"Dispatch job {job.id} cancelled") def _build_state_payload_unlocked(self, recent_event: dict[str, Any] | None = None) -> dict[str, Any]: processing = len(self._active_jobs) dispatched = len(self._queued_jobs) dispatched_jobs = [ { "job_id": job.id, "kind": job.kind, "source_id": job.source_id, "source_name": job.source_name, "printer_id": job.printer_id, "printer_name": job.printer_name, } for job in list(self._queued_jobs) ] active_jobs: list[dict[str, Any]] = [] for active in self._active_jobs.values(): upload_progress_pct = None if active.upload_total_bytes and active.upload_total_bytes > 0 and active.upload_bytes is not None: upload_progress_pct = round( max(0.0, min(100.0, (active.upload_bytes / active.upload_total_bytes) * 100.0)), 1 ) active_jobs.append( { "job_id": active.job.id, "kind": active.job.kind, "source_id": active.job.source_id, "source_name": active.job.source_name, "printer_id": active.job.printer_id, "printer_name": active.job.printer_name, "message": active.message, "upload_bytes": active.upload_bytes, "upload_total_bytes": active.upload_total_bytes, "upload_progress_pct": upload_progress_pct, } ) active_jobs.sort(key=lambda item: int(item["job_id"])) active_job = active_jobs[0] if active_jobs else None return { "total": self._batch_total, "dispatched": dispatched, "processing": processing, "completed": self._batch_completed, "failed": self._batch_failed, "dispatched_jobs": dispatched_jobs, "active_jobs": active_jobs, "active_job": active_job, "recent_event": recent_event, } async def _process_job(self, job: PrintDispatchJob): if job.kind == "reprint_archive": await self._run_reprint_archive(job) return if job.kind == "print_library_file": await self._run_print_library_file(job) return raise RuntimeError(f"Unknown dispatch job kind: {job.kind}") async def _run_reprint_archive(self, job: PrintDispatchJob): from backend.app.main import register_expected_print async with async_session() as db: service = ArchiveService(db) archive = await service.get_archive(job.source_id) if not archive: raise RuntimeError("Archive not found") printer = await db.scalar(select(Printer).where(Printer.id == job.printer_id)) if not printer: raise RuntimeError("Printer not found") printer_name = printer.name printer_ip = printer.ip_address printer_access_code = printer.access_code printer_model = printer.model archive_filename = archive.filename if not printer_manager.is_connected(job.printer_id): raise RuntimeError("Printer is not connected") file_path = settings.base_dir / archive.file_path if not file_path.exists(): raise RuntimeError("Archive file not found") base_name = archive.filename if base_name.endswith(".gcode.3mf"): base_name = base_name[:-10] elif base_name.endswith(".3mf"): base_name = base_name[:-4] remote_filename = f"{base_name}.3mf" # Sanitize: firmware parses ftp://{filename} as a URL, spaces break it remote_filename = remote_filename.replace(" ", "_") remote_path = f"/{remote_filename}" ftp_retry_enabled, ftp_retry_count, ftp_retry_delay, ftp_timeout = await get_ftp_retry_settings() self._raise_if_cancel_requested(job) await self._set_active_message(job, f"Preparing upload to {printer_name}...") await delete_file_async( printer_ip, printer_access_code, remote_path, socket_timeout=ftp_timeout, printer_model=printer_model, ) self._raise_if_cancel_requested(job) try: await self._set_active_message(job, f"Uploading {archive_filename} to {printer_name}...") loop = asyncio.get_running_loop() progress_state = {"last_emit": 0.0, "last_bytes": 0} def upload_progress_callback(uploaded: int, total: int): if self._is_cancel_requested(job.id): raise DispatchJobCancelled(f"Dispatch job {job.id} cancelled during upload") now = time.monotonic() should_emit = ( uploaded >= total or now - progress_state["last_emit"] >= 0.2 or uploaded - progress_state["last_bytes"] >= 256 * 1024 ) if should_emit: progress_state["last_emit"] = now progress_state["last_bytes"] = uploaded loop.call_soon_threadsafe( lambda u=uploaded, t=total: asyncio.create_task(self._set_active_upload_progress(job, u, t)) ) if ftp_retry_enabled: uploaded = await with_ftp_retry( upload_file_async, printer_ip, printer_access_code, file_path, remote_path, progress_callback=upload_progress_callback, socket_timeout=ftp_timeout, printer_model=printer_model, max_retries=ftp_retry_count, retry_delay=ftp_retry_delay, operation_name=f"Upload for reprint to {printer_name}", non_retry_exceptions=(DispatchJobCancelled,), ) else: uploaded = await upload_file_async( printer_ip, printer_access_code, file_path, remote_path, progress_callback=upload_progress_callback, socket_timeout=ftp_timeout, printer_model=printer_model, ) if uploaded: await self._set_active_upload_progress(job, 1, 1) if not uploaded: raise RuntimeError( "Failed to upload file to printer. Check if SD card is inserted and properly formatted (FAT32/exFAT)." ) register_expected_print( job.printer_id, remote_filename, job.source_id, ams_mapping=job.options.get("ams_mapping"), ) plate_id = self._resolve_plate_id(file_path, job.options.get("plate_id")) self._raise_if_cancel_requested(job) await self._set_active_message(job, f"Starting print on {printer_name}...") started = printer_manager.start_print( job.printer_id, remote_filename, plate_id, ams_mapping=job.options.get("ams_mapping"), timelapse=job.options.get("timelapse", False), bed_levelling=job.options.get("bed_levelling", True), flow_cali=job.options.get("flow_cali", False), vibration_cali=job.options.get("vibration_cali", True), layer_inspect=job.options.get("layer_inspect", False), use_ams=job.options.get("use_ams", True), ) if not started: await self._cleanup_sd_card_file( printer_ip, printer_access_code, remote_path, printer_model, ) raise RuntimeError("Failed to start print") # Register the archive's local 3MF in the cover-cache so the # /cover endpoint can skip FTP — we already have the file on # disk, no need to refetch 36 MB from a printer whose FTP is # busy serving the active print (#1166 follow-up). cache_3mf_download(job.printer_id, remote_filename, file_path) # Wait for the printer to actually pick up the command before # marking the dispatch job complete (#1042). MQTT-publish success # only proves the command queued locally; the printer can still # reject it (HMS error pending, half-broken session, SD card # missing) and never transition. Until #1042 this watchdog was # fire-and-forget — the job was reported successful and the # user had no signal that the print never started. The uploaded # file is intentionally left on the printer's SD card on # timeout: the next dispatch will overwrite it via the existing # delete-then-upload step, and the printer may still be in the # middle of reading it if it picked up just past the timeout. pre_status = printer_manager.get_status(job.printer_id) pre_state = getattr(pre_status, "state", None) if pre_status else None pre_subtask_id = getattr(pre_status, "subtask_id", None) if pre_status else None pre_gcode_file = getattr(pre_status, "gcode_file", None) if pre_status else None if pre_state: await self._set_active_message(job, f"Waiting for {printer_name} to acknowledge print...") transitioned = await self._verify_print_response( job.printer_id, printer_name, pre_state, pre_subtask_id=pre_subtask_id, pre_gcode_file=pre_gcode_file, ) if not transitioned: raise RuntimeError( f"Printer did not acknowledge print command — state still {pre_state}. " f"Check the printer for a pending error (HMS code, plate-clear prompt, " f"SD card) and try again." ) if job.requested_by_user_id and job.requested_by_username: printer_manager.set_current_print_user( job.printer_id, job.requested_by_user_id, job.requested_by_username, ) except DispatchJobCancelled: await self._set_active_message(job, f"Cancelled upload on {printer_name}.") raise async def _run_print_library_file(self, job: PrintDispatchJob): from backend.app.main import register_expected_print async with async_session() as db: lib_file = await db.scalar(LibraryFile.active().where(LibraryFile.id == job.source_id)) if not lib_file: raise RuntimeError("File not found") if not self._is_sliced_file(lib_file.filename): raise RuntimeError("Not a sliced file. Only .gcode or .gcode.3mf files can be printed.") file_path = Path(settings.base_dir) / lib_file.file_path if not file_path.exists(): raise RuntimeError("File not found on disk") printer = await db.scalar(select(Printer).where(Printer.id == job.printer_id)) if not printer: raise RuntimeError("Printer not found") printer_name = printer.name printer_ip = printer.ip_address printer_access_code = printer.access_code printer_model = printer.model library_filename = lib_file.filename if not printer_manager.is_connected(job.printer_id): raise RuntimeError("Printer is not connected") await self._set_active_message(job, f"Creating archive for {lib_file.filename}...") archive_service = ArchiveService(db) archive = await archive_service.archive_print( printer_id=job.printer_id, source_file=file_path, original_filename=lib_file.filename, project_id=job.project_id, created_by_id=job.requested_by_user_id, ) if not archive: raise RuntimeError("Failed to create archive") await db.flush() base_name = lib_file.filename if base_name.endswith(".gcode.3mf"): base_name = base_name[:-10] elif base_name.endswith(".3mf"): base_name = base_name[:-4] remote_filename = f"{base_name}.3mf" # Sanitize: firmware parses ftp://{filename} as a URL, spaces break it remote_filename = remote_filename.replace(" ", "_") remote_path = f"/{remote_filename}" ftp_retry_enabled, ftp_retry_count, ftp_retry_delay, ftp_timeout = await get_ftp_retry_settings() self._raise_if_cancel_requested(job) await self._set_active_message(job, f"Preparing upload to {printer_name}...") await delete_file_async( printer_ip, printer_access_code, remote_path, socket_timeout=ftp_timeout, printer_model=printer_model, ) self._raise_if_cancel_requested(job) try: await self._set_active_message(job, f"Uploading {library_filename} to {printer_name}...") loop = asyncio.get_running_loop() progress_state = {"last_emit": 0.0, "last_bytes": 0} def upload_progress_callback(uploaded: int, total: int): if self._is_cancel_requested(job.id): raise DispatchJobCancelled(f"Dispatch job {job.id} cancelled during upload") now = time.monotonic() should_emit = ( uploaded >= total or now - progress_state["last_emit"] >= 0.2 or uploaded - progress_state["last_bytes"] >= 256 * 1024 ) if should_emit: progress_state["last_emit"] = now progress_state["last_bytes"] = uploaded loop.call_soon_threadsafe( lambda u=uploaded, t=total: asyncio.create_task(self._set_active_upload_progress(job, u, t)) ) if ftp_retry_enabled: uploaded = await with_ftp_retry( upload_file_async, printer_ip, printer_access_code, file_path, remote_path, progress_callback=upload_progress_callback, socket_timeout=ftp_timeout, printer_model=printer_model, max_retries=ftp_retry_count, retry_delay=ftp_retry_delay, operation_name=f"Upload for print to {printer_name}", non_retry_exceptions=(DispatchJobCancelled,), ) else: uploaded = await upload_file_async( printer_ip, printer_access_code, file_path, remote_path, progress_callback=upload_progress_callback, socket_timeout=ftp_timeout, printer_model=printer_model, ) if uploaded: await self._set_active_upload_progress(job, 1, 1) if not uploaded: await db.rollback() raise RuntimeError( "Failed to upload file to printer. Check if SD card is inserted and properly formatted (FAT32/exFAT)." ) register_expected_print( job.printer_id, remote_filename, archive.id, ams_mapping=job.options.get("ams_mapping"), ) plate_id = self._resolve_plate_id(file_path, job.options.get("plate_id")) self._raise_if_cancel_requested(job) await self._set_active_message(job, f"Starting print on {printer_name}...") started = printer_manager.start_print( job.printer_id, remote_filename, plate_id, ams_mapping=job.options.get("ams_mapping"), timelapse=job.options.get("timelapse", False), bed_levelling=job.options.get("bed_levelling", True), flow_cali=job.options.get("flow_cali", False), vibration_cali=job.options.get("vibration_cali", True), layer_inspect=job.options.get("layer_inspect", False), use_ams=job.options.get("use_ams", True), ) if not started: await self._cleanup_sd_card_file( printer_ip, printer_access_code, remote_path, printer_model, ) await db.rollback() raise RuntimeError("Failed to start print") # Same as the archive path: register the library file's local # 3MF in the cover-cache so /cover skips FTP (#1166 follow-up). cache_3mf_download(job.printer_id, remote_filename, file_path) # See _run_reprint_archive for rationale (#1042). On timeout # also rolls back the freshly-created archive so the library # flow doesn't leave behind a phantom row for a print that # never started. pre_status = printer_manager.get_status(job.printer_id) pre_state = getattr(pre_status, "state", None) if pre_status else None pre_subtask_id = getattr(pre_status, "subtask_id", None) if pre_status else None pre_gcode_file = getattr(pre_status, "gcode_file", None) if pre_status else None if pre_state: await self._set_active_message(job, f"Waiting for {printer_name} to acknowledge print...") transitioned = await self._verify_print_response( job.printer_id, printer_name, pre_state, pre_subtask_id=pre_subtask_id, pre_gcode_file=pre_gcode_file, ) if not transitioned: await db.rollback() raise RuntimeError( f"Printer did not acknowledge print command — state still {pre_state}. " f"Check the printer for a pending error (HMS code, plate-clear prompt, " f"SD card) and try again." ) if job.requested_by_user_id and job.requested_by_username: printer_manager.set_current_print_user( job.printer_id, job.requested_by_user_id, job.requested_by_username, ) # Direct-Print flow only: archive_print copies, so deleting the # transient library row + files here leaves archive intact. Disk # deletes run only after commit so a rollback leaves no orphan. cleanup_disk_paths: list[Path] = [] if job.cleanup_library_after_dispatch and not lib_file.is_external: cleanup_disk_paths.append(file_path) if lib_file.thumbnail_path: thumb_path = Path(lib_file.thumbnail_path) if not thumb_path.is_absolute(): thumb_path = Path(settings.base_dir) / lib_file.thumbnail_path cleanup_disk_paths.append(thumb_path) await db.delete(lib_file) await db.commit() for cleanup_path in cleanup_disk_paths: try: if cleanup_path.exists(): cleanup_path.unlink() except OSError as cleanup_err: logger.warning("Failed to delete transient library file %s: %s", cleanup_path, cleanup_err) except DispatchJobCancelled: await db.rollback() await self._set_active_message(job, f"Cancelled upload on {printer_name}.") raise @staticmethod async def _verify_print_response( printer_id: int, printer_name: str, pre_state: str, pre_subtask_id: str | None = None, pre_gcode_file: str | None = None, timeout: float = 90.0, poll_interval: float = 3.0, ) -> bool: """Wait for the printer to acknowledge a print command. Returns True if the printer transitioned (state advanced past pre_state or subtask_id advanced past pre_subtask_id). Returns False on timeout — in that case logs a warning and forces an MQTT reconnect, mirroring the queue-side watchdog (`_watchdog_print_start`). Caller is responsible for surfacing the False result to the user (typically by raising so the dispatch job is marked failed). Both transition signals are checked because H2D can sit at FINISH for ~50 s after accepting `project_file` before flipping to PREPARE; the printer echoes our per-dispatch identity back as `subtask_id` on `push_status` first, so a subtask_id change is a definitive "command landed" signal even while state is still FINISH (#1078). """ deadline = time.monotonic() + timeout last_status = None while time.monotonic() < deadline: await asyncio.sleep(poll_interval) state = printer_manager.get_status(printer_id) if not state: # Printer momentarily not reporting — could be a brief MQTT # disconnect mid-window. Keep polling rather than declaring # failure on the first missed tick; the printer may reconnect # within the remaining timeout and still surface a transition. continue last_status = state if state.state in _ACTIVE_PRINT_STATES: # Printer is actively processing the job. We do NOT accept # arbitrary state transitions: a printer going FINISH -> IDLE # (user dismissed the post-print prompt without accepting our # project_file) would otherwise look like "command landed" # and the dispatch job would be marked successful even though # no print is running (#1370). return True if pre_subtask_id is not None and state.subtask_id is not None and state.subtask_id != pre_subtask_id: # Printer picked up the job (subtask_id advanced). H2D can # sit at FINISH for ~50 s after accepting project_file before # transitioning to PREPARE, but the subtask_id flips to our # submission_id almost immediately (#1078). return True logger.warning( "Printer %s (%d) did not respond to print command within %.0fs " "(state still %s, subtask_id still %s) — printer may need restart", printer_name, printer_id, timeout, pre_state, pre_subtask_id, ) # Distinguish #1150 (slow parse) from #887/#936 (half-broken session) # via gcode_file: if the printer is now showing a different file than # before dispatch, the project_file command landed and the printer is # parsing — a forced reconnect mid-parse causes 0500_4003. If # gcode_file is unchanged, the publish was silently swallowed and the # original #936 recovery (force_reconnect → fresh client_id) is what # we want. Caveat: in the rare retry-same-file-after-timeout case the # printer's gcode_file looks identical before and after the publish # lands, so a slow parse on retry-same-file still falls through to the # reconnect (and the original 0500_4003) — accepted to avoid breaking # the half-broken-session recovery path. client = printer_manager.get_client(printer_id) current_gcode_file = getattr(last_status, "gcode_file", None) if last_status else None publish_landed = current_gcode_file is not None and current_gcode_file != pre_gcode_file if publish_landed: logger.warning( "Printer %s (%d) gcode_file changed to %r (was %r) — printer " "received the command and is parsing slowly. Skipping forced " "MQTT reconnect to avoid 0500_4003 mid-parse (#1150).", printer_name, printer_id, current_gcode_file, pre_gcode_file, ) elif client and hasattr(client, "force_reconnect_stale_session"): client.force_reconnect_stale_session( f"print command unacknowledged after {timeout:.0f}s " f"(state still {pre_state}, gcode_file {current_gcode_file!r})" ) return False @staticmethod async def _cleanup_sd_card_file( printer_ip: str, access_code: str, remote_path: str, printer_model: str | None, ): """Best-effort delete of uploaded file from printer SD card.""" try: await delete_file_async(printer_ip, access_code, remote_path, printer_model=printer_model) except Exception: pass # Best-effort — don't fail the error handler @staticmethod def _resolve_plate_id(file_path: Path, requested_plate_id: int | None) -> int: if requested_plate_id is not None: return requested_plate_id plate_id = 1 try: with zipfile.ZipFile(file_path, "r") as zf: for name in zf.namelist(): if name.startswith("Metadata/plate_") and name.endswith(".gcode"): plate_str = name[15:-6] plate_id = int(plate_str) break except (ValueError, zipfile.BadZipFile, OSError): pass return plate_id @staticmethod def _is_sliced_file(filename: str) -> bool: lower = filename.lower() return lower.endswith(".gcode") or lower.endswith(".gcode.3mf") background_dispatch = BackgroundDispatchService()