background_dispatch.py 44 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062
  1. """Background dispatch for print/reprint jobs.
  2. This service is separate from the app's print queue feature. It exists only to
  3. decouple "send/start print" operations (FTP upload + start command) from API
  4. request latency so the UI can continue immediately after dispatch.
  5. """
  6. from __future__ import annotations
  7. import asyncio
  8. import logging
  9. import time
  10. import zipfile
  11. from collections import deque
  12. from dataclasses import dataclass, field
  13. from pathlib import Path
  14. from typing import Any, Literal
  15. from sqlalchemy import select
  16. from backend.app.core.config import settings
  17. from backend.app.core.database import async_session
  18. from backend.app.core.websocket import ws_manager
  19. from backend.app.models.library import LibraryFile
  20. from backend.app.models.printer import Printer
  21. from backend.app.services.archive import ArchiveService
  22. from backend.app.services.bambu_ftp import (
  23. delete_file_async,
  24. get_ftp_retry_settings,
  25. upload_file_async,
  26. with_ftp_retry,
  27. )
  28. from backend.app.services.printer_manager import printer_manager
  29. logger = logging.getLogger(__name__)
  30. class DispatchJobCancelled(Exception):
  31. """Raised when a dispatch job is cancelled by the user."""
  32. class DispatchEnqueueRejected(Exception):
  33. """Raised when a dispatch job should not be accepted."""
  34. @dataclass(slots=True)
  35. class PrintDispatchJob:
  36. id: int
  37. kind: Literal["reprint_archive", "print_library_file"]
  38. source_id: int
  39. source_name: str
  40. printer_id: int
  41. printer_name: str
  42. options: dict[str, Any] = field(default_factory=dict)
  43. requested_by_user_id: int | None = None
  44. requested_by_username: str | None = None
  45. project_id: int | None = None
  46. cleanup_library_after_dispatch: bool = False
  47. @dataclass(slots=True)
  48. class ActiveDispatchState:
  49. job: PrintDispatchJob
  50. message: str
  51. upload_bytes: int | None = None
  52. upload_total_bytes: int | None = None
  53. class BackgroundDispatchService:
  54. def __init__(self):
  55. self._queued_jobs: deque[PrintDispatchJob] = deque()
  56. self._dispatcher_task: asyncio.Task | None = None
  57. self._running_tasks: dict[int, asyncio.Task] = {}
  58. self._lock = asyncio.Lock()
  59. self._job_event = asyncio.Event()
  60. self._next_job_id = 1
  61. self._active_jobs: dict[int, ActiveDispatchState] = {}
  62. self._cancel_requested_job_ids: set[int] = set()
  63. # Progress for the current "batch" (since queue became non-empty)
  64. self._batch_total = 0
  65. self._batch_completed = 0
  66. self._batch_failed = 0
  67. @staticmethod
  68. def _printer_is_busy_printing(printer_id: int) -> bool:
  69. state = printer_manager.get_status(printer_id)
  70. if not state:
  71. return False
  72. return state.state in ("RUNNING", "PAUSE", "PAUSED") and bool(state.gcode_file)
  73. async def start(self):
  74. async with self._lock:
  75. if self._dispatcher_task and not self._dispatcher_task.done():
  76. return
  77. self._dispatcher_task = asyncio.create_task(self._dispatcher_loop(), name="background-dispatch-dispatcher")
  78. logger.info("Background dispatch dispatcher started")
  79. async def stop(self):
  80. dispatcher: asyncio.Task | None = None
  81. running_tasks: list[asyncio.Task] = []
  82. async with self._lock:
  83. dispatcher = self._dispatcher_task
  84. self._dispatcher_task = None
  85. running_tasks = list(self._running_tasks.values())
  86. self._running_tasks.clear()
  87. self._active_jobs.clear()
  88. self._queued_jobs.clear()
  89. self._cancel_requested_job_ids.clear()
  90. self._job_event.set()
  91. if dispatcher:
  92. dispatcher.cancel()
  93. for task in running_tasks:
  94. task.cancel()
  95. if dispatcher:
  96. try:
  97. await dispatcher
  98. except asyncio.CancelledError:
  99. pass
  100. if running_tasks:
  101. await asyncio.gather(*running_tasks, return_exceptions=True)
  102. logger.info("Background dispatch dispatcher stopped")
  103. async def dispatch_reprint_archive(
  104. self,
  105. *,
  106. archive_id: int,
  107. archive_name: str,
  108. printer_id: int,
  109. printer_name: str,
  110. options: dict[str, Any],
  111. requested_by_user_id: int | None,
  112. requested_by_username: str | None,
  113. ) -> dict[str, Any]:
  114. return await self._dispatch(
  115. kind="reprint_archive",
  116. source_id=archive_id,
  117. source_name=archive_name,
  118. printer_id=printer_id,
  119. printer_name=printer_name,
  120. options=options,
  121. requested_by_user_id=requested_by_user_id,
  122. requested_by_username=requested_by_username,
  123. )
  124. async def get_state(self) -> dict[str, Any]:
  125. """Get current dispatch queue state snapshot for newly connected clients."""
  126. async with self._lock:
  127. return self._build_state_payload_unlocked()
  128. async def dispatch_print_library_file(
  129. self,
  130. *,
  131. file_id: int,
  132. filename: str,
  133. printer_id: int,
  134. printer_name: str,
  135. options: dict[str, Any],
  136. requested_by_user_id: int | None,
  137. requested_by_username: str | None,
  138. project_id: int | None = None,
  139. cleanup_library_after_dispatch: bool = False,
  140. ) -> dict[str, Any]:
  141. return await self._dispatch(
  142. kind="print_library_file",
  143. source_id=file_id,
  144. source_name=filename,
  145. printer_id=printer_id,
  146. printer_name=printer_name,
  147. options=options,
  148. requested_by_user_id=requested_by_user_id,
  149. requested_by_username=requested_by_username,
  150. project_id=project_id,
  151. cleanup_library_after_dispatch=cleanup_library_after_dispatch,
  152. )
  153. async def cancel_job(self, job_id: int) -> dict[str, Any]:
  154. """Cancel a queued dispatch job.
  155. Queued jobs are removed immediately. Active jobs are cancelled
  156. cooperatively and will stop at the next cancellation checkpoint.
  157. """
  158. async with self._lock:
  159. # Check active jobs first
  160. active_state = self._active_jobs.get(job_id)
  161. if active_state is not None:
  162. logger.info("Cancel requested for active dispatch job %s", job_id)
  163. self._cancel_requested_job_ids.add(job_id)
  164. active_job = active_state.job
  165. payload = self._build_state_payload_unlocked(
  166. recent_event={
  167. "status": "cancelling",
  168. "job_id": active_job.id,
  169. "source_name": active_job.source_name,
  170. "printer_id": active_job.printer_id,
  171. "printer_name": active_job.printer_name,
  172. "message": "Cancelling current dispatch...",
  173. }
  174. )
  175. result = {
  176. "cancelled": True,
  177. "pending": True,
  178. "job_id": active_job.id,
  179. "source_name": active_job.source_name,
  180. "printer_id": active_job.printer_id,
  181. "printer_name": active_job.printer_name,
  182. }
  183. await ws_manager.broadcast({"type": "background_dispatch", "data": payload})
  184. return result
  185. # Check queued jobs
  186. cancelled_job: PrintDispatchJob | None = None
  187. for job in self._queued_jobs:
  188. if job.id == job_id:
  189. cancelled_job = job
  190. break
  191. if not cancelled_job:
  192. logger.info("Cancel requested for unknown dispatch job %s", job_id)
  193. return {"cancelled": False, "reason": "not_found"}
  194. self._queued_jobs.remove(cancelled_job)
  195. logger.info("Cancelled queued dispatch job %s", cancelled_job.id)
  196. self._batch_total = max(0, self._batch_total - 1)
  197. if self._batch_total == 0 and len(self._queued_jobs) == 0 and len(self._active_jobs) == 0:
  198. self._batch_completed = 0
  199. self._batch_failed = 0
  200. payload = self._build_state_payload_unlocked(
  201. recent_event={
  202. "status": "cancelled",
  203. "job_id": cancelled_job.id,
  204. "source_name": cancelled_job.source_name,
  205. "printer_id": cancelled_job.printer_id,
  206. "printer_name": cancelled_job.printer_name,
  207. "message": "Cancelled from queue",
  208. }
  209. )
  210. await ws_manager.broadcast({"type": "background_dispatch", "data": payload})
  211. return {
  212. "cancelled": True,
  213. "pending": False,
  214. "job_id": cancelled_job.id,
  215. "source_name": cancelled_job.source_name,
  216. "printer_id": cancelled_job.printer_id,
  217. "printer_name": cancelled_job.printer_name,
  218. }
  219. async def _dispatch(
  220. self,
  221. *,
  222. kind: Literal["reprint_archive", "print_library_file"],
  223. source_id: int,
  224. source_name: str,
  225. printer_id: int,
  226. printer_name: str,
  227. options: dict[str, Any],
  228. requested_by_user_id: int | None,
  229. requested_by_username: str | None,
  230. project_id: int | None = None,
  231. cleanup_library_after_dispatch: bool = False,
  232. ) -> dict[str, Any]:
  233. async with self._lock:
  234. has_pending_for_printer = any(job.printer_id == printer_id for job in self._queued_jobs)
  235. has_active_for_printer = any(active.job.printer_id == printer_id for active in self._active_jobs.values())
  236. if has_pending_for_printer or has_active_for_printer:
  237. raise DispatchEnqueueRejected(f"Printer {printer_name} already has a background dispatch in progress")
  238. if self._printer_is_busy_printing(printer_id):
  239. raise DispatchEnqueueRejected(f"Printer {printer_name} is currently busy printing")
  240. dispatch_position = len(self._queued_jobs) + len(self._active_jobs) + 1
  241. job = PrintDispatchJob(
  242. id=self._next_job_id,
  243. kind=kind,
  244. source_id=source_id,
  245. source_name=source_name,
  246. printer_id=printer_id,
  247. printer_name=printer_name,
  248. options=options,
  249. requested_by_user_id=requested_by_user_id,
  250. requested_by_username=requested_by_username,
  251. project_id=project_id,
  252. cleanup_library_after_dispatch=cleanup_library_after_dispatch,
  253. )
  254. self._next_job_id += 1
  255. self._batch_total += 1
  256. self._queued_jobs.append(job)
  257. self._job_event.set()
  258. payload = self._build_state_payload_unlocked(
  259. recent_event={
  260. "status": "dispatched",
  261. "job_id": job.id,
  262. "source_name": source_name,
  263. "printer_id": printer_id,
  264. "printer_name": printer_name,
  265. "message": f"Dispatched to {printer_name}",
  266. }
  267. )
  268. await ws_manager.broadcast({"type": "background_dispatch", "data": payload})
  269. return {
  270. "dispatch_job_id": job.id,
  271. "dispatch_position": dispatch_position,
  272. "status": "dispatched",
  273. "printer_id": printer_id,
  274. "source_id": source_id,
  275. "source_name": source_name,
  276. }
  277. async def _dispatcher_loop(self):
  278. while True:
  279. await self._job_event.wait()
  280. self._job_event.clear()
  281. while True:
  282. payload: dict[str, Any] | None = None
  283. job_to_start: PrintDispatchJob | None = None
  284. async with self._lock:
  285. busy_printer_ids = {state.job.printer_id for state in self._active_jobs.values()}
  286. start_index = next(
  287. (
  288. idx
  289. for idx, queued_job in enumerate(self._queued_jobs)
  290. if queued_job.printer_id not in busy_printer_ids
  291. ),
  292. None,
  293. )
  294. if start_index is None:
  295. break
  296. job_to_start = self._queued_jobs[start_index]
  297. del self._queued_jobs[start_index]
  298. self._active_jobs[job_to_start.id] = ActiveDispatchState(
  299. job=job_to_start,
  300. message="Preparing background dispatch...",
  301. )
  302. task = asyncio.create_task(
  303. self._run_active_job(job_to_start), name=f"background-dispatch-job-{job_to_start.id}"
  304. )
  305. self._running_tasks[job_to_start.id] = task
  306. payload = self._build_state_payload_unlocked(
  307. recent_event={
  308. "status": "processing",
  309. "job_id": job_to_start.id,
  310. "source_name": job_to_start.source_name,
  311. "printer_id": job_to_start.printer_id,
  312. "printer_name": job_to_start.printer_name,
  313. "message": "Preparing background dispatch...",
  314. }
  315. )
  316. if payload:
  317. await ws_manager.broadcast({"type": "background_dispatch", "data": payload})
  318. async def _run_active_job(self, job: PrintDispatchJob):
  319. try:
  320. await self._process_job(job)
  321. await self._mark_job_finished(job, failed=False, message="Background dispatch complete")
  322. except DispatchJobCancelled:
  323. await self._mark_job_cancelled(job)
  324. except asyncio.CancelledError:
  325. raise
  326. except Exception as e:
  327. logger.error("Background dispatch job %s failed: %s", job.id, e, exc_info=True)
  328. await self._mark_job_finished(job, failed=True, message=str(e))
  329. finally:
  330. self._job_event.set()
  331. async def _set_active_message(self, job: PrintDispatchJob, message: str):
  332. async with self._lock:
  333. active = self._active_jobs.get(job.id)
  334. if not active:
  335. return
  336. active.message = message
  337. payload = self._build_state_payload_unlocked(
  338. recent_event={
  339. "status": "processing",
  340. "job_id": active.job.id,
  341. "source_name": active.job.source_name,
  342. "printer_id": active.job.printer_id,
  343. "printer_name": active.job.printer_name,
  344. "message": message,
  345. }
  346. )
  347. await ws_manager.broadcast({"type": "background_dispatch", "data": payload})
  348. async def _set_active_upload_progress(self, job: PrintDispatchJob, uploaded: int, total: int):
  349. async with self._lock:
  350. active = self._active_jobs.get(job.id)
  351. if not active:
  352. return
  353. active.upload_bytes = max(0, int(uploaded))
  354. active.upload_total_bytes = max(0, int(total))
  355. payload = self._build_state_payload_unlocked(
  356. recent_event={
  357. "status": "processing",
  358. "job_id": active.job.id,
  359. "source_name": active.job.source_name,
  360. "printer_id": active.job.printer_id,
  361. "printer_name": active.job.printer_name,
  362. "message": active.message,
  363. }
  364. )
  365. await ws_manager.broadcast({"type": "background_dispatch", "data": payload})
  366. async def _mark_job_finished(self, job: PrintDispatchJob, *, failed: bool, message: str):
  367. async with self._lock:
  368. if failed:
  369. self._batch_failed += 1
  370. else:
  371. self._batch_completed += 1
  372. self._active_jobs.pop(job.id, None)
  373. self._running_tasks.pop(job.id, None)
  374. self._cancel_requested_job_ids.discard(job.id)
  375. payload = self._build_state_payload_unlocked(
  376. recent_event={
  377. "status": "failed" if failed else "completed",
  378. "job_id": job.id,
  379. "source_name": job.source_name,
  380. "printer_id": job.printer_id,
  381. "printer_name": job.printer_name,
  382. "message": message,
  383. }
  384. )
  385. should_reset_batch = len(self._queued_jobs) == 0 and len(self._active_jobs) == 0
  386. await ws_manager.broadcast({"type": "background_dispatch", "data": payload})
  387. if should_reset_batch:
  388. async with self._lock:
  389. if len(self._queued_jobs) == 0 and len(self._active_jobs) == 0:
  390. self._batch_total = 0
  391. self._batch_completed = 0
  392. self._batch_failed = 0
  393. async def _mark_job_cancelled(self, job: PrintDispatchJob):
  394. async with self._lock:
  395. self._active_jobs.pop(job.id, None)
  396. self._running_tasks.pop(job.id, None)
  397. self._cancel_requested_job_ids.discard(job.id)
  398. self._batch_total = max(0, self._batch_total - 1)
  399. if self._batch_total == 0 and len(self._queued_jobs) == 0 and len(self._active_jobs) == 0:
  400. self._batch_completed = 0
  401. self._batch_failed = 0
  402. payload = self._build_state_payload_unlocked(
  403. recent_event={
  404. "status": "cancelled",
  405. "job_id": job.id,
  406. "source_name": job.source_name,
  407. "printer_id": job.printer_id,
  408. "printer_name": job.printer_name,
  409. "message": "Cancelled during dispatch",
  410. }
  411. )
  412. await ws_manager.broadcast({"type": "background_dispatch", "data": payload})
  413. def _is_cancel_requested(self, job_id: int) -> bool:
  414. return job_id in self._cancel_requested_job_ids
  415. def _raise_if_cancel_requested(self, job: PrintDispatchJob):
  416. if self._is_cancel_requested(job.id):
  417. raise DispatchJobCancelled(f"Dispatch job {job.id} cancelled")
  418. def _build_state_payload_unlocked(self, recent_event: dict[str, Any] | None = None) -> dict[str, Any]:
  419. processing = len(self._active_jobs)
  420. dispatched = len(self._queued_jobs)
  421. dispatched_jobs = [
  422. {
  423. "job_id": job.id,
  424. "kind": job.kind,
  425. "source_id": job.source_id,
  426. "source_name": job.source_name,
  427. "printer_id": job.printer_id,
  428. "printer_name": job.printer_name,
  429. }
  430. for job in list(self._queued_jobs)
  431. ]
  432. active_jobs: list[dict[str, Any]] = []
  433. for active in self._active_jobs.values():
  434. upload_progress_pct = None
  435. if active.upload_total_bytes and active.upload_total_bytes > 0 and active.upload_bytes is not None:
  436. upload_progress_pct = round(
  437. max(0.0, min(100.0, (active.upload_bytes / active.upload_total_bytes) * 100.0)), 1
  438. )
  439. active_jobs.append(
  440. {
  441. "job_id": active.job.id,
  442. "kind": active.job.kind,
  443. "source_id": active.job.source_id,
  444. "source_name": active.job.source_name,
  445. "printer_id": active.job.printer_id,
  446. "printer_name": active.job.printer_name,
  447. "message": active.message,
  448. "upload_bytes": active.upload_bytes,
  449. "upload_total_bytes": active.upload_total_bytes,
  450. "upload_progress_pct": upload_progress_pct,
  451. }
  452. )
  453. active_jobs.sort(key=lambda item: int(item["job_id"]))
  454. active_job = active_jobs[0] if active_jobs else None
  455. return {
  456. "total": self._batch_total,
  457. "dispatched": dispatched,
  458. "processing": processing,
  459. "completed": self._batch_completed,
  460. "failed": self._batch_failed,
  461. "dispatched_jobs": dispatched_jobs,
  462. "active_jobs": active_jobs,
  463. "active_job": active_job,
  464. "recent_event": recent_event,
  465. }
  466. async def _process_job(self, job: PrintDispatchJob):
  467. if job.kind == "reprint_archive":
  468. await self._run_reprint_archive(job)
  469. return
  470. if job.kind == "print_library_file":
  471. await self._run_print_library_file(job)
  472. return
  473. raise RuntimeError(f"Unknown dispatch job kind: {job.kind}")
  474. async def _run_reprint_archive(self, job: PrintDispatchJob):
  475. from backend.app.main import register_expected_print
  476. async with async_session() as db:
  477. service = ArchiveService(db)
  478. archive = await service.get_archive(job.source_id)
  479. if not archive:
  480. raise RuntimeError("Archive not found")
  481. printer = await db.scalar(select(Printer).where(Printer.id == job.printer_id))
  482. if not printer:
  483. raise RuntimeError("Printer not found")
  484. printer_name = printer.name
  485. printer_ip = printer.ip_address
  486. printer_access_code = printer.access_code
  487. printer_model = printer.model
  488. archive_filename = archive.filename
  489. if not printer_manager.is_connected(job.printer_id):
  490. raise RuntimeError("Printer is not connected")
  491. file_path = settings.base_dir / archive.file_path
  492. if not file_path.exists():
  493. raise RuntimeError("Archive file not found")
  494. base_name = archive.filename
  495. if base_name.endswith(".gcode.3mf"):
  496. base_name = base_name[:-10]
  497. elif base_name.endswith(".3mf"):
  498. base_name = base_name[:-4]
  499. remote_filename = f"{base_name}.3mf"
  500. # Sanitize: firmware parses ftp://{filename} as a URL, spaces break it
  501. remote_filename = remote_filename.replace(" ", "_")
  502. remote_path = f"/{remote_filename}"
  503. ftp_retry_enabled, ftp_retry_count, ftp_retry_delay, ftp_timeout = await get_ftp_retry_settings()
  504. self._raise_if_cancel_requested(job)
  505. await self._set_active_message(job, f"Preparing upload to {printer_name}...")
  506. await delete_file_async(
  507. printer_ip,
  508. printer_access_code,
  509. remote_path,
  510. socket_timeout=ftp_timeout,
  511. printer_model=printer_model,
  512. )
  513. self._raise_if_cancel_requested(job)
  514. try:
  515. await self._set_active_message(job, f"Uploading {archive_filename} to {printer_name}...")
  516. loop = asyncio.get_running_loop()
  517. progress_state = {"last_emit": 0.0, "last_bytes": 0}
  518. def upload_progress_callback(uploaded: int, total: int):
  519. if self._is_cancel_requested(job.id):
  520. raise DispatchJobCancelled(f"Dispatch job {job.id} cancelled during upload")
  521. now = time.monotonic()
  522. should_emit = (
  523. uploaded >= total
  524. or now - progress_state["last_emit"] >= 0.2
  525. or uploaded - progress_state["last_bytes"] >= 256 * 1024
  526. )
  527. if should_emit:
  528. progress_state["last_emit"] = now
  529. progress_state["last_bytes"] = uploaded
  530. loop.call_soon_threadsafe(
  531. lambda u=uploaded, t=total: asyncio.create_task(self._set_active_upload_progress(job, u, t))
  532. )
  533. if ftp_retry_enabled:
  534. uploaded = await with_ftp_retry(
  535. upload_file_async,
  536. printer_ip,
  537. printer_access_code,
  538. file_path,
  539. remote_path,
  540. progress_callback=upload_progress_callback,
  541. socket_timeout=ftp_timeout,
  542. printer_model=printer_model,
  543. max_retries=ftp_retry_count,
  544. retry_delay=ftp_retry_delay,
  545. operation_name=f"Upload for reprint to {printer_name}",
  546. non_retry_exceptions=(DispatchJobCancelled,),
  547. )
  548. else:
  549. uploaded = await upload_file_async(
  550. printer_ip,
  551. printer_access_code,
  552. file_path,
  553. remote_path,
  554. progress_callback=upload_progress_callback,
  555. socket_timeout=ftp_timeout,
  556. printer_model=printer_model,
  557. )
  558. if uploaded:
  559. await self._set_active_upload_progress(job, 1, 1)
  560. if not uploaded:
  561. raise RuntimeError(
  562. "Failed to upload file to printer. Check if SD card is inserted and properly formatted (FAT32/exFAT)."
  563. )
  564. register_expected_print(
  565. job.printer_id,
  566. remote_filename,
  567. job.source_id,
  568. ams_mapping=job.options.get("ams_mapping"),
  569. )
  570. plate_id = self._resolve_plate_id(file_path, job.options.get("plate_id"))
  571. self._raise_if_cancel_requested(job)
  572. await self._set_active_message(job, f"Starting print on {printer_name}...")
  573. started = printer_manager.start_print(
  574. job.printer_id,
  575. remote_filename,
  576. plate_id,
  577. ams_mapping=job.options.get("ams_mapping"),
  578. timelapse=job.options.get("timelapse", False),
  579. bed_levelling=job.options.get("bed_levelling", True),
  580. flow_cali=job.options.get("flow_cali", False),
  581. vibration_cali=job.options.get("vibration_cali", False),
  582. layer_inspect=job.options.get("layer_inspect", False),
  583. use_ams=job.options.get("use_ams", True),
  584. )
  585. if not started:
  586. await self._cleanup_sd_card_file(
  587. printer_ip,
  588. printer_access_code,
  589. remote_path,
  590. printer_model,
  591. )
  592. raise RuntimeError("Failed to start print")
  593. # Wait for the printer to actually pick up the command before
  594. # marking the dispatch job complete (#1042). MQTT-publish success
  595. # only proves the command queued locally; the printer can still
  596. # reject it (HMS error pending, half-broken session, SD card
  597. # missing) and never transition. Until #1042 this watchdog was
  598. # fire-and-forget — the job was reported successful and the
  599. # user had no signal that the print never started. The uploaded
  600. # file is intentionally left on the printer's SD card on
  601. # timeout: the next dispatch will overwrite it via the existing
  602. # delete-then-upload step, and the printer may still be in the
  603. # middle of reading it if it picked up just past the timeout.
  604. pre_status = printer_manager.get_status(job.printer_id)
  605. pre_state = getattr(pre_status, "state", None) if pre_status else None
  606. pre_subtask_id = getattr(pre_status, "subtask_id", None) if pre_status else None
  607. pre_gcode_file = getattr(pre_status, "gcode_file", None) if pre_status else None
  608. if pre_state:
  609. await self._set_active_message(job, f"Waiting for {printer_name} to acknowledge print...")
  610. transitioned = await self._verify_print_response(
  611. job.printer_id,
  612. printer_name,
  613. pre_state,
  614. pre_subtask_id=pre_subtask_id,
  615. pre_gcode_file=pre_gcode_file,
  616. )
  617. if not transitioned:
  618. raise RuntimeError(
  619. f"Printer did not acknowledge print command — state still {pre_state}. "
  620. f"Check the printer for a pending error (HMS code, plate-clear prompt, "
  621. f"SD card) and try again."
  622. )
  623. if job.requested_by_user_id and job.requested_by_username:
  624. printer_manager.set_current_print_user(
  625. job.printer_id,
  626. job.requested_by_user_id,
  627. job.requested_by_username,
  628. )
  629. except DispatchJobCancelled:
  630. await self._set_active_message(job, f"Cancelled upload on {printer_name}.")
  631. raise
  632. async def _run_print_library_file(self, job: PrintDispatchJob):
  633. from backend.app.main import register_expected_print
  634. async with async_session() as db:
  635. lib_file = await db.scalar(LibraryFile.active().where(LibraryFile.id == job.source_id))
  636. if not lib_file:
  637. raise RuntimeError("File not found")
  638. if not self._is_sliced_file(lib_file.filename):
  639. raise RuntimeError("Not a sliced file. Only .gcode or .gcode.3mf files can be printed.")
  640. file_path = Path(settings.base_dir) / lib_file.file_path
  641. if not file_path.exists():
  642. raise RuntimeError("File not found on disk")
  643. printer = await db.scalar(select(Printer).where(Printer.id == job.printer_id))
  644. if not printer:
  645. raise RuntimeError("Printer not found")
  646. printer_name = printer.name
  647. printer_ip = printer.ip_address
  648. printer_access_code = printer.access_code
  649. printer_model = printer.model
  650. library_filename = lib_file.filename
  651. if not printer_manager.is_connected(job.printer_id):
  652. raise RuntimeError("Printer is not connected")
  653. await self._set_active_message(job, f"Creating archive for {lib_file.filename}...")
  654. archive_service = ArchiveService(db)
  655. archive = await archive_service.archive_print(
  656. printer_id=job.printer_id,
  657. source_file=file_path,
  658. original_filename=lib_file.filename,
  659. project_id=job.project_id,
  660. created_by_id=job.requested_by_user_id,
  661. )
  662. if not archive:
  663. raise RuntimeError("Failed to create archive")
  664. await db.flush()
  665. base_name = lib_file.filename
  666. if base_name.endswith(".gcode.3mf"):
  667. base_name = base_name[:-10]
  668. elif base_name.endswith(".3mf"):
  669. base_name = base_name[:-4]
  670. remote_filename = f"{base_name}.3mf"
  671. # Sanitize: firmware parses ftp://{filename} as a URL, spaces break it
  672. remote_filename = remote_filename.replace(" ", "_")
  673. remote_path = f"/{remote_filename}"
  674. ftp_retry_enabled, ftp_retry_count, ftp_retry_delay, ftp_timeout = await get_ftp_retry_settings()
  675. self._raise_if_cancel_requested(job)
  676. await self._set_active_message(job, f"Preparing upload to {printer_name}...")
  677. await delete_file_async(
  678. printer_ip,
  679. printer_access_code,
  680. remote_path,
  681. socket_timeout=ftp_timeout,
  682. printer_model=printer_model,
  683. )
  684. self._raise_if_cancel_requested(job)
  685. try:
  686. await self._set_active_message(job, f"Uploading {library_filename} to {printer_name}...")
  687. loop = asyncio.get_running_loop()
  688. progress_state = {"last_emit": 0.0, "last_bytes": 0}
  689. def upload_progress_callback(uploaded: int, total: int):
  690. if self._is_cancel_requested(job.id):
  691. raise DispatchJobCancelled(f"Dispatch job {job.id} cancelled during upload")
  692. now = time.monotonic()
  693. should_emit = (
  694. uploaded >= total
  695. or now - progress_state["last_emit"] >= 0.2
  696. or uploaded - progress_state["last_bytes"] >= 256 * 1024
  697. )
  698. if should_emit:
  699. progress_state["last_emit"] = now
  700. progress_state["last_bytes"] = uploaded
  701. loop.call_soon_threadsafe(
  702. lambda u=uploaded, t=total: asyncio.create_task(self._set_active_upload_progress(job, u, t))
  703. )
  704. if ftp_retry_enabled:
  705. uploaded = await with_ftp_retry(
  706. upload_file_async,
  707. printer_ip,
  708. printer_access_code,
  709. file_path,
  710. remote_path,
  711. progress_callback=upload_progress_callback,
  712. socket_timeout=ftp_timeout,
  713. printer_model=printer_model,
  714. max_retries=ftp_retry_count,
  715. retry_delay=ftp_retry_delay,
  716. operation_name=f"Upload for print to {printer_name}",
  717. non_retry_exceptions=(DispatchJobCancelled,),
  718. )
  719. else:
  720. uploaded = await upload_file_async(
  721. printer_ip,
  722. printer_access_code,
  723. file_path,
  724. remote_path,
  725. progress_callback=upload_progress_callback,
  726. socket_timeout=ftp_timeout,
  727. printer_model=printer_model,
  728. )
  729. if uploaded:
  730. await self._set_active_upload_progress(job, 1, 1)
  731. if not uploaded:
  732. await db.rollback()
  733. raise RuntimeError(
  734. "Failed to upload file to printer. Check if SD card is inserted and properly formatted (FAT32/exFAT)."
  735. )
  736. register_expected_print(
  737. job.printer_id,
  738. remote_filename,
  739. archive.id,
  740. ams_mapping=job.options.get("ams_mapping"),
  741. )
  742. plate_id = self._resolve_plate_id(file_path, job.options.get("plate_id"))
  743. self._raise_if_cancel_requested(job)
  744. await self._set_active_message(job, f"Starting print on {printer_name}...")
  745. started = printer_manager.start_print(
  746. job.printer_id,
  747. remote_filename,
  748. plate_id,
  749. ams_mapping=job.options.get("ams_mapping"),
  750. timelapse=job.options.get("timelapse", False),
  751. bed_levelling=job.options.get("bed_levelling", True),
  752. flow_cali=job.options.get("flow_cali", False),
  753. vibration_cali=job.options.get("vibration_cali", False),
  754. layer_inspect=job.options.get("layer_inspect", False),
  755. use_ams=job.options.get("use_ams", True),
  756. )
  757. if not started:
  758. await self._cleanup_sd_card_file(
  759. printer_ip,
  760. printer_access_code,
  761. remote_path,
  762. printer_model,
  763. )
  764. await db.rollback()
  765. raise RuntimeError("Failed to start print")
  766. # See _run_reprint_archive for rationale (#1042). On timeout
  767. # also rolls back the freshly-created archive so the library
  768. # flow doesn't leave behind a phantom row for a print that
  769. # never started.
  770. pre_status = printer_manager.get_status(job.printer_id)
  771. pre_state = getattr(pre_status, "state", None) if pre_status else None
  772. pre_subtask_id = getattr(pre_status, "subtask_id", None) if pre_status else None
  773. pre_gcode_file = getattr(pre_status, "gcode_file", None) if pre_status else None
  774. if pre_state:
  775. await self._set_active_message(job, f"Waiting for {printer_name} to acknowledge print...")
  776. transitioned = await self._verify_print_response(
  777. job.printer_id,
  778. printer_name,
  779. pre_state,
  780. pre_subtask_id=pre_subtask_id,
  781. pre_gcode_file=pre_gcode_file,
  782. )
  783. if not transitioned:
  784. await db.rollback()
  785. raise RuntimeError(
  786. f"Printer did not acknowledge print command — state still {pre_state}. "
  787. f"Check the printer for a pending error (HMS code, plate-clear prompt, "
  788. f"SD card) and try again."
  789. )
  790. if job.requested_by_user_id and job.requested_by_username:
  791. printer_manager.set_current_print_user(
  792. job.printer_id,
  793. job.requested_by_user_id,
  794. job.requested_by_username,
  795. )
  796. # Direct-Print flow only: archive_print copies, so deleting the
  797. # transient library row + files here leaves archive intact. Disk
  798. # deletes run only after commit so a rollback leaves no orphan.
  799. cleanup_disk_paths: list[Path] = []
  800. if job.cleanup_library_after_dispatch and not lib_file.is_external:
  801. cleanup_disk_paths.append(file_path)
  802. if lib_file.thumbnail_path:
  803. thumb_path = Path(lib_file.thumbnail_path)
  804. if not thumb_path.is_absolute():
  805. thumb_path = Path(settings.base_dir) / lib_file.thumbnail_path
  806. cleanup_disk_paths.append(thumb_path)
  807. await db.delete(lib_file)
  808. await db.commit()
  809. for cleanup_path in cleanup_disk_paths:
  810. try:
  811. if cleanup_path.exists():
  812. cleanup_path.unlink()
  813. except OSError as cleanup_err:
  814. logger.warning("Failed to delete transient library file %s: %s", cleanup_path, cleanup_err)
  815. except DispatchJobCancelled:
  816. await db.rollback()
  817. await self._set_active_message(job, f"Cancelled upload on {printer_name}.")
  818. raise
  819. @staticmethod
  820. async def _verify_print_response(
  821. printer_id: int,
  822. printer_name: str,
  823. pre_state: str,
  824. pre_subtask_id: str | None = None,
  825. pre_gcode_file: str | None = None,
  826. timeout: float = 90.0,
  827. poll_interval: float = 3.0,
  828. ) -> bool:
  829. """Wait for the printer to acknowledge a print command.
  830. Returns True if the printer transitioned (state advanced past pre_state
  831. or subtask_id advanced past pre_subtask_id). Returns False on timeout —
  832. in that case logs a warning and forces an MQTT reconnect, mirroring the
  833. queue-side watchdog (`_watchdog_print_start`). Caller is responsible
  834. for surfacing the False result to the user (typically by raising so the
  835. dispatch job is marked failed).
  836. Both transition signals are checked because H2D can sit at FINISH for
  837. ~50 s after accepting `project_file` before flipping to PREPARE; the
  838. printer echoes our per-dispatch identity back as `subtask_id` on
  839. `push_status` first, so a subtask_id change is a definitive "command
  840. landed" signal even while state is still FINISH (#1078).
  841. """
  842. deadline = time.monotonic() + timeout
  843. last_status = None
  844. while time.monotonic() < deadline:
  845. await asyncio.sleep(poll_interval)
  846. state = printer_manager.get_status(printer_id)
  847. if not state:
  848. # Printer momentarily not reporting — could be a brief MQTT
  849. # disconnect mid-window. Keep polling rather than declaring
  850. # failure on the first missed tick; the printer may reconnect
  851. # within the remaining timeout and still surface a transition.
  852. continue
  853. last_status = state
  854. if state.state != pre_state:
  855. return True
  856. if pre_subtask_id is not None and state.subtask_id is not None and state.subtask_id != pre_subtask_id:
  857. return True
  858. logger.warning(
  859. "Printer %s (%d) did not respond to print command within %.0fs "
  860. "(state still %s, subtask_id still %s) — printer may need restart",
  861. printer_name,
  862. printer_id,
  863. timeout,
  864. pre_state,
  865. pre_subtask_id,
  866. )
  867. # Distinguish #1150 (slow parse) from #887/#936 (half-broken session)
  868. # via gcode_file: if the printer is now showing a different file than
  869. # before dispatch, the project_file command landed and the printer is
  870. # parsing — a forced reconnect mid-parse causes 0500_4003. If
  871. # gcode_file is unchanged, the publish was silently swallowed and the
  872. # original #936 recovery (force_reconnect → fresh client_id) is what
  873. # we want. Caveat: in the rare retry-same-file-after-timeout case the
  874. # printer's gcode_file looks identical before and after the publish
  875. # lands, so a slow parse on retry-same-file still falls through to the
  876. # reconnect (and the original 0500_4003) — accepted to avoid breaking
  877. # the half-broken-session recovery path.
  878. client = printer_manager.get_client(printer_id)
  879. current_gcode_file = getattr(last_status, "gcode_file", None) if last_status else None
  880. publish_landed = current_gcode_file is not None and current_gcode_file != pre_gcode_file
  881. if publish_landed:
  882. logger.warning(
  883. "Printer %s (%d) gcode_file changed to %r (was %r) — printer "
  884. "received the command and is parsing slowly. Skipping forced "
  885. "MQTT reconnect to avoid 0500_4003 mid-parse (#1150).",
  886. printer_name,
  887. printer_id,
  888. current_gcode_file,
  889. pre_gcode_file,
  890. )
  891. elif client and hasattr(client, "force_reconnect_stale_session"):
  892. client.force_reconnect_stale_session(
  893. f"print command unacknowledged after {timeout:.0f}s "
  894. f"(state still {pre_state}, gcode_file {current_gcode_file!r})"
  895. )
  896. return False
  897. @staticmethod
  898. async def _cleanup_sd_card_file(
  899. printer_ip: str,
  900. access_code: str,
  901. remote_path: str,
  902. printer_model: str | None,
  903. ):
  904. """Best-effort delete of uploaded file from printer SD card."""
  905. try:
  906. await delete_file_async(printer_ip, access_code, remote_path, printer_model=printer_model)
  907. except Exception:
  908. pass # Best-effort — don't fail the error handler
  909. @staticmethod
  910. def _resolve_plate_id(file_path: Path, requested_plate_id: int | None) -> int:
  911. if requested_plate_id is not None:
  912. return requested_plate_id
  913. plate_id = 1
  914. try:
  915. with zipfile.ZipFile(file_path, "r") as zf:
  916. for name in zf.namelist():
  917. if name.startswith("Metadata/plate_") and name.endswith(".gcode"):
  918. plate_str = name[15:-6]
  919. plate_id = int(plate_str)
  920. break
  921. except (ValueError, zipfile.BadZipFile, OSError):
  922. pass
  923. return plate_id
  924. @staticmethod
  925. def _is_sliced_file(filename: str) -> bool:
  926. lower = filename.lower()
  927. return lower.endswith(".gcode") or lower.endswith(".gcode.3mf")
# Module-level BackgroundDispatchService instance created at import time.
# NOTE(review): presumably the shared singleton other modules import instead of
# constructing their own service — confirm against callers.
background_dispatch = BackgroundDispatchService()