background_dispatch.py 46 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080
  1. """Background dispatch for print/reprint jobs.
  2. This service is separate from the app's print queue feature. It exists only to
  3. decouple "send/start print" operations (FTP upload + start command) from API
  4. request latency so the UI can continue immediately after dispatch.
  5. """
  6. from __future__ import annotations
  7. import asyncio
  8. import logging
  9. import time
  10. import zipfile
  11. from collections import deque
  12. from dataclasses import dataclass, field
  13. from pathlib import Path
  14. from typing import Any, Literal
  15. from sqlalchemy import select
  16. from backend.app.core.config import settings
  17. from backend.app.core.database import async_session
  18. from backend.app.core.websocket import ws_manager
  19. from backend.app.models.library import LibraryFile
  20. from backend.app.models.printer import Printer
  21. from backend.app.services.archive import ArchiveService
  22. from backend.app.services.bambu_ftp import (
  23. cache_3mf_download,
  24. delete_file_async,
  25. get_ftp_retry_settings,
  26. upload_file_async,
  27. with_ftp_retry,
  28. )
  29. from backend.app.services.printer_manager import printer_manager
  30. from backend.app.utils.filename import derive_remote_filename
  31. logger = logging.getLogger(__name__)
  32. # Bambu firmware states that mean the project_file has actually been accepted
  33. # and the printer is now processing / running / paused mid-print. Used by the
  34. # direct-dispatch verifier (#1370): a transition into one of these states means
  35. # the print landed, anything else (e.g. FINISH -> IDLE after the user dismisses
  36. # a post-print prompt) is NOT a valid "command landed" signal even though the
  37. # state value did change. Mirrors the same constant in print_scheduler.py —
  38. # kept duplicated rather than imported to avoid coupling the two services and
  39. # to keep the value at the point of use.
  40. _ACTIVE_PRINT_STATES: frozenset[str] = frozenset({"PREPARE", "SLICING", "RUNNING", "PAUSE"})
  41. class DispatchJobCancelled(Exception):
  42. """Raised when a dispatch job is cancelled by the user."""
  43. class DispatchEnqueueRejected(Exception):
  44. """Raised when a dispatch job should not be accepted."""
  45. @dataclass(slots=True)
  46. class PrintDispatchJob:
  47. id: int
  48. kind: Literal["reprint_archive", "print_library_file"]
  49. source_id: int
  50. source_name: str
  51. printer_id: int
  52. printer_name: str
  53. options: dict[str, Any] = field(default_factory=dict)
  54. requested_by_user_id: int | None = None
  55. requested_by_username: str | None = None
  56. project_id: int | None = None
  57. cleanup_library_after_dispatch: bool = False
  58. @dataclass(slots=True)
  59. class ActiveDispatchState:
  60. job: PrintDispatchJob
  61. message: str
  62. upload_bytes: int | None = None
  63. upload_total_bytes: int | None = None
  64. class BackgroundDispatchService:
  65. def __init__(self):
  66. self._queued_jobs: deque[PrintDispatchJob] = deque()
  67. self._dispatcher_task: asyncio.Task | None = None
  68. self._running_tasks: dict[int, asyncio.Task] = {}
  69. self._lock = asyncio.Lock()
  70. self._job_event = asyncio.Event()
  71. self._next_job_id = 1
  72. self._active_jobs: dict[int, ActiveDispatchState] = {}
  73. self._cancel_requested_job_ids: set[int] = set()
  74. # Progress for the current "batch" (since queue became non-empty)
  75. self._batch_total = 0
  76. self._batch_completed = 0
  77. self._batch_failed = 0
  78. @staticmethod
  79. def _printer_is_busy_printing(printer_id: int) -> bool:
  80. state = printer_manager.get_status(printer_id)
  81. if not state:
  82. return False
  83. return state.state in ("RUNNING", "PAUSE", "PAUSED") and bool(state.gcode_file)
  84. async def start(self):
  85. async with self._lock:
  86. if self._dispatcher_task and not self._dispatcher_task.done():
  87. return
  88. self._dispatcher_task = asyncio.create_task(self._dispatcher_loop(), name="background-dispatch-dispatcher")
  89. logger.info("Background dispatch dispatcher started")
  90. async def stop(self):
  91. dispatcher: asyncio.Task | None = None
  92. running_tasks: list[asyncio.Task] = []
  93. async with self._lock:
  94. dispatcher = self._dispatcher_task
  95. self._dispatcher_task = None
  96. running_tasks = list(self._running_tasks.values())
  97. self._running_tasks.clear()
  98. self._active_jobs.clear()
  99. self._queued_jobs.clear()
  100. self._cancel_requested_job_ids.clear()
  101. self._job_event.set()
  102. if dispatcher:
  103. dispatcher.cancel()
  104. for task in running_tasks:
  105. task.cancel()
  106. if dispatcher:
  107. try:
  108. await dispatcher
  109. except asyncio.CancelledError:
  110. pass
  111. if running_tasks:
  112. await asyncio.gather(*running_tasks, return_exceptions=True)
  113. logger.info("Background dispatch dispatcher stopped")
  114. async def dispatch_reprint_archive(
  115. self,
  116. *,
  117. archive_id: int,
  118. archive_name: str,
  119. printer_id: int,
  120. printer_name: str,
  121. options: dict[str, Any],
  122. requested_by_user_id: int | None,
  123. requested_by_username: str | None,
  124. ) -> dict[str, Any]:
  125. return await self._dispatch(
  126. kind="reprint_archive",
  127. source_id=archive_id,
  128. source_name=archive_name,
  129. printer_id=printer_id,
  130. printer_name=printer_name,
  131. options=options,
  132. requested_by_user_id=requested_by_user_id,
  133. requested_by_username=requested_by_username,
  134. )
  135. async def get_state(self) -> dict[str, Any]:
  136. """Get current dispatch queue state snapshot for newly connected clients."""
  137. async with self._lock:
  138. return self._build_state_payload_unlocked()
  139. async def dispatch_print_library_file(
  140. self,
  141. *,
  142. file_id: int,
  143. filename: str,
  144. printer_id: int,
  145. printer_name: str,
  146. options: dict[str, Any],
  147. requested_by_user_id: int | None,
  148. requested_by_username: str | None,
  149. project_id: int | None = None,
  150. cleanup_library_after_dispatch: bool = False,
  151. ) -> dict[str, Any]:
  152. return await self._dispatch(
  153. kind="print_library_file",
  154. source_id=file_id,
  155. source_name=filename,
  156. printer_id=printer_id,
  157. printer_name=printer_name,
  158. options=options,
  159. requested_by_user_id=requested_by_user_id,
  160. requested_by_username=requested_by_username,
  161. project_id=project_id,
  162. cleanup_library_after_dispatch=cleanup_library_after_dispatch,
  163. )
  164. async def cancel_job(self, job_id: int) -> dict[str, Any]:
  165. """Cancel a queued dispatch job.
  166. Queued jobs are removed immediately. Active jobs are cancelled
  167. cooperatively and will stop at the next cancellation checkpoint.
  168. """
  169. async with self._lock:
  170. # Check active jobs first
  171. active_state = self._active_jobs.get(job_id)
  172. if active_state is not None:
  173. logger.info("Cancel requested for active dispatch job %s", job_id)
  174. self._cancel_requested_job_ids.add(job_id)
  175. active_job = active_state.job
  176. payload = self._build_state_payload_unlocked(
  177. recent_event={
  178. "status": "cancelling",
  179. "job_id": active_job.id,
  180. "source_name": active_job.source_name,
  181. "printer_id": active_job.printer_id,
  182. "printer_name": active_job.printer_name,
  183. "message": "Cancelling current dispatch...",
  184. }
  185. )
  186. result = {
  187. "cancelled": True,
  188. "pending": True,
  189. "job_id": active_job.id,
  190. "source_name": active_job.source_name,
  191. "printer_id": active_job.printer_id,
  192. "printer_name": active_job.printer_name,
  193. }
  194. await ws_manager.broadcast({"type": "background_dispatch", "data": payload})
  195. return result
  196. # Check queued jobs
  197. cancelled_job: PrintDispatchJob | None = None
  198. for job in self._queued_jobs:
  199. if job.id == job_id:
  200. cancelled_job = job
  201. break
  202. if not cancelled_job:
  203. logger.info("Cancel requested for unknown dispatch job %s", job_id)
  204. return {"cancelled": False, "reason": "not_found"}
  205. self._queued_jobs.remove(cancelled_job)
  206. logger.info("Cancelled queued dispatch job %s", cancelled_job.id)
  207. self._batch_total = max(0, self._batch_total - 1)
  208. if self._batch_total == 0 and len(self._queued_jobs) == 0 and len(self._active_jobs) == 0:
  209. self._batch_completed = 0
  210. self._batch_failed = 0
  211. payload = self._build_state_payload_unlocked(
  212. recent_event={
  213. "status": "cancelled",
  214. "job_id": cancelled_job.id,
  215. "source_name": cancelled_job.source_name,
  216. "printer_id": cancelled_job.printer_id,
  217. "printer_name": cancelled_job.printer_name,
  218. "message": "Cancelled from queue",
  219. }
  220. )
  221. await ws_manager.broadcast({"type": "background_dispatch", "data": payload})
  222. return {
  223. "cancelled": True,
  224. "pending": False,
  225. "job_id": cancelled_job.id,
  226. "source_name": cancelled_job.source_name,
  227. "printer_id": cancelled_job.printer_id,
  228. "printer_name": cancelled_job.printer_name,
  229. }
  230. async def _dispatch(
  231. self,
  232. *,
  233. kind: Literal["reprint_archive", "print_library_file"],
  234. source_id: int,
  235. source_name: str,
  236. printer_id: int,
  237. printer_name: str,
  238. options: dict[str, Any],
  239. requested_by_user_id: int | None,
  240. requested_by_username: str | None,
  241. project_id: int | None = None,
  242. cleanup_library_after_dispatch: bool = False,
  243. ) -> dict[str, Any]:
  244. async with self._lock:
  245. has_pending_for_printer = any(job.printer_id == printer_id for job in self._queued_jobs)
  246. has_active_for_printer = any(active.job.printer_id == printer_id for active in self._active_jobs.values())
  247. if has_pending_for_printer or has_active_for_printer:
  248. raise DispatchEnqueueRejected(f"Printer {printer_name} already has a background dispatch in progress")
  249. if self._printer_is_busy_printing(printer_id):
  250. raise DispatchEnqueueRejected(f"Printer {printer_name} is currently busy printing")
  251. dispatch_position = len(self._queued_jobs) + len(self._active_jobs) + 1
  252. job = PrintDispatchJob(
  253. id=self._next_job_id,
  254. kind=kind,
  255. source_id=source_id,
  256. source_name=source_name,
  257. printer_id=printer_id,
  258. printer_name=printer_name,
  259. options=options,
  260. requested_by_user_id=requested_by_user_id,
  261. requested_by_username=requested_by_username,
  262. project_id=project_id,
  263. cleanup_library_after_dispatch=cleanup_library_after_dispatch,
  264. )
  265. self._next_job_id += 1
  266. self._batch_total += 1
  267. self._queued_jobs.append(job)
  268. self._job_event.set()
  269. payload = self._build_state_payload_unlocked(
  270. recent_event={
  271. "status": "dispatched",
  272. "job_id": job.id,
  273. "source_name": source_name,
  274. "printer_id": printer_id,
  275. "printer_name": printer_name,
  276. "message": f"Dispatched to {printer_name}",
  277. }
  278. )
  279. await ws_manager.broadcast({"type": "background_dispatch", "data": payload})
  280. return {
  281. "dispatch_job_id": job.id,
  282. "dispatch_position": dispatch_position,
  283. "status": "dispatched",
  284. "printer_id": printer_id,
  285. "source_id": source_id,
  286. "source_name": source_name,
  287. }
  288. async def _dispatcher_loop(self):
  289. while True:
  290. await self._job_event.wait()
  291. self._job_event.clear()
  292. while True:
  293. payload: dict[str, Any] | None = None
  294. job_to_start: PrintDispatchJob | None = None
  295. async with self._lock:
  296. busy_printer_ids = {state.job.printer_id for state in self._active_jobs.values()}
  297. start_index = next(
  298. (
  299. idx
  300. for idx, queued_job in enumerate(self._queued_jobs)
  301. if queued_job.printer_id not in busy_printer_ids
  302. ),
  303. None,
  304. )
  305. if start_index is None:
  306. break
  307. job_to_start = self._queued_jobs[start_index]
  308. del self._queued_jobs[start_index]
  309. self._active_jobs[job_to_start.id] = ActiveDispatchState(
  310. job=job_to_start,
  311. message="Preparing background dispatch...",
  312. )
  313. task = asyncio.create_task(
  314. self._run_active_job(job_to_start), name=f"background-dispatch-job-{job_to_start.id}"
  315. )
  316. self._running_tasks[job_to_start.id] = task
  317. payload = self._build_state_payload_unlocked(
  318. recent_event={
  319. "status": "processing",
  320. "job_id": job_to_start.id,
  321. "source_name": job_to_start.source_name,
  322. "printer_id": job_to_start.printer_id,
  323. "printer_name": job_to_start.printer_name,
  324. "message": "Preparing background dispatch...",
  325. }
  326. )
  327. if payload:
  328. await ws_manager.broadcast({"type": "background_dispatch", "data": payload})
  329. async def _run_active_job(self, job: PrintDispatchJob):
  330. try:
  331. await self._process_job(job)
  332. await self._mark_job_finished(job, failed=False, message="Background dispatch complete")
  333. except DispatchJobCancelled:
  334. await self._mark_job_cancelled(job)
  335. except asyncio.CancelledError:
  336. raise
  337. except Exception as e:
  338. logger.error("Background dispatch job %s failed: %s", job.id, e, exc_info=True)
  339. await self._mark_job_finished(job, failed=True, message=str(e))
  340. finally:
  341. self._job_event.set()
  342. async def _set_active_message(self, job: PrintDispatchJob, message: str):
  343. async with self._lock:
  344. active = self._active_jobs.get(job.id)
  345. if not active:
  346. return
  347. active.message = message
  348. payload = self._build_state_payload_unlocked(
  349. recent_event={
  350. "status": "processing",
  351. "job_id": active.job.id,
  352. "source_name": active.job.source_name,
  353. "printer_id": active.job.printer_id,
  354. "printer_name": active.job.printer_name,
  355. "message": message,
  356. }
  357. )
  358. await ws_manager.broadcast({"type": "background_dispatch", "data": payload})
  359. async def _set_active_upload_progress(self, job: PrintDispatchJob, uploaded: int, total: int):
  360. async with self._lock:
  361. active = self._active_jobs.get(job.id)
  362. if not active:
  363. return
  364. active.upload_bytes = max(0, int(uploaded))
  365. active.upload_total_bytes = max(0, int(total))
  366. payload = self._build_state_payload_unlocked(
  367. recent_event={
  368. "status": "processing",
  369. "job_id": active.job.id,
  370. "source_name": active.job.source_name,
  371. "printer_id": active.job.printer_id,
  372. "printer_name": active.job.printer_name,
  373. "message": active.message,
  374. }
  375. )
  376. await ws_manager.broadcast({"type": "background_dispatch", "data": payload})
  377. async def _mark_job_finished(self, job: PrintDispatchJob, *, failed: bool, message: str):
  378. async with self._lock:
  379. if failed:
  380. self._batch_failed += 1
  381. else:
  382. self._batch_completed += 1
  383. self._active_jobs.pop(job.id, None)
  384. self._running_tasks.pop(job.id, None)
  385. self._cancel_requested_job_ids.discard(job.id)
  386. payload = self._build_state_payload_unlocked(
  387. recent_event={
  388. "status": "failed" if failed else "completed",
  389. "job_id": job.id,
  390. "source_name": job.source_name,
  391. "printer_id": job.printer_id,
  392. "printer_name": job.printer_name,
  393. "message": message,
  394. }
  395. )
  396. should_reset_batch = len(self._queued_jobs) == 0 and len(self._active_jobs) == 0
  397. await ws_manager.broadcast({"type": "background_dispatch", "data": payload})
  398. if should_reset_batch:
  399. async with self._lock:
  400. if len(self._queued_jobs) == 0 and len(self._active_jobs) == 0:
  401. self._batch_total = 0
  402. self._batch_completed = 0
  403. self._batch_failed = 0
  404. async def _mark_job_cancelled(self, job: PrintDispatchJob):
  405. async with self._lock:
  406. self._active_jobs.pop(job.id, None)
  407. self._running_tasks.pop(job.id, None)
  408. self._cancel_requested_job_ids.discard(job.id)
  409. self._batch_total = max(0, self._batch_total - 1)
  410. if self._batch_total == 0 and len(self._queued_jobs) == 0 and len(self._active_jobs) == 0:
  411. self._batch_completed = 0
  412. self._batch_failed = 0
  413. payload = self._build_state_payload_unlocked(
  414. recent_event={
  415. "status": "cancelled",
  416. "job_id": job.id,
  417. "source_name": job.source_name,
  418. "printer_id": job.printer_id,
  419. "printer_name": job.printer_name,
  420. "message": "Cancelled during dispatch",
  421. }
  422. )
  423. await ws_manager.broadcast({"type": "background_dispatch", "data": payload})
  424. def _is_cancel_requested(self, job_id: int) -> bool:
  425. return job_id in self._cancel_requested_job_ids
  426. def _raise_if_cancel_requested(self, job: PrintDispatchJob):
  427. if self._is_cancel_requested(job.id):
  428. raise DispatchJobCancelled(f"Dispatch job {job.id} cancelled")
  429. def _build_state_payload_unlocked(self, recent_event: dict[str, Any] | None = None) -> dict[str, Any]:
  430. processing = len(self._active_jobs)
  431. dispatched = len(self._queued_jobs)
  432. dispatched_jobs = [
  433. {
  434. "job_id": job.id,
  435. "kind": job.kind,
  436. "source_id": job.source_id,
  437. "source_name": job.source_name,
  438. "printer_id": job.printer_id,
  439. "printer_name": job.printer_name,
  440. }
  441. for job in list(self._queued_jobs)
  442. ]
  443. active_jobs: list[dict[str, Any]] = []
  444. for active in self._active_jobs.values():
  445. upload_progress_pct = None
  446. if active.upload_total_bytes and active.upload_total_bytes > 0 and active.upload_bytes is not None:
  447. upload_progress_pct = round(
  448. max(0.0, min(100.0, (active.upload_bytes / active.upload_total_bytes) * 100.0)), 1
  449. )
  450. active_jobs.append(
  451. {
  452. "job_id": active.job.id,
  453. "kind": active.job.kind,
  454. "source_id": active.job.source_id,
  455. "source_name": active.job.source_name,
  456. "printer_id": active.job.printer_id,
  457. "printer_name": active.job.printer_name,
  458. "message": active.message,
  459. "upload_bytes": active.upload_bytes,
  460. "upload_total_bytes": active.upload_total_bytes,
  461. "upload_progress_pct": upload_progress_pct,
  462. }
  463. )
  464. active_jobs.sort(key=lambda item: int(item["job_id"]))
  465. active_job = active_jobs[0] if active_jobs else None
  466. return {
  467. "total": self._batch_total,
  468. "dispatched": dispatched,
  469. "processing": processing,
  470. "completed": self._batch_completed,
  471. "failed": self._batch_failed,
  472. "dispatched_jobs": dispatched_jobs,
  473. "active_jobs": active_jobs,
  474. "active_job": active_job,
  475. "recent_event": recent_event,
  476. }
  477. async def _process_job(self, job: PrintDispatchJob):
  478. if job.kind == "reprint_archive":
  479. await self._run_reprint_archive(job)
  480. return
  481. if job.kind == "print_library_file":
  482. await self._run_print_library_file(job)
  483. return
  484. raise RuntimeError(f"Unknown dispatch job kind: {job.kind}")
    async def _run_reprint_archive(self, job: PrintDispatchJob):
        """Upload an archived print file to the printer and start printing it.

        Steps, in order: load the archive and printer rows, check the printer
        connection and the local file, delete any same-named file on the
        printer, upload the file (with FTP retry when enabled), register the
        expected print, send the start command, then wait for the printer to
        acknowledge it. Cancellation is checked before the delete, before the
        upload, inside the upload progress callback and before the start
        command.

        Raises:
            RuntimeError: archive/printer row missing, printer not connected,
                archive file missing on disk, upload failed, start command
                failed, or the printer did not acknowledge the print.
            DispatchJobCancelled: the user cancelled the job at a checkpoint.
        """
        # Function-scope import — presumably avoids a circular import with
        # backend.app.main; TODO confirm.
        from backend.app.main import register_expected_print

        # NOTE(review): block nesting below was reconstructed from a flattened
        # paste; confirm how far the database session scope really extends.
        async with async_session() as db:
            service = ArchiveService(db)
            archive = await service.get_archive(job.source_id)
            if not archive:
                raise RuntimeError("Archive not found")
            printer = await db.scalar(select(Printer).where(Printer.id == job.printer_id))
            if not printer:
                raise RuntimeError("Printer not found")
            # Copy the values needed later into plain locals.
            printer_name = printer.name
            printer_ip = printer.ip_address
            printer_access_code = printer.access_code
            printer_model = printer.model
            archive_filename = archive.filename
            if not printer_manager.is_connected(job.printer_id):
                raise RuntimeError("Printer is not connected")
            file_path = settings.base_dir / archive.file_path
            if not file_path.exists():
                raise RuntimeError("Archive file not found")
            remote_filename = derive_remote_filename(archive.filename)
            remote_path = f"/{remote_filename}"
            ftp_retry_enabled, ftp_retry_count, ftp_retry_delay, ftp_timeout = await get_ftp_retry_settings()
            self._raise_if_cancel_requested(job)
            await self._set_active_message(job, f"Preparing upload to {printer_name}...")
            # Remove any existing file with the same remote name before uploading.
            await delete_file_async(
                printer_ip,
                printer_access_code,
                remote_path,
                socket_timeout=ftp_timeout,
                printer_model=printer_model,
            )
            self._raise_if_cancel_requested(job)
            try:
                await self._set_active_message(job, f"Uploading {archive_filename} to {printer_name}...")
                loop = asyncio.get_running_loop()
                # Throttle state for progress broadcasts (time and byte based).
                progress_state = {"last_emit": 0.0, "last_bytes": 0}

                def upload_progress_callback(uploaded: int, total: int):
                    # Raising here aborts the upload when the user cancelled.
                    if self._is_cancel_requested(job.id):
                        raise DispatchJobCancelled(f"Dispatch job {job.id} cancelled during upload")
                    now = time.monotonic()
                    # Emit on completion, every 0.2 s, or every 256 KiB.
                    should_emit = (
                        uploaded >= total
                        or now - progress_state["last_emit"] >= 0.2
                        or uploaded - progress_state["last_bytes"] >= 256 * 1024
                    )
                    if should_emit:
                        progress_state["last_emit"] = now
                        progress_state["last_bytes"] = uploaded
                        # call_soon_threadsafe hands the update to the event
                        # loop — presumably because the callback runs on a
                        # worker thread; TODO confirm. The created task is not
                        # stored anywhere.
                        loop.call_soon_threadsafe(
                            lambda u=uploaded, t=total: asyncio.create_task(self._set_active_upload_progress(job, u, t))
                        )

                if ftp_retry_enabled:
                    uploaded = await with_ftp_retry(
                        upload_file_async,
                        printer_ip,
                        printer_access_code,
                        file_path,
                        remote_path,
                        progress_callback=upload_progress_callback,
                        socket_timeout=ftp_timeout,
                        printer_model=printer_model,
                        max_retries=ftp_retry_count,
                        retry_delay=ftp_retry_delay,
                        operation_name=f"Upload for reprint to {printer_name}",
                        non_retry_exceptions=(DispatchJobCancelled,),
                    )
                else:
                    uploaded = await upload_file_async(
                        printer_ip,
                        printer_access_code,
                        file_path,
                        remote_path,
                        progress_callback=upload_progress_callback,
                        socket_timeout=ftp_timeout,
                        printer_model=printer_model,
                    )
                if uploaded:
                    # Force the displayed progress to 100%.
                    await self._set_active_upload_progress(job, 1, 1)
                if not uploaded:
                    raise RuntimeError(
                        "Failed to upload file to printer. Check if SD card is inserted and properly formatted (FAT32/exFAT)."
                    )
                register_expected_print(
                    job.printer_id,
                    remote_filename,
                    job.source_id,
                    ams_mapping=job.options.get("ams_mapping"),
                )
                plate_id = self._resolve_plate_id(file_path, job.options.get("plate_id"))
                # Last cancellation checkpoint before the start command is sent.
                self._raise_if_cancel_requested(job)
                await self._set_active_message(job, f"Starting print on {printer_name}...")
                started = printer_manager.start_print(
                    job.printer_id,
                    remote_filename,
                    plate_id,
                    ams_mapping=job.options.get("ams_mapping"),
                    timelapse=job.options.get("timelapse", False),
                    bed_levelling=job.options.get("bed_levelling", True),
                    flow_cali=job.options.get("flow_cali", False),
                    vibration_cali=job.options.get("vibration_cali", True),
                    layer_inspect=job.options.get("layer_inspect", False),
                    use_ams=job.options.get("use_ams", True),
                )
                if not started:
                    # The start command failed: clean up the uploaded file.
                    await self._cleanup_sd_card_file(
                        printer_ip,
                        printer_access_code,
                        remote_path,
                        printer_model,
                    )
                    raise RuntimeError("Failed to start print")
                # Register the archive's local 3MF in the cover-cache so the
                # /cover endpoint can skip FTP — we already have the file on
                # disk, no need to refetch 36 MB from a printer whose FTP is
                # busy serving the active print (#1166 follow-up).
                cache_3mf_download(job.printer_id, remote_filename, file_path)
                # Wait for the printer to actually pick up the command before
                # marking the dispatch job complete (#1042). MQTT-publish success
                # only proves the command queued locally; the printer can still
                # reject it (HMS error pending, half-broken session, SD card
                # missing) and never transition. Until #1042 this watchdog was
                # fire-and-forget — the job was reported successful and the
                # user had no signal that the print never started. The uploaded
                # file is intentionally left on the printer's SD card on
                # timeout: the next dispatch will overwrite it via the existing
                # delete-then-upload step, and the printer may still be in the
                # middle of reading it if it picked up just past the timeout.
                pre_status = printer_manager.get_status(job.printer_id)
                pre_state = getattr(pre_status, "state", None) if pre_status else None
                pre_subtask_id = getattr(pre_status, "subtask_id", None) if pre_status else None
                pre_gcode_file = getattr(pre_status, "gcode_file", None) if pre_status else None
                # Verification is skipped when no state value is available.
                if pre_state:
                    await self._set_active_message(job, f"Waiting for {printer_name} to acknowledge print...")
                    transitioned = await self._verify_print_response(
                        job.printer_id,
                        printer_name,
                        pre_state,
                        pre_subtask_id=pre_subtask_id,
                        pre_gcode_file=pre_gcode_file,
                    )
                    if not transitioned:
                        raise RuntimeError(
                            f"Printer did not acknowledge print command — state still {pre_state}. "
                            f"Check the printer for a pending error (HMS code, plate-clear prompt, "
                            f"SD card) and try again."
                        )
                if job.requested_by_user_id and job.requested_by_username:
                    printer_manager.set_current_print_user(
                        job.printer_id,
                        job.requested_by_user_id,
                        job.requested_by_username,
                    )
            except DispatchJobCancelled:
                # Update the status text, then let the caller record the cancel.
                await self._set_active_message(job, f"Cancelled upload on {printer_name}.")
                raise
  641. async def _run_print_library_file(self, job: PrintDispatchJob):
  642. from backend.app.main import register_expected_print
  643. async with async_session() as db:
  644. lib_file = await db.scalar(LibraryFile.active().where(LibraryFile.id == job.source_id))
  645. if not lib_file:
  646. raise RuntimeError("File not found")
  647. if not self._is_sliced_file(lib_file.filename):
  648. raise RuntimeError("Not a sliced file. Only .gcode or .gcode.3mf files can be printed.")
  649. file_path = Path(settings.base_dir) / lib_file.file_path
  650. if not file_path.exists():
  651. raise RuntimeError("File not found on disk")
  652. printer = await db.scalar(select(Printer).where(Printer.id == job.printer_id))
  653. if not printer:
  654. raise RuntimeError("Printer not found")
  655. printer_name = printer.name
  656. printer_ip = printer.ip_address
  657. printer_access_code = printer.access_code
  658. printer_model = printer.model
  659. library_filename = lib_file.filename
  660. if not printer_manager.is_connected(job.printer_id):
  661. raise RuntimeError("Printer is not connected")
  662. await self._set_active_message(job, f"Creating archive for {lib_file.filename}...")
  663. archive_service = ArchiveService(db)
  664. archive = await archive_service.archive_print(
  665. printer_id=job.printer_id,
  666. source_file=file_path,
  667. original_filename=lib_file.filename,
  668. project_id=job.project_id,
  669. created_by_id=job.requested_by_user_id,
  670. )
  671. if not archive:
  672. raise RuntimeError("Failed to create archive")
  673. await db.flush()
  674. remote_filename = derive_remote_filename(lib_file.filename)
  675. remote_path = f"/{remote_filename}"
  676. ftp_retry_enabled, ftp_retry_count, ftp_retry_delay, ftp_timeout = await get_ftp_retry_settings()
  677. self._raise_if_cancel_requested(job)
  678. await self._set_active_message(job, f"Preparing upload to {printer_name}...")
  679. await delete_file_async(
  680. printer_ip,
  681. printer_access_code,
  682. remote_path,
  683. socket_timeout=ftp_timeout,
  684. printer_model=printer_model,
  685. )
  686. self._raise_if_cancel_requested(job)
  687. try:
  688. await self._set_active_message(job, f"Uploading {library_filename} to {printer_name}...")
  689. loop = asyncio.get_running_loop()
  690. progress_state = {"last_emit": 0.0, "last_bytes": 0}
  691. def upload_progress_callback(uploaded: int, total: int):
  692. if self._is_cancel_requested(job.id):
  693. raise DispatchJobCancelled(f"Dispatch job {job.id} cancelled during upload")
  694. now = time.monotonic()
  695. should_emit = (
  696. uploaded >= total
  697. or now - progress_state["last_emit"] >= 0.2
  698. or uploaded - progress_state["last_bytes"] >= 256 * 1024
  699. )
  700. if should_emit:
  701. progress_state["last_emit"] = now
  702. progress_state["last_bytes"] = uploaded
  703. loop.call_soon_threadsafe(
  704. lambda u=uploaded, t=total: asyncio.create_task(self._set_active_upload_progress(job, u, t))
  705. )
  706. if ftp_retry_enabled:
  707. uploaded = await with_ftp_retry(
  708. upload_file_async,
  709. printer_ip,
  710. printer_access_code,
  711. file_path,
  712. remote_path,
  713. progress_callback=upload_progress_callback,
  714. socket_timeout=ftp_timeout,
  715. printer_model=printer_model,
  716. max_retries=ftp_retry_count,
  717. retry_delay=ftp_retry_delay,
  718. operation_name=f"Upload for print to {printer_name}",
  719. non_retry_exceptions=(DispatchJobCancelled,),
  720. )
  721. else:
  722. uploaded = await upload_file_async(
  723. printer_ip,
  724. printer_access_code,
  725. file_path,
  726. remote_path,
  727. progress_callback=upload_progress_callback,
  728. socket_timeout=ftp_timeout,
  729. printer_model=printer_model,
  730. )
  731. if uploaded:
  732. await self._set_active_upload_progress(job, 1, 1)
  733. if not uploaded:
  734. await db.rollback()
  735. raise RuntimeError(
  736. "Failed to upload file to printer. Check if SD card is inserted and properly formatted (FAT32/exFAT)."
  737. )
  738. register_expected_print(
  739. job.printer_id,
  740. remote_filename,
  741. archive.id,
  742. ams_mapping=job.options.get("ams_mapping"),
  743. )
  744. plate_id = self._resolve_plate_id(file_path, job.options.get("plate_id"))
  745. self._raise_if_cancel_requested(job)
  746. await self._set_active_message(job, f"Starting print on {printer_name}...")
  747. started = printer_manager.start_print(
  748. job.printer_id,
  749. remote_filename,
  750. plate_id,
  751. ams_mapping=job.options.get("ams_mapping"),
  752. timelapse=job.options.get("timelapse", False),
  753. bed_levelling=job.options.get("bed_levelling", True),
  754. flow_cali=job.options.get("flow_cali", False),
  755. vibration_cali=job.options.get("vibration_cali", True),
  756. layer_inspect=job.options.get("layer_inspect", False),
  757. use_ams=job.options.get("use_ams", True),
  758. )
  759. if not started:
  760. await self._cleanup_sd_card_file(
  761. printer_ip,
  762. printer_access_code,
  763. remote_path,
  764. printer_model,
  765. )
  766. await db.rollback()
  767. raise RuntimeError("Failed to start print")
  768. # Same as the archive path: register the library file's local
  769. # 3MF in the cover-cache so /cover skips FTP (#1166 follow-up).
  770. cache_3mf_download(job.printer_id, remote_filename, file_path)
  771. # See _run_reprint_archive for rationale (#1042). On timeout
  772. # also rolls back the freshly-created archive so the library
  773. # flow doesn't leave behind a phantom row for a print that
  774. # never started.
  775. pre_status = printer_manager.get_status(job.printer_id)
  776. pre_state = getattr(pre_status, "state", None) if pre_status else None
  777. pre_subtask_id = getattr(pre_status, "subtask_id", None) if pre_status else None
  778. pre_gcode_file = getattr(pre_status, "gcode_file", None) if pre_status else None
  779. if pre_state:
  780. await self._set_active_message(job, f"Waiting for {printer_name} to acknowledge print...")
  781. transitioned = await self._verify_print_response(
  782. job.printer_id,
  783. printer_name,
  784. pre_state,
  785. pre_subtask_id=pre_subtask_id,
  786. pre_gcode_file=pre_gcode_file,
  787. )
  788. if not transitioned:
  789. await db.rollback()
  790. raise RuntimeError(
  791. f"Printer did not acknowledge print command — state still {pre_state}. "
  792. f"Check the printer for a pending error (HMS code, plate-clear prompt, "
  793. f"SD card) and try again."
  794. )
  795. if job.requested_by_user_id and job.requested_by_username:
  796. printer_manager.set_current_print_user(
  797. job.printer_id,
  798. job.requested_by_user_id,
  799. job.requested_by_username,
  800. )
  801. # Direct-Print flow only: archive_print copies, so deleting the
  802. # transient library row + files here leaves archive intact. Disk
  803. # deletes run only after commit so a rollback leaves no orphan.
  804. cleanup_disk_paths: list[Path] = []
  805. if job.cleanup_library_after_dispatch and not lib_file.is_external:
  806. cleanup_disk_paths.append(file_path)
  807. if lib_file.thumbnail_path:
  808. thumb_path = Path(lib_file.thumbnail_path)
  809. if not thumb_path.is_absolute():
  810. thumb_path = Path(settings.base_dir) / lib_file.thumbnail_path
  811. cleanup_disk_paths.append(thumb_path)
  812. await db.delete(lib_file)
  813. await db.commit()
  814. for cleanup_path in cleanup_disk_paths:
  815. try:
  816. if cleanup_path.exists():
  817. cleanup_path.unlink()
  818. except OSError as cleanup_err:
  819. logger.warning("Failed to delete transient library file %s: %s", cleanup_path, cleanup_err)
  820. except DispatchJobCancelled:
  821. await db.rollback()
  822. await self._set_active_message(job, f"Cancelled upload on {printer_name}.")
  823. raise
  824. @staticmethod
  825. async def _verify_print_response(
  826. printer_id: int,
  827. printer_name: str,
  828. pre_state: str,
  829. pre_subtask_id: str | None = None,
  830. pre_gcode_file: str | None = None,
  831. timeout: float = 90.0,
  832. poll_interval: float = 3.0,
  833. ) -> bool:
  834. """Wait for the printer to acknowledge a print command.
  835. Returns True if the printer transitioned (state advanced past pre_state
  836. or subtask_id advanced past pre_subtask_id). Returns False on timeout —
  837. in that case logs a warning and forces an MQTT reconnect, mirroring the
  838. queue-side watchdog (`_watchdog_print_start`). Caller is responsible
  839. for surfacing the False result to the user (typically by raising so the
  840. dispatch job is marked failed).
  841. Both transition signals are checked because H2D can sit at FINISH for
  842. ~50 s after accepting `project_file` before flipping to PREPARE; the
  843. printer echoes our per-dispatch identity back as `subtask_id` on
  844. `push_status` first, so a subtask_id change is a definitive "command
  845. landed" signal even while state is still FINISH (#1078).
  846. """
  847. deadline = time.monotonic() + timeout
  848. last_status = None
  849. while time.monotonic() < deadline:
  850. await asyncio.sleep(poll_interval)
  851. state = printer_manager.get_status(printer_id)
  852. if not state:
  853. # Printer momentarily not reporting — could be a brief MQTT
  854. # disconnect mid-window. Keep polling rather than declaring
  855. # failure on the first missed tick; the printer may reconnect
  856. # within the remaining timeout and still surface a transition.
  857. continue
  858. last_status = state
  859. if state.state in _ACTIVE_PRINT_STATES:
  860. # Printer is actively processing the job. We do NOT accept
  861. # arbitrary state transitions: a printer going FINISH -> IDLE
  862. # (user dismissed the post-print prompt without accepting our
  863. # project_file) would otherwise look like "command landed"
  864. # and the dispatch job would be marked successful even though
  865. # no print is running (#1370).
  866. return True
  867. if pre_subtask_id is not None and state.subtask_id is not None and state.subtask_id != pre_subtask_id:
  868. # Printer picked up the job (subtask_id advanced). H2D can
  869. # sit at FINISH for ~50 s after accepting project_file before
  870. # transitioning to PREPARE, but the subtask_id flips to our
  871. # submission_id almost immediately (#1078).
  872. return True
  873. logger.warning(
  874. "Printer %s (%d) did not respond to print command within %.0fs "
  875. "(state still %s, subtask_id still %s) — printer may need restart",
  876. printer_name,
  877. printer_id,
  878. timeout,
  879. pre_state,
  880. pre_subtask_id,
  881. )
  882. # Distinguish #1150 (slow parse) from #887/#936 (half-broken session)
  883. # via gcode_file: if the printer is now showing a different file than
  884. # before dispatch, the project_file command landed and the printer is
  885. # parsing — a forced reconnect mid-parse causes 0500_4003. If
  886. # gcode_file is unchanged, the publish was silently swallowed and the
  887. # original #936 recovery (force_reconnect → fresh client_id) is what
  888. # we want. Caveat: in the rare retry-same-file-after-timeout case the
  889. # printer's gcode_file looks identical before and after the publish
  890. # lands, so a slow parse on retry-same-file still falls through to the
  891. # reconnect (and the original 0500_4003) — accepted to avoid breaking
  892. # the half-broken-session recovery path.
  893. client = printer_manager.get_client(printer_id)
  894. current_gcode_file = getattr(last_status, "gcode_file", None) if last_status else None
  895. publish_landed = current_gcode_file is not None and current_gcode_file != pre_gcode_file
  896. if publish_landed:
  897. logger.warning(
  898. "Printer %s (%d) gcode_file changed to %r (was %r) — printer "
  899. "received the command and is parsing slowly. Skipping forced "
  900. "MQTT reconnect to avoid 0500_4003 mid-parse (#1150).",
  901. printer_name,
  902. printer_id,
  903. current_gcode_file,
  904. pre_gcode_file,
  905. )
  906. elif client and hasattr(client, "force_reconnect_stale_session"):
  907. client.force_reconnect_stale_session(
  908. f"print command unacknowledged after {timeout:.0f}s "
  909. f"(state still {pre_state}, gcode_file {current_gcode_file!r})"
  910. )
  911. return False
  912. @staticmethod
  913. async def _cleanup_sd_card_file(
  914. printer_ip: str,
  915. access_code: str,
  916. remote_path: str,
  917. printer_model: str | None,
  918. ):
  919. """Best-effort delete of uploaded file from printer SD card."""
  920. try:
  921. await delete_file_async(printer_ip, access_code, remote_path, printer_model=printer_model)
  922. except Exception:
  923. pass # Best-effort — don't fail the error handler
  924. @staticmethod
  925. def _resolve_plate_id(file_path: Path, requested_plate_id: int | None) -> int:
  926. if requested_plate_id is not None:
  927. return requested_plate_id
  928. plate_id = 1
  929. try:
  930. with zipfile.ZipFile(file_path, "r") as zf:
  931. for name in zf.namelist():
  932. if name.startswith("Metadata/plate_") and name.endswith(".gcode"):
  933. plate_str = name[15:-6]
  934. plate_id = int(plate_str)
  935. break
  936. except (ValueError, zipfile.BadZipFile, OSError):
  937. pass
  938. return plate_id
  939. @staticmethod
  940. def _is_sliced_file(filename: str) -> bool:
  941. lower = filename.lower()
  942. return lower.endswith(".gcode") or lower.endswith(".gcode.3mf")
# Module-level instance of the dispatch service, created when this module is imported.
background_dispatch = BackgroundDispatchService()