background_dispatch.py 46 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093
  1. """Background dispatch for print/reprint jobs.
  2. This service is separate from the app's print queue feature. It exists only to
  3. decouple "send/start print" operations (FTP upload + start command) from API
  4. request latency so the UI can continue immediately after dispatch.
  5. """
  6. from __future__ import annotations
  7. import asyncio
  8. import logging
  9. import time
  10. import zipfile
  11. from collections import deque
  12. from dataclasses import dataclass, field
  13. from pathlib import Path
  14. from typing import Any, Literal
  15. from sqlalchemy import select
  16. from backend.app.core.config import settings
  17. from backend.app.core.database import async_session
  18. from backend.app.core.websocket import ws_manager
  19. from backend.app.models.library import LibraryFile
  20. from backend.app.models.printer import Printer
  21. from backend.app.services.archive import ArchiveService
  22. from backend.app.services.bambu_ftp import (
  23. cache_3mf_download,
  24. delete_file_async,
  25. get_ftp_retry_settings,
  26. upload_file_async,
  27. with_ftp_retry,
  28. )
  29. from backend.app.services.printer_manager import printer_manager
  logger = logging.getLogger(__name__)

  # Printer states showing that Bambu firmware accepted the project_file and is
  # now processing, running, or paused mid-print. The direct-dispatch verifier
  # (#1370) treats only a transition INTO one of these states as proof that the
  # print command landed. Any other state change (for example FINISH -> IDLE
  # after the user dismisses a post-print prompt) does not count, even though
  # the state value changed.
  #
  # print_scheduler.py carries the same constant. It is duplicated here on
  # purpose: importing it would couple the two services, and keeping the value
  # local keeps it next to the code that uses it.
  _ACTIVE_PRINT_STATES: frozenset[str] = frozenset(
      ("PREPARE", "SLICING", "RUNNING", "PAUSE")
  )
  class DispatchJobCancelled(Exception):
      """Signals that the user cancelled a dispatch job."""
  class DispatchEnqueueRejected(Exception):
      """Signals that a dispatch job was refused instead of being accepted."""
  44. @dataclass(slots=True)
  45. class PrintDispatchJob:
  46. id: int
  47. kind: Literal["reprint_archive", "print_library_file"]
  48. source_id: int
  49. source_name: str
  50. printer_id: int
  51. printer_name: str
  52. options: dict[str, Any] = field(default_factory=dict)
  53. requested_by_user_id: int | None = None
  54. requested_by_username: str | None = None
  55. project_id: int | None = None
  56. cleanup_library_after_dispatch: bool = False
  57. @dataclass(slots=True)
  58. class ActiveDispatchState:
  59. job: PrintDispatchJob
  60. message: str
  61. upload_bytes: int | None = None
  62. upload_total_bytes: int | None = None
  63. class BackgroundDispatchService:
  64. def __init__(self):
  65. self._queued_jobs: deque[PrintDispatchJob] = deque()
  66. self._dispatcher_task: asyncio.Task | None = None
  67. self._running_tasks: dict[int, asyncio.Task] = {}
  68. self._lock = asyncio.Lock()
  69. self._job_event = asyncio.Event()
  70. self._next_job_id = 1
  71. self._active_jobs: dict[int, ActiveDispatchState] = {}
  72. self._cancel_requested_job_ids: set[int] = set()
  73. # Progress for the current "batch" (since queue became non-empty)
  74. self._batch_total = 0
  75. self._batch_completed = 0
  76. self._batch_failed = 0
  @staticmethod
  def _printer_is_busy_printing(printer_id: int) -> bool:
      """Return True when the printer reports a running or paused print with a gcode file.

      A printer with no known status is treated as not busy.
      """
      status = printer_manager.get_status(printer_id)
      if status and status.state in ("RUNNING", "PAUSE", "PAUSED"):
          # A print-like state only counts when a gcode file is attached to it.
          return bool(status.gcode_file)
      return False
  async def start(self):
      """Launch the dispatcher loop task unless one is already running."""
      async with self._lock:
          current = self._dispatcher_task
          still_running = current is not None and not current.done()
          if still_running:
              return
          self._dispatcher_task = asyncio.create_task(
              self._dispatcher_loop(),
              name="background-dispatch-dispatcher",
          )
          logger.info("Background dispatch dispatcher started")
  async def stop(self):
      """Shut the service down: drop all state, cancel every task, wait for them to end.

      Queued jobs, active-job records and pending cancel requests are
      discarded under the lock. The dispatcher task and all job tasks are
      then cancelled and awaited.
      """
      async with self._lock:
          dispatcher = self._dispatcher_task
          self._dispatcher_task = None
          running_tasks = list(self._running_tasks.values())
          self._running_tasks.clear()
          self._active_jobs.clear()
          self._queued_jobs.clear()
          self._cancel_requested_job_ids.clear()
          self._job_event.set()

      if dispatcher:
          dispatcher.cancel()
      for task in running_tasks:
          task.cancel()

      if dispatcher:
          try:
              await dispatcher
          except asyncio.CancelledError:
              pass
          except Exception:
              # The dispatcher had already died with an error. Awaiting it
              # re-raises that error, which previously aborted stop() before
              # the job tasks below were awaited. Log it and keep shutting down.
              logger.exception("Background dispatch dispatcher exited with an error")

      if running_tasks:
          # return_exceptions=True: job tasks end in CancelledError (or their
          # own failure); neither should interrupt shutdown.
          await asyncio.gather(*running_tasks, return_exceptions=True)
      logger.info("Background dispatch dispatcher stopped")
  113. async def dispatch_reprint_archive(
  114. self,
  115. *,
  116. archive_id: int,
  117. archive_name: str,
  118. printer_id: int,
  119. printer_name: str,
  120. options: dict[str, Any],
  121. requested_by_user_id: int | None,
  122. requested_by_username: str | None,
  123. ) -> dict[str, Any]:
  124. return await self._dispatch(
  125. kind="reprint_archive",
  126. source_id=archive_id,
  127. source_name=archive_name,
  128. printer_id=printer_id,
  129. printer_name=printer_name,
  130. options=options,
  131. requested_by_user_id=requested_by_user_id,
  132. requested_by_username=requested_by_username,
  133. )
  134. async def get_state(self) -> dict[str, Any]:
  135. """Get current dispatch queue state snapshot for newly connected clients."""
  136. async with self._lock:
  137. return self._build_state_payload_unlocked()
  138. async def dispatch_print_library_file(
  139. self,
  140. *,
  141. file_id: int,
  142. filename: str,
  143. printer_id: int,
  144. printer_name: str,
  145. options: dict[str, Any],
  146. requested_by_user_id: int | None,
  147. requested_by_username: str | None,
  148. project_id: int | None = None,
  149. cleanup_library_after_dispatch: bool = False,
  150. ) -> dict[str, Any]:
  151. return await self._dispatch(
  152. kind="print_library_file",
  153. source_id=file_id,
  154. source_name=filename,
  155. printer_id=printer_id,
  156. printer_name=printer_name,
  157. options=options,
  158. requested_by_user_id=requested_by_user_id,
  159. requested_by_username=requested_by_username,
  160. project_id=project_id,
  161. cleanup_library_after_dispatch=cleanup_library_after_dispatch,
  162. )
  async def cancel_job(self, job_id: int) -> dict[str, Any]:
      """Cancel a queued or active dispatch job.

      Queued jobs are removed immediately. Active jobs are cancelled
      cooperatively and will stop at the next cancellation checkpoint.

      Returns:
          ``{"cancelled": False, "reason": "not_found"}`` when no queued or
          active job has this id. Otherwise a dict with ``cancelled=True``,
          the job's id, source name and printer, and ``pending`` set to True
          for an active job (cancellation still in flight) or False for a
          job that was removed from the queue.
      """
      async with self._lock:
          # Check active jobs first
          active_state = self._active_jobs.get(job_id)
          if active_state is not None:
              logger.info("Cancel requested for active dispatch job %s", job_id)
              # Only flag the job; the worker notices the flag at its next checkpoint.
              self._cancel_requested_job_ids.add(job_id)
              target = active_state.job
              pending = True
              status, message = "cancelling", "Cancelling current dispatch..."
          else:
              # Check queued jobs
              target = next((job for job in self._queued_jobs if job.id == job_id), None)
              if target is None:
                  logger.info("Cancel requested for unknown dispatch job %s", job_id)
                  return {"cancelled": False, "reason": "not_found"}

              self._queued_jobs.remove(target)
              logger.info("Cancelled queued dispatch job %s", target.id)
              # A job that never ran is taken out of the batch total.
              self._batch_total = max(0, self._batch_total - 1)
              if self._batch_total == 0 and not self._queued_jobs and not self._active_jobs:
                  self._batch_completed = 0
                  self._batch_failed = 0
              pending = False
              status, message = "cancelled", "Cancelled from queue"

          payload = self._build_state_payload_unlocked(
              recent_event={
                  "status": status,
                  "job_id": target.id,
                  "source_name": target.source_name,
                  "printer_id": target.printer_id,
                  "printer_name": target.printer_name,
                  "message": message,
              }
          )

      # Broadcast only after the lock is released so a slow websocket client
      # cannot stall every other queue operation.
      await ws_manager.broadcast({"type": "background_dispatch", "data": payload})
      return {
          "cancelled": True,
          "pending": pending,
          "job_id": target.id,
          "source_name": target.source_name,
          "printer_id": target.printer_id,
          "printer_name": target.printer_name,
      }
  async def _dispatch(
      self,
      *,
      kind: Literal["reprint_archive", "print_library_file"],
      source_id: int,
      source_name: str,
      printer_id: int,
      printer_name: str,
      options: dict[str, Any],
      requested_by_user_id: int | None,
      requested_by_username: str | None,
      project_id: int | None = None,
      cleanup_library_after_dispatch: bool = False,
  ) -> dict[str, Any]:
      """Validate a request, queue it as a new job, and announce it.

      Returns:
          A dict with the new job's id, its position among queued and active
          jobs, the status ``"dispatched"``, and the printer/source identifiers.

      Raises:
          DispatchEnqueueRejected: The printer already has a queued or active
              dispatch job, or it is currently busy printing.
      """
      async with self._lock:
          # At most one job per printer may be queued or active.
          printers_in_flight = {queued.printer_id for queued in self._queued_jobs}
          printers_in_flight.update(state.job.printer_id for state in self._active_jobs.values())
          if printer_id in printers_in_flight:
              raise DispatchEnqueueRejected(f"Printer {printer_name} already has a background dispatch in progress")
          if self._printer_is_busy_printing(printer_id):
              raise DispatchEnqueueRejected(f"Printer {printer_name} is currently busy printing")

          # 1-based position, counted before this job joins the queue.
          dispatch_position = 1 + len(self._queued_jobs) + len(self._active_jobs)

          job_id = self._next_job_id
          self._next_job_id = job_id + 1
          job = PrintDispatchJob(
              id=job_id,
              kind=kind,
              source_id=source_id,
              source_name=source_name,
              printer_id=printer_id,
              printer_name=printer_name,
              options=options,
              requested_by_user_id=requested_by_user_id,
              requested_by_username=requested_by_username,
              project_id=project_id,
              cleanup_library_after_dispatch=cleanup_library_after_dispatch,
          )
          self._batch_total += 1
          self._queued_jobs.append(job)
          # Wake the dispatcher loop so it picks the job up.
          self._job_event.set()

          event = {
              "status": "dispatched",
              "job_id": job.id,
              "source_name": source_name,
              "printer_id": printer_id,
              "printer_name": printer_name,
              "message": f"Dispatched to {printer_name}",
          }
          payload = self._build_state_payload_unlocked(recent_event=event)

      await ws_manager.broadcast({"type": "background_dispatch", "data": payload})
      return {
          "dispatch_job_id": job.id,
          "dispatch_position": dispatch_position,
          "status": "dispatched",
          "printer_id": printer_id,
          "source_id": source_id,
          "source_name": source_name,
      }
  async def _dispatcher_loop(self):
      """Run forever: wait for a wake-up, then start every job that can start.

      A queued job can start when no active job targets the same printer.
      Each started job gets its own task, so different printers are served
      in parallel.
      """
      while True:
          await self._job_event.wait()
          self._job_event.clear()

          # Keep starting jobs until the queue holds nothing startable.
          while True:
              payload: dict[str, Any] | None = None
              async with self._lock:
                  occupied_printers = {active.job.printer_id for active in self._active_jobs.values()}

                  # First queued job whose printer is free, in queue order.
                  position: int | None = None
                  for idx, candidate in enumerate(self._queued_jobs):
                      if candidate.printer_id not in occupied_printers:
                          position = idx
                          break
                  if position is None:
                      break

                  starting = self._queued_jobs[position]
                  del self._queued_jobs[position]
                  self._active_jobs[starting.id] = ActiveDispatchState(
                      job=starting,
                      message="Preparing background dispatch...",
                  )
                  self._running_tasks[starting.id] = asyncio.create_task(
                      self._run_active_job(starting),
                      name=f"background-dispatch-job-{starting.id}",
                  )
                  event = {
                      "status": "processing",
                      "job_id": starting.id,
                      "source_name": starting.source_name,
                      "printer_id": starting.printer_id,
                      "printer_name": starting.printer_name,
                      "message": "Preparing background dispatch...",
                  }
                  payload = self._build_state_payload_unlocked(recent_event=event)

              if payload:
                  await ws_manager.broadcast({"type": "background_dispatch", "data": payload})
  async def _run_active_job(self, job: PrintDispatchJob):
      """Run one active job and record how it ended.

      Outcomes:
        * normal return from ``_process_job`` -> marked finished (success)
        * ``DispatchJobCancelled``             -> marked cancelled
        * ``asyncio.CancelledError``           -> re-raised without bookkeeping
        * any other exception                  -> logged, marked finished (failed)
          with the exception text as the message

      In every case the dispatcher loop is woken afterwards.
      """
      try:
          await self._process_job(job)
          await self._mark_job_finished(job, failed=False, message="Background dispatch complete")
      except DispatchJobCancelled:
          # Cooperative cancellation requested through cancel_job().
          await self._mark_job_cancelled(job)
      except asyncio.CancelledError:
          # Task-level cancellation: propagate untouched; this method does not
          # update the job's bookkeeping on this path.
          raise
      except Exception as e:
          logger.error("Background dispatch job %s failed: %s", job.id, e, exc_info=True)
          await self._mark_job_finished(job, failed=True, message=str(e))
      finally:
          # Wake the dispatcher so it re-scans the queue now that this job is done.
          self._job_event.set()
  async def _set_active_message(self, job: PrintDispatchJob, message: str):
      """Update an active job's status message and broadcast the new state.

      Does nothing when the job is no longer tracked as active.
      """
      async with self._lock:
          state = self._active_jobs.get(job.id)
          if state is None:
              return
          state.message = message
          tracked = state.job
          event = {
              "status": "processing",
              "job_id": tracked.id,
              "source_name": tracked.source_name,
              "printer_id": tracked.printer_id,
              "printer_name": tracked.printer_name,
              "message": message,
          }
          payload = self._build_state_payload_unlocked(recent_event=event)

      await ws_manager.broadcast({"type": "background_dispatch", "data": payload})
  async def _set_active_upload_progress(self, job: PrintDispatchJob, uploaded: int, total: int):
      """Record upload byte counters for an active job and broadcast the new state.

      Both counters are coerced to ``int`` and clamped to be non-negative.
      Does nothing when the job is no longer tracked as active.
      """
      async with self._lock:
          state = self._active_jobs.get(job.id)
          if state is None:
              return
          state.upload_bytes = max(0, int(uploaded))
          state.upload_total_bytes = max(0, int(total))
          tracked = state.job
          event = {
              "status": "processing",
              "job_id": tracked.id,
              "source_name": tracked.source_name,
              "printer_id": tracked.printer_id,
              "printer_name": tracked.printer_name,
              # The status line is unchanged; only the counters moved.
              "message": state.message,
          }
          payload = self._build_state_payload_unlocked(recent_event=event)

      await ws_manager.broadcast({"type": "background_dispatch", "data": payload})
  async def _mark_job_finished(self, job: PrintDispatchJob, *, failed: bool, message: str):
      """Record a job as completed or failed, broadcast it, and reset an idle batch.

      The batch counters are zeroed only after the broadcast, so the final
      payload still carries the finished batch's totals.
      """
      final_status = "failed" if failed else "completed"

      async with self._lock:
          if failed:
              self._batch_failed += 1
          else:
              self._batch_completed += 1

          # The job is no longer tracked anywhere.
          for registry in (self._active_jobs, self._running_tasks):
              registry.pop(job.id, None)
          self._cancel_requested_job_ids.discard(job.id)

          event = {
              "status": final_status,
              "job_id": job.id,
              "source_name": job.source_name,
              "printer_id": job.printer_id,
              "printer_name": job.printer_name,
              "message": message,
          }
          payload = self._build_state_payload_unlocked(recent_event=event)
          batch_drained = not self._queued_jobs and not self._active_jobs

      await ws_manager.broadcast({"type": "background_dispatch", "data": payload})

      if not batch_drained:
          return
      async with self._lock:
          # Re-check: a new job may have been queued while broadcasting.
          if not self._queued_jobs and not self._active_jobs:
              self._batch_total = 0
              self._batch_completed = 0
              self._batch_failed = 0
  async def _mark_job_cancelled(self, job: PrintDispatchJob):
      """Drop a job cancelled mid-dispatch from all tracking and broadcast the event.

      The job is removed from the batch total, so it counts neither as
      completed nor as failed.
      """
      async with self._lock:
          for registry in (self._active_jobs, self._running_tasks):
              registry.pop(job.id, None)
          self._cancel_requested_job_ids.discard(job.id)

          self._batch_total = max(0, self._batch_total - 1)
          nothing_left = not self._queued_jobs and not self._active_jobs
          if self._batch_total == 0 and nothing_left:
              self._batch_completed = 0
              self._batch_failed = 0

          event = {
              "status": "cancelled",
              "job_id": job.id,
              "source_name": job.source_name,
              "printer_id": job.printer_id,
              "printer_name": job.printer_name,
              "message": "Cancelled during dispatch",
          }
          payload = self._build_state_payload_unlocked(recent_event=event)

      await ws_manager.broadcast({"type": "background_dispatch", "data": payload})
  423. def _is_cancel_requested(self, job_id: int) -> bool:
  424. return job_id in self._cancel_requested_job_ids
  425. def _raise_if_cancel_requested(self, job: PrintDispatchJob):
  426. if self._is_cancel_requested(job.id):
  427. raise DispatchJobCancelled(f"Dispatch job {job.id} cancelled")
  428. def _build_state_payload_unlocked(self, recent_event: dict[str, Any] | None = None) -> dict[str, Any]:
  429. processing = len(self._active_jobs)
  430. dispatched = len(self._queued_jobs)
  431. dispatched_jobs = [
  432. {
  433. "job_id": job.id,
  434. "kind": job.kind,
  435. "source_id": job.source_id,
  436. "source_name": job.source_name,
  437. "printer_id": job.printer_id,
  438. "printer_name": job.printer_name,
  439. }
  440. for job in list(self._queued_jobs)
  441. ]
  442. active_jobs: list[dict[str, Any]] = []
  443. for active in self._active_jobs.values():
  444. upload_progress_pct = None
  445. if active.upload_total_bytes and active.upload_total_bytes > 0 and active.upload_bytes is not None:
  446. upload_progress_pct = round(
  447. max(0.0, min(100.0, (active.upload_bytes / active.upload_total_bytes) * 100.0)), 1
  448. )
  449. active_jobs.append(
  450. {
  451. "job_id": active.job.id,
  452. "kind": active.job.kind,
  453. "source_id": active.job.source_id,
  454. "source_name": active.job.source_name,
  455. "printer_id": active.job.printer_id,
  456. "printer_name": active.job.printer_name,
  457. "message": active.message,
  458. "upload_bytes": active.upload_bytes,
  459. "upload_total_bytes": active.upload_total_bytes,
  460. "upload_progress_pct": upload_progress_pct,
  461. }
  462. )
  463. active_jobs.sort(key=lambda item: int(item["job_id"]))
  464. active_job = active_jobs[0] if active_jobs else None
  465. return {
  466. "total": self._batch_total,
  467. "dispatched": dispatched,
  468. "processing": processing,
  469. "completed": self._batch_completed,
  470. "failed": self._batch_failed,
  471. "dispatched_jobs": dispatched_jobs,
  472. "active_jobs": active_jobs,
  473. "active_job": active_job,
  474. "recent_event": recent_event,
  475. }
  476. async def _process_job(self, job: PrintDispatchJob):
  477. if job.kind == "reprint_archive":
  478. await self._run_reprint_archive(job)
  479. return
  480. if job.kind == "print_library_file":
  481. await self._run_print_library_file(job)
  482. return
  483. raise RuntimeError(f"Unknown dispatch job kind: {job.kind}")
  async def _run_reprint_archive(self, job: PrintDispatchJob):
      """Upload an archived 3MF to the job's printer and start printing it.

      Steps, in order: load the archive and printer rows, check the printer
      is connected and the archive file exists, delete any same-named file on
      the printer, upload the file (with retries when enabled), register the
      expected print, send the start command, then wait for the printer to
      acknowledge it. Cancellation is checked before the delete, before the
      upload, during the upload (via the progress callback) and before the
      start command.

      Raises:
          RuntimeError: Archive or printer row missing, printer not
              connected, archive file missing, upload failed, start command
              failed, or the printer never acknowledged the print.
          DispatchJobCancelled: The user cancelled the job at a checkpoint.
      """
      # Imported at call time rather than at module import time —
      # presumably to avoid a circular import with backend.app.main; confirm.
      from backend.app.main import register_expected_print

      # NOTE(review): nesting was reconstructed from a flattened listing; the
      # whole job is placed inside the session here, mirroring
      # _run_print_library_file (which calls db.rollback() after its upload) —
      # confirm against the original layout.
      async with async_session() as db:
          service = ArchiveService(db)
          archive = await service.get_archive(job.source_id)
          if not archive:
              raise RuntimeError("Archive not found")
          printer = await db.scalar(select(Printer).where(Printer.id == job.printer_id))
          if not printer:
              raise RuntimeError("Printer not found")
          # Copy the values used below into plain locals.
          printer_name = printer.name
          printer_ip = printer.ip_address
          printer_access_code = printer.access_code
          printer_model = printer.model
          archive_filename = archive.filename
          if not printer_manager.is_connected(job.printer_id):
              raise RuntimeError("Printer is not connected")
          file_path = settings.base_dir / archive.file_path
          if not file_path.exists():
              raise RuntimeError("Archive file not found")
          # Remote name: strip a ".gcode.3mf" or ".3mf" suffix, then re-append ".3mf".
          base_name = archive.filename
          if base_name.endswith(".gcode.3mf"):
              base_name = base_name[:-10]
          elif base_name.endswith(".3mf"):
              base_name = base_name[:-4]
          remote_filename = f"{base_name}.3mf"
          # Sanitize: firmware parses ftp://{filename} as a URL, spaces break it
          remote_filename = remote_filename.replace(" ", "_")
          remote_path = f"/{remote_filename}"
          ftp_retry_enabled, ftp_retry_count, ftp_retry_delay, ftp_timeout = await get_ftp_retry_settings()
          self._raise_if_cancel_requested(job)
          await self._set_active_message(job, f"Preparing upload to {printer_name}...")
          # Remove any file with the same name on the printer before uploading.
          await delete_file_async(
              printer_ip,
              printer_access_code,
              remote_path,
              socket_timeout=ftp_timeout,
              printer_model=printer_model,
          )
          self._raise_if_cancel_requested(job)
          try:
              await self._set_active_message(job, f"Uploading {archive_filename} to {printer_name}...")
              loop = asyncio.get_running_loop()
              # Throttle state for progress broadcasts: time and byte count of the last emit.
              progress_state = {"last_emit": 0.0, "last_bytes": 0}

              def upload_progress_callback(uploaded: int, total: int):
                  # Raising here aborts the transfer; the retry wrapper below is
                  # told not to retry on this exception type.
                  if self._is_cancel_requested(job.id):
                      raise DispatchJobCancelled(f"Dispatch job {job.id} cancelled during upload")
                  now = time.monotonic()
                  # Emit when the upload is complete, at least 0.2 s have passed
                  # since the last emit, or at least 256 KiB more were sent.
                  should_emit = (
                      uploaded >= total
                      or now - progress_state["last_emit"] >= 0.2
                      or uploaded - progress_state["last_bytes"] >= 256 * 1024
                  )
                  if should_emit:
                      progress_state["last_emit"] = now
                      progress_state["last_bytes"] = uploaded
                      # Hand the update to the event loop thread-safely —
                      # presumably this callback runs on a worker thread; confirm.
                      loop.call_soon_threadsafe(
                          lambda u=uploaded, t=total: asyncio.create_task(self._set_active_upload_progress(job, u, t))
                      )

              if ftp_retry_enabled:
                  uploaded = await with_ftp_retry(
                      upload_file_async,
                      printer_ip,
                      printer_access_code,
                      file_path,
                      remote_path,
                      progress_callback=upload_progress_callback,
                      socket_timeout=ftp_timeout,
                      printer_model=printer_model,
                      max_retries=ftp_retry_count,
                      retry_delay=ftp_retry_delay,
                      operation_name=f"Upload for reprint to {printer_name}",
                      non_retry_exceptions=(DispatchJobCancelled,),
                  )
              else:
                  uploaded = await upload_file_async(
                      printer_ip,
                      printer_access_code,
                      file_path,
                      remote_path,
                      progress_callback=upload_progress_callback,
                      socket_timeout=ftp_timeout,
                      printer_model=printer_model,
                  )
              if uploaded:
                  # Force the displayed progress to 100%.
                  await self._set_active_upload_progress(job, 1, 1)
              if not uploaded:
                  raise RuntimeError(
                      "Failed to upload file to printer. Check if SD card is inserted and properly formatted (FAT32/exFAT)."
                  )
              # Register the expected print before sending the start command.
              register_expected_print(
                  job.printer_id,
                  remote_filename,
                  job.source_id,
                  ams_mapping=job.options.get("ams_mapping"),
              )
              plate_id = self._resolve_plate_id(file_path, job.options.get("plate_id"))
              self._raise_if_cancel_requested(job)
              await self._set_active_message(job, f"Starting print on {printer_name}...")
              started = printer_manager.start_print(
                  job.printer_id,
                  remote_filename,
                  plate_id,
                  ams_mapping=job.options.get("ams_mapping"),
                  timelapse=job.options.get("timelapse", False),
                  bed_levelling=job.options.get("bed_levelling", True),
                  flow_cali=job.options.get("flow_cali", False),
                  vibration_cali=job.options.get("vibration_cali", True),
                  layer_inspect=job.options.get("layer_inspect", False),
                  use_ams=job.options.get("use_ams", True),
              )
              if not started:
                  # The start command failed: clean up the uploaded file, then fail the job.
                  await self._cleanup_sd_card_file(
                      printer_ip,
                      printer_access_code,
                      remote_path,
                      printer_model,
                  )
                  raise RuntimeError("Failed to start print")
              # Register the archive's local 3MF in the cover-cache so the
              # /cover endpoint can skip FTP — we already have the file on
              # disk, no need to refetch 36 MB from a printer whose FTP is
              # busy serving the active print (#1166 follow-up).
              cache_3mf_download(job.printer_id, remote_filename, file_path)
              # Wait for the printer to actually pick up the command before
              # marking the dispatch job complete (#1042). MQTT-publish success
              # only proves the command queued locally; the printer can still
              # reject it (HMS error pending, half-broken session, SD card
              # missing) and never transition. Until #1042 this watchdog was
              # fire-and-forget — the job was reported successful and the
              # user had no signal that the print never started. The uploaded
              # file is intentionally left on the printer's SD card on
              # timeout: the next dispatch will overwrite it via the existing
              # delete-then-upload step, and the printer may still be in the
              # middle of reading it if it picked up just past the timeout.
              pre_status = printer_manager.get_status(job.printer_id)
              pre_state = getattr(pre_status, "state", None) if pre_status else None
              pre_subtask_id = getattr(pre_status, "subtask_id", None) if pre_status else None
              pre_gcode_file = getattr(pre_status, "gcode_file", None) if pre_status else None
              # Verification is skipped when no printer state is known.
              if pre_state:
                  await self._set_active_message(job, f"Waiting for {printer_name} to acknowledge print...")
                  transitioned = await self._verify_print_response(
                      job.printer_id,
                      printer_name,
                      pre_state,
                      pre_subtask_id=pre_subtask_id,
                      pre_gcode_file=pre_gcode_file,
                  )
                  if not transitioned:
                      raise RuntimeError(
                          f"Printer did not acknowledge print command — state still {pre_state}. "
                          f"Check the printer for a pending error (HMS code, plate-clear prompt, "
                          f"SD card) and try again."
                      )
              # Record who started the print, when both id and username are known.
              if job.requested_by_user_id and job.requested_by_username:
                  printer_manager.set_current_print_user(
                      job.printer_id,
                      job.requested_by_user_id,
                      job.requested_by_username,
                  )
          except DispatchJobCancelled:
              # Update the status line, then let the caller do the cancel bookkeeping.
              await self._set_active_message(job, f"Cancelled upload on {printer_name}.")
              raise
  647. async def _run_print_library_file(self, job: PrintDispatchJob):
  648. from backend.app.main import register_expected_print
  649. async with async_session() as db:
  650. lib_file = await db.scalar(LibraryFile.active().where(LibraryFile.id == job.source_id))
  651. if not lib_file:
  652. raise RuntimeError("File not found")
  653. if not self._is_sliced_file(lib_file.filename):
  654. raise RuntimeError("Not a sliced file. Only .gcode or .gcode.3mf files can be printed.")
  655. file_path = Path(settings.base_dir) / lib_file.file_path
  656. if not file_path.exists():
  657. raise RuntimeError("File not found on disk")
  658. printer = await db.scalar(select(Printer).where(Printer.id == job.printer_id))
  659. if not printer:
  660. raise RuntimeError("Printer not found")
  661. printer_name = printer.name
  662. printer_ip = printer.ip_address
  663. printer_access_code = printer.access_code
  664. printer_model = printer.model
  665. library_filename = lib_file.filename
  666. if not printer_manager.is_connected(job.printer_id):
  667. raise RuntimeError("Printer is not connected")
  668. await self._set_active_message(job, f"Creating archive for {lib_file.filename}...")
  669. archive_service = ArchiveService(db)
  670. archive = await archive_service.archive_print(
  671. printer_id=job.printer_id,
  672. source_file=file_path,
  673. original_filename=lib_file.filename,
  674. project_id=job.project_id,
  675. created_by_id=job.requested_by_user_id,
  676. )
  677. if not archive:
  678. raise RuntimeError("Failed to create archive")
  679. await db.flush()
  680. base_name = lib_file.filename
  681. if base_name.endswith(".gcode.3mf"):
  682. base_name = base_name[:-10]
  683. elif base_name.endswith(".3mf"):
  684. base_name = base_name[:-4]
  685. remote_filename = f"{base_name}.3mf"
  686. # Sanitize: firmware parses ftp://{filename} as a URL, spaces break it
  687. remote_filename = remote_filename.replace(" ", "_")
  688. remote_path = f"/{remote_filename}"
  689. ftp_retry_enabled, ftp_retry_count, ftp_retry_delay, ftp_timeout = await get_ftp_retry_settings()
  690. self._raise_if_cancel_requested(job)
  691. await self._set_active_message(job, f"Preparing upload to {printer_name}...")
  692. await delete_file_async(
  693. printer_ip,
  694. printer_access_code,
  695. remote_path,
  696. socket_timeout=ftp_timeout,
  697. printer_model=printer_model,
  698. )
  699. self._raise_if_cancel_requested(job)
  700. try:
  701. await self._set_active_message(job, f"Uploading {library_filename} to {printer_name}...")
  702. loop = asyncio.get_running_loop()
  703. progress_state = {"last_emit": 0.0, "last_bytes": 0}
  704. def upload_progress_callback(uploaded: int, total: int):
  705. if self._is_cancel_requested(job.id):
  706. raise DispatchJobCancelled(f"Dispatch job {job.id} cancelled during upload")
  707. now = time.monotonic()
  708. should_emit = (
  709. uploaded >= total
  710. or now - progress_state["last_emit"] >= 0.2
  711. or uploaded - progress_state["last_bytes"] >= 256 * 1024
  712. )
  713. if should_emit:
  714. progress_state["last_emit"] = now
  715. progress_state["last_bytes"] = uploaded
  716. loop.call_soon_threadsafe(
  717. lambda u=uploaded, t=total: asyncio.create_task(self._set_active_upload_progress(job, u, t))
  718. )
  719. if ftp_retry_enabled:
  720. uploaded = await with_ftp_retry(
  721. upload_file_async,
  722. printer_ip,
  723. printer_access_code,
  724. file_path,
  725. remote_path,
  726. progress_callback=upload_progress_callback,
  727. socket_timeout=ftp_timeout,
  728. printer_model=printer_model,
  729. max_retries=ftp_retry_count,
  730. retry_delay=ftp_retry_delay,
  731. operation_name=f"Upload for print to {printer_name}",
  732. non_retry_exceptions=(DispatchJobCancelled,),
  733. )
  734. else:
  735. uploaded = await upload_file_async(
  736. printer_ip,
  737. printer_access_code,
  738. file_path,
  739. remote_path,
  740. progress_callback=upload_progress_callback,
  741. socket_timeout=ftp_timeout,
  742. printer_model=printer_model,
  743. )
  744. if uploaded:
  745. await self._set_active_upload_progress(job, 1, 1)
  746. if not uploaded:
  747. await db.rollback()
  748. raise RuntimeError(
  749. "Failed to upload file to printer. Check if SD card is inserted and properly formatted (FAT32/exFAT)."
  750. )
  751. register_expected_print(
  752. job.printer_id,
  753. remote_filename,
  754. archive.id,
  755. ams_mapping=job.options.get("ams_mapping"),
  756. )
  757. plate_id = self._resolve_plate_id(file_path, job.options.get("plate_id"))
  758. self._raise_if_cancel_requested(job)
  759. await self._set_active_message(job, f"Starting print on {printer_name}...")
  760. started = printer_manager.start_print(
  761. job.printer_id,
  762. remote_filename,
  763. plate_id,
  764. ams_mapping=job.options.get("ams_mapping"),
  765. timelapse=job.options.get("timelapse", False),
  766. bed_levelling=job.options.get("bed_levelling", True),
  767. flow_cali=job.options.get("flow_cali", False),
  768. vibration_cali=job.options.get("vibration_cali", True),
  769. layer_inspect=job.options.get("layer_inspect", False),
  770. use_ams=job.options.get("use_ams", True),
  771. )
  772. if not started:
  773. await self._cleanup_sd_card_file(
  774. printer_ip,
  775. printer_access_code,
  776. remote_path,
  777. printer_model,
  778. )
  779. await db.rollback()
  780. raise RuntimeError("Failed to start print")
  781. # Same as the archive path: register the library file's local
  782. # 3MF in the cover-cache so /cover skips FTP (#1166 follow-up).
  783. cache_3mf_download(job.printer_id, remote_filename, file_path)
  784. # See _run_reprint_archive for rationale (#1042). On timeout
  785. # also rolls back the freshly-created archive so the library
  786. # flow doesn't leave behind a phantom row for a print that
  787. # never started.
  788. pre_status = printer_manager.get_status(job.printer_id)
  789. pre_state = getattr(pre_status, "state", None) if pre_status else None
  790. pre_subtask_id = getattr(pre_status, "subtask_id", None) if pre_status else None
  791. pre_gcode_file = getattr(pre_status, "gcode_file", None) if pre_status else None
  792. if pre_state:
  793. await self._set_active_message(job, f"Waiting for {printer_name} to acknowledge print...")
  794. transitioned = await self._verify_print_response(
  795. job.printer_id,
  796. printer_name,
  797. pre_state,
  798. pre_subtask_id=pre_subtask_id,
  799. pre_gcode_file=pre_gcode_file,
  800. )
  801. if not transitioned:
  802. await db.rollback()
  803. raise RuntimeError(
  804. f"Printer did not acknowledge print command — state still {pre_state}. "
  805. f"Check the printer for a pending error (HMS code, plate-clear prompt, "
  806. f"SD card) and try again."
  807. )
  808. if job.requested_by_user_id and job.requested_by_username:
  809. printer_manager.set_current_print_user(
  810. job.printer_id,
  811. job.requested_by_user_id,
  812. job.requested_by_username,
  813. )
  814. # Direct-Print flow only: archive_print copies, so deleting the
  815. # transient library row + files here leaves archive intact. Disk
  816. # deletes run only after commit so a rollback leaves no orphan.
  817. cleanup_disk_paths: list[Path] = []
  818. if job.cleanup_library_after_dispatch and not lib_file.is_external:
  819. cleanup_disk_paths.append(file_path)
  820. if lib_file.thumbnail_path:
  821. thumb_path = Path(lib_file.thumbnail_path)
  822. if not thumb_path.is_absolute():
  823. thumb_path = Path(settings.base_dir) / lib_file.thumbnail_path
  824. cleanup_disk_paths.append(thumb_path)
  825. await db.delete(lib_file)
  826. await db.commit()
  827. for cleanup_path in cleanup_disk_paths:
  828. try:
  829. if cleanup_path.exists():
  830. cleanup_path.unlink()
  831. except OSError as cleanup_err:
  832. logger.warning("Failed to delete transient library file %s: %s", cleanup_path, cleanup_err)
  833. except DispatchJobCancelled:
  834. await db.rollback()
  835. await self._set_active_message(job, f"Cancelled upload on {printer_name}.")
  836. raise
  837. @staticmethod
  838. async def _verify_print_response(
  839. printer_id: int,
  840. printer_name: str,
  841. pre_state: str,
  842. pre_subtask_id: str | None = None,
  843. pre_gcode_file: str | None = None,
  844. timeout: float = 90.0,
  845. poll_interval: float = 3.0,
  846. ) -> bool:
  847. """Wait for the printer to acknowledge a print command.
  848. Returns True if the printer transitioned (state advanced past pre_state
  849. or subtask_id advanced past pre_subtask_id). Returns False on timeout —
  850. in that case logs a warning and forces an MQTT reconnect, mirroring the
  851. queue-side watchdog (`_watchdog_print_start`). Caller is responsible
  852. for surfacing the False result to the user (typically by raising so the
  853. dispatch job is marked failed).
  854. Both transition signals are checked because H2D can sit at FINISH for
  855. ~50 s after accepting `project_file` before flipping to PREPARE; the
  856. printer echoes our per-dispatch identity back as `subtask_id` on
  857. `push_status` first, so a subtask_id change is a definitive "command
  858. landed" signal even while state is still FINISH (#1078).
  859. """
  860. deadline = time.monotonic() + timeout
  861. last_status = None
  862. while time.monotonic() < deadline:
  863. await asyncio.sleep(poll_interval)
  864. state = printer_manager.get_status(printer_id)
  865. if not state:
  866. # Printer momentarily not reporting — could be a brief MQTT
  867. # disconnect mid-window. Keep polling rather than declaring
  868. # failure on the first missed tick; the printer may reconnect
  869. # within the remaining timeout and still surface a transition.
  870. continue
  871. last_status = state
  872. if state.state in _ACTIVE_PRINT_STATES:
  873. # Printer is actively processing the job. We do NOT accept
  874. # arbitrary state transitions: a printer going FINISH -> IDLE
  875. # (user dismissed the post-print prompt without accepting our
  876. # project_file) would otherwise look like "command landed"
  877. # and the dispatch job would be marked successful even though
  878. # no print is running (#1370).
  879. return True
  880. if pre_subtask_id is not None and state.subtask_id is not None and state.subtask_id != pre_subtask_id:
  881. # Printer picked up the job (subtask_id advanced). H2D can
  882. # sit at FINISH for ~50 s after accepting project_file before
  883. # transitioning to PREPARE, but the subtask_id flips to our
  884. # submission_id almost immediately (#1078).
  885. return True
  886. logger.warning(
  887. "Printer %s (%d) did not respond to print command within %.0fs "
  888. "(state still %s, subtask_id still %s) — printer may need restart",
  889. printer_name,
  890. printer_id,
  891. timeout,
  892. pre_state,
  893. pre_subtask_id,
  894. )
  895. # Distinguish #1150 (slow parse) from #887/#936 (half-broken session)
  896. # via gcode_file: if the printer is now showing a different file than
  897. # before dispatch, the project_file command landed and the printer is
  898. # parsing — a forced reconnect mid-parse causes 0500_4003. If
  899. # gcode_file is unchanged, the publish was silently swallowed and the
  900. # original #936 recovery (force_reconnect → fresh client_id) is what
  901. # we want. Caveat: in the rare retry-same-file-after-timeout case the
  902. # printer's gcode_file looks identical before and after the publish
  903. # lands, so a slow parse on retry-same-file still falls through to the
  904. # reconnect (and the original 0500_4003) — accepted to avoid breaking
  905. # the half-broken-session recovery path.
  906. client = printer_manager.get_client(printer_id)
  907. current_gcode_file = getattr(last_status, "gcode_file", None) if last_status else None
  908. publish_landed = current_gcode_file is not None and current_gcode_file != pre_gcode_file
  909. if publish_landed:
  910. logger.warning(
  911. "Printer %s (%d) gcode_file changed to %r (was %r) — printer "
  912. "received the command and is parsing slowly. Skipping forced "
  913. "MQTT reconnect to avoid 0500_4003 mid-parse (#1150).",
  914. printer_name,
  915. printer_id,
  916. current_gcode_file,
  917. pre_gcode_file,
  918. )
  919. elif client and hasattr(client, "force_reconnect_stale_session"):
  920. client.force_reconnect_stale_session(
  921. f"print command unacknowledged after {timeout:.0f}s "
  922. f"(state still {pre_state}, gcode_file {current_gcode_file!r})"
  923. )
  924. return False
  925. @staticmethod
  926. async def _cleanup_sd_card_file(
  927. printer_ip: str,
  928. access_code: str,
  929. remote_path: str,
  930. printer_model: str | None,
  931. ):
  932. """Best-effort delete of uploaded file from printer SD card."""
  933. try:
  934. await delete_file_async(printer_ip, access_code, remote_path, printer_model=printer_model)
  935. except Exception:
  936. pass # Best-effort — don't fail the error handler
  937. @staticmethod
  938. def _resolve_plate_id(file_path: Path, requested_plate_id: int | None) -> int:
  939. if requested_plate_id is not None:
  940. return requested_plate_id
  941. plate_id = 1
  942. try:
  943. with zipfile.ZipFile(file_path, "r") as zf:
  944. for name in zf.namelist():
  945. if name.startswith("Metadata/plate_") and name.endswith(".gcode"):
  946. plate_str = name[15:-6]
  947. plate_id = int(plate_str)
  948. break
  949. except (ValueError, zipfile.BadZipFile, OSError):
  950. pass
  951. return plate_id
  952. @staticmethod
  953. def _is_sliced_file(filename: str) -> bool:
  954. lower = filename.lower()
  955. return lower.endswith(".gcode") or lower.endswith(".gcode.3mf")
# Service instance constructed when this module is imported.
# NOTE(review): presumably the shared singleton other modules import rather
# than building their own BackgroundDispatchService — confirm against callers.
background_dispatch = BackgroundDispatchService()