# background_dispatch.py
  1. """Background dispatch for print/reprint jobs.
  2. This service is separate from the app's print queue feature. It exists only to
  3. decouple "send/start print" operations (FTP upload + start command) from API
  4. request latency so the UI can continue immediately after dispatch.
  5. """
  6. from __future__ import annotations
  7. import asyncio
  8. import logging
  9. import time
  10. import zipfile
  11. from collections import deque
  12. from dataclasses import dataclass, field
  13. from pathlib import Path
  14. from typing import Any, Literal
  15. from sqlalchemy import select
  16. from backend.app.core.config import settings
  17. from backend.app.core.database import async_session
  18. from backend.app.core.websocket import ws_manager
  19. from backend.app.models.library import LibraryFile
  20. from backend.app.models.printer import Printer
  21. from backend.app.services.archive import ArchiveService
  22. from backend.app.services.bambu_ftp import (
  23. delete_file_async,
  24. get_ftp_retry_settings,
  25. upload_file_async,
  26. with_ftp_retry,
  27. )
  28. from backend.app.services.printer_manager import printer_manager
  29. logger = logging.getLogger(__name__)
  30. class DispatchJobCancelled(Exception):
  31. """Raised when a dispatch job is cancelled by the user."""
  32. class DispatchEnqueueRejected(Exception):
  33. """Raised when a dispatch job should not be accepted."""
  34. @dataclass(slots=True)
  35. class PrintDispatchJob:
  36. id: int
  37. kind: Literal["reprint_archive", "print_library_file"]
  38. source_id: int
  39. source_name: str
  40. printer_id: int
  41. printer_name: str
  42. options: dict[str, Any] = field(default_factory=dict)
  43. requested_by_user_id: int | None = None
  44. requested_by_username: str | None = None
  45. @dataclass(slots=True)
  46. class ActiveDispatchState:
  47. job: PrintDispatchJob
  48. message: str
  49. upload_bytes: int | None = None
  50. upload_total_bytes: int | None = None
  51. class BackgroundDispatchService:
  52. def __init__(self):
  53. self._queued_jobs: deque[PrintDispatchJob] = deque()
  54. self._dispatcher_task: asyncio.Task | None = None
  55. self._running_tasks: dict[int, asyncio.Task] = {}
  56. self._lock = asyncio.Lock()
  57. self._job_event = asyncio.Event()
  58. self._next_job_id = 1
  59. self._active_jobs: dict[int, ActiveDispatchState] = {}
  60. self._cancel_requested_job_ids: set[int] = set()
  61. # Progress for the current "batch" (since queue became non-empty)
  62. self._batch_total = 0
  63. self._batch_completed = 0
  64. self._batch_failed = 0
  65. @staticmethod
  66. def _printer_is_busy_printing(printer_id: int) -> bool:
  67. state = printer_manager.get_status(printer_id)
  68. if not state:
  69. return False
  70. return state.state in ("RUNNING", "PAUSE", "PAUSED") and bool(state.gcode_file)
  71. async def start(self):
  72. async with self._lock:
  73. if self._dispatcher_task and not self._dispatcher_task.done():
  74. return
  75. self._dispatcher_task = asyncio.create_task(self._dispatcher_loop(), name="background-dispatch-dispatcher")
  76. logger.info("Background dispatch dispatcher started")
  77. async def stop(self):
  78. dispatcher: asyncio.Task | None = None
  79. running_tasks: list[asyncio.Task] = []
  80. async with self._lock:
  81. dispatcher = self._dispatcher_task
  82. self._dispatcher_task = None
  83. running_tasks = list(self._running_tasks.values())
  84. self._running_tasks.clear()
  85. self._active_jobs.clear()
  86. self._queued_jobs.clear()
  87. self._cancel_requested_job_ids.clear()
  88. self._job_event.set()
  89. if dispatcher:
  90. dispatcher.cancel()
  91. for task in running_tasks:
  92. task.cancel()
  93. if dispatcher:
  94. try:
  95. await dispatcher
  96. except asyncio.CancelledError:
  97. pass
  98. if running_tasks:
  99. await asyncio.gather(*running_tasks, return_exceptions=True)
  100. logger.info("Background dispatch dispatcher stopped")
  101. async def dispatch_reprint_archive(
  102. self,
  103. *,
  104. archive_id: int,
  105. archive_name: str,
  106. printer_id: int,
  107. printer_name: str,
  108. options: dict[str, Any],
  109. requested_by_user_id: int | None,
  110. requested_by_username: str | None,
  111. ) -> dict[str, Any]:
  112. return await self._dispatch(
  113. kind="reprint_archive",
  114. source_id=archive_id,
  115. source_name=archive_name,
  116. printer_id=printer_id,
  117. printer_name=printer_name,
  118. options=options,
  119. requested_by_user_id=requested_by_user_id,
  120. requested_by_username=requested_by_username,
  121. )
  122. async def get_state(self) -> dict[str, Any]:
  123. """Get current dispatch queue state snapshot for newly connected clients."""
  124. async with self._lock:
  125. return self._build_state_payload_unlocked()
  126. async def dispatch_print_library_file(
  127. self,
  128. *,
  129. file_id: int,
  130. filename: str,
  131. printer_id: int,
  132. printer_name: str,
  133. options: dict[str, Any],
  134. requested_by_user_id: int | None,
  135. requested_by_username: str | None,
  136. ) -> dict[str, Any]:
  137. return await self._dispatch(
  138. kind="print_library_file",
  139. source_id=file_id,
  140. source_name=filename,
  141. printer_id=printer_id,
  142. printer_name=printer_name,
  143. options=options,
  144. requested_by_user_id=requested_by_user_id,
  145. requested_by_username=requested_by_username,
  146. )
  147. async def cancel_job(self, job_id: int) -> dict[str, Any]:
  148. """Cancel a queued dispatch job.
  149. Queued jobs are removed immediately. Active jobs are cancelled
  150. cooperatively and will stop at the next cancellation checkpoint.
  151. """
  152. active_cancel_payload: dict[str, Any] | None = None
  153. active_cancel_result: dict[str, Any] | None = None
  154. async with self._lock:
  155. active_state = self._active_jobs.get(job_id)
  156. if active_state is not None:
  157. logger.info("Cancel requested for active dispatch job %s", job_id)
  158. self._cancel_requested_job_ids.add(job_id)
  159. active_job = active_state.job
  160. active_cancel_payload = self._build_state_payload_unlocked(
  161. recent_event={
  162. "status": "cancelling",
  163. "job_id": active_job.id,
  164. "source_name": active_job.source_name,
  165. "printer_id": active_job.printer_id,
  166. "printer_name": active_job.printer_name,
  167. "message": "Cancelling current dispatch...",
  168. }
  169. )
  170. active_cancel_result = {
  171. "cancelled": True,
  172. "pending": True,
  173. "job_id": active_job.id,
  174. "source_name": active_job.source_name,
  175. "printer_id": active_job.printer_id,
  176. "printer_name": active_job.printer_name,
  177. }
  178. if active_cancel_payload and active_cancel_result:
  179. await ws_manager.broadcast({"type": "background_dispatch", "data": active_cancel_payload})
  180. return active_cancel_result
  181. async with self._lock:
  182. cancelled_job: PrintDispatchJob | None = None
  183. for job in self._queued_jobs:
  184. if job.id == job_id:
  185. cancelled_job = job
  186. break
  187. if not cancelled_job:
  188. logger.info("Cancel requested for unknown dispatch job %s", job_id)
  189. return {"cancelled": False, "reason": "not_found"}
  190. self._queued_jobs.remove(cancelled_job)
  191. logger.info("Cancelled queued dispatch job %s", cancelled_job.id)
  192. self._batch_total = max(0, self._batch_total - 1)
  193. if self._batch_total == 0 and len(self._queued_jobs) == 0 and len(self._active_jobs) == 0:
  194. self._batch_completed = 0
  195. self._batch_failed = 0
  196. payload = self._build_state_payload_unlocked(
  197. recent_event={
  198. "status": "cancelled",
  199. "job_id": cancelled_job.id,
  200. "source_name": cancelled_job.source_name,
  201. "printer_id": cancelled_job.printer_id,
  202. "printer_name": cancelled_job.printer_name,
  203. "message": "Cancelled from queue",
  204. }
  205. )
  206. await ws_manager.broadcast({"type": "background_dispatch", "data": payload})
  207. return {
  208. "cancelled": True,
  209. "pending": False,
  210. "job_id": cancelled_job.id,
  211. "source_name": cancelled_job.source_name,
  212. "printer_id": cancelled_job.printer_id,
  213. "printer_name": cancelled_job.printer_name,
  214. }
  215. async def _dispatch(
  216. self,
  217. *,
  218. kind: Literal["reprint_archive", "print_library_file"],
  219. source_id: int,
  220. source_name: str,
  221. printer_id: int,
  222. printer_name: str,
  223. options: dict[str, Any],
  224. requested_by_user_id: int | None,
  225. requested_by_username: str | None,
  226. ) -> dict[str, Any]:
  227. async with self._lock:
  228. has_pending_for_printer = any(job.printer_id == printer_id for job in self._queued_jobs)
  229. has_active_for_printer = any(active.job.printer_id == printer_id for active in self._active_jobs.values())
  230. if has_pending_for_printer or has_active_for_printer:
  231. raise DispatchEnqueueRejected(f"Printer {printer_name} already has a background dispatch in progress")
  232. if self._printer_is_busy_printing(printer_id):
  233. raise DispatchEnqueueRejected(f"Printer {printer_name} is currently busy printing")
  234. dispatch_position = len(self._queued_jobs) + len(self._active_jobs) + 1
  235. job = PrintDispatchJob(
  236. id=self._next_job_id,
  237. kind=kind,
  238. source_id=source_id,
  239. source_name=source_name,
  240. printer_id=printer_id,
  241. printer_name=printer_name,
  242. options=options,
  243. requested_by_user_id=requested_by_user_id,
  244. requested_by_username=requested_by_username,
  245. )
  246. self._next_job_id += 1
  247. self._batch_total += 1
  248. self._queued_jobs.append(job)
  249. self._job_event.set()
  250. payload = self._build_state_payload_unlocked(
  251. recent_event={
  252. "status": "dispatched",
  253. "job_id": job.id,
  254. "source_name": source_name,
  255. "printer_id": printer_id,
  256. "printer_name": printer_name,
  257. "message": f"Dispatched to {printer_name}",
  258. }
  259. )
  260. await ws_manager.broadcast({"type": "background_dispatch", "data": payload})
  261. return {
  262. "dispatch_job_id": job.id,
  263. "dispatch_position": dispatch_position,
  264. "status": "dispatched",
  265. "printer_id": printer_id,
  266. "source_id": source_id,
  267. "source_name": source_name,
  268. }
  269. async def _dispatcher_loop(self):
  270. while True:
  271. await self._job_event.wait()
  272. self._job_event.clear()
  273. while True:
  274. payload: dict[str, Any] | None = None
  275. job_to_start: PrintDispatchJob | None = None
  276. async with self._lock:
  277. busy_printer_ids = {state.job.printer_id for state in self._active_jobs.values()}
  278. start_index = next(
  279. (
  280. idx
  281. for idx, queued_job in enumerate(self._queued_jobs)
  282. if queued_job.printer_id not in busy_printer_ids
  283. ),
  284. None,
  285. )
  286. if start_index is None:
  287. break
  288. job_to_start = self._queued_jobs[start_index]
  289. del self._queued_jobs[start_index]
  290. self._active_jobs[job_to_start.id] = ActiveDispatchState(
  291. job=job_to_start,
  292. message="Preparing background dispatch...",
  293. )
  294. task = asyncio.create_task(
  295. self._run_active_job(job_to_start), name=f"background-dispatch-job-{job_to_start.id}"
  296. )
  297. self._running_tasks[job_to_start.id] = task
  298. payload = self._build_state_payload_unlocked(
  299. recent_event={
  300. "status": "processing",
  301. "job_id": job_to_start.id,
  302. "source_name": job_to_start.source_name,
  303. "printer_id": job_to_start.printer_id,
  304. "printer_name": job_to_start.printer_name,
  305. "message": "Preparing background dispatch...",
  306. }
  307. )
  308. if payload:
  309. await ws_manager.broadcast({"type": "background_dispatch", "data": payload})
  310. async def _run_active_job(self, job: PrintDispatchJob):
  311. try:
  312. await self._process_job(job)
  313. await self._mark_job_finished(job, failed=False, message="Background dispatch complete")
  314. except DispatchJobCancelled:
  315. await self._mark_job_cancelled(job)
  316. except asyncio.CancelledError:
  317. raise
  318. except Exception as e:
  319. logger.error("Background dispatch job %s failed: %s", job.id, e, exc_info=True)
  320. await self._mark_job_finished(job, failed=True, message=str(e))
  321. finally:
  322. self._job_event.set()
  323. async def _set_active_message(self, job: PrintDispatchJob, message: str):
  324. async with self._lock:
  325. active = self._active_jobs.get(job.id)
  326. if not active:
  327. return
  328. active.message = message
  329. payload = self._build_state_payload_unlocked(
  330. recent_event={
  331. "status": "processing",
  332. "job_id": active.job.id,
  333. "source_name": active.job.source_name,
  334. "printer_id": active.job.printer_id,
  335. "printer_name": active.job.printer_name,
  336. "message": message,
  337. }
  338. )
  339. await ws_manager.broadcast({"type": "background_dispatch", "data": payload})
  340. async def _set_active_upload_progress(self, job: PrintDispatchJob, uploaded: int, total: int):
  341. async with self._lock:
  342. active = self._active_jobs.get(job.id)
  343. if not active:
  344. return
  345. active.upload_bytes = max(0, int(uploaded))
  346. active.upload_total_bytes = max(0, int(total))
  347. payload = self._build_state_payload_unlocked(
  348. recent_event={
  349. "status": "processing",
  350. "job_id": active.job.id,
  351. "source_name": active.job.source_name,
  352. "printer_id": active.job.printer_id,
  353. "printer_name": active.job.printer_name,
  354. "message": active.message,
  355. }
  356. )
  357. await ws_manager.broadcast({"type": "background_dispatch", "data": payload})
  358. async def _mark_job_finished(self, job: PrintDispatchJob, *, failed: bool, message: str):
  359. async with self._lock:
  360. if failed:
  361. self._batch_failed += 1
  362. else:
  363. self._batch_completed += 1
  364. self._active_jobs.pop(job.id, None)
  365. self._running_tasks.pop(job.id, None)
  366. self._cancel_requested_job_ids.discard(job.id)
  367. payload = self._build_state_payload_unlocked(
  368. recent_event={
  369. "status": "failed" if failed else "completed",
  370. "job_id": job.id,
  371. "source_name": job.source_name,
  372. "printer_id": job.printer_id,
  373. "printer_name": job.printer_name,
  374. "message": message,
  375. }
  376. )
  377. should_reset_batch = len(self._queued_jobs) == 0 and len(self._active_jobs) == 0
  378. await ws_manager.broadcast({"type": "background_dispatch", "data": payload})
  379. if should_reset_batch:
  380. async with self._lock:
  381. self._batch_total = 0
  382. self._batch_completed = 0
  383. self._batch_failed = 0
  384. async def _mark_job_cancelled(self, job: PrintDispatchJob):
  385. async with self._lock:
  386. self._active_jobs.pop(job.id, None)
  387. self._running_tasks.pop(job.id, None)
  388. self._cancel_requested_job_ids.discard(job.id)
  389. self._batch_total = max(0, self._batch_total - 1)
  390. if self._batch_total == 0 and len(self._queued_jobs) == 0 and len(self._active_jobs) == 0:
  391. self._batch_completed = 0
  392. self._batch_failed = 0
  393. payload = self._build_state_payload_unlocked(
  394. recent_event={
  395. "status": "cancelled",
  396. "job_id": job.id,
  397. "source_name": job.source_name,
  398. "printer_id": job.printer_id,
  399. "printer_name": job.printer_name,
  400. "message": "Cancelled during dispatch",
  401. }
  402. )
  403. await ws_manager.broadcast({"type": "background_dispatch", "data": payload})
  404. def _is_cancel_requested(self, job_id: int) -> bool:
  405. return job_id in self._cancel_requested_job_ids
  406. def _raise_if_cancel_requested(self, job: PrintDispatchJob):
  407. if self._is_cancel_requested(job.id):
  408. raise DispatchJobCancelled(f"Dispatch job {job.id} cancelled")
  409. def _build_state_payload_unlocked(self, recent_event: dict[str, Any] | None = None) -> dict[str, Any]:
  410. processing = len(self._active_jobs)
  411. dispatched = len(self._queued_jobs)
  412. dispatched_jobs = [
  413. {
  414. "job_id": job.id,
  415. "kind": job.kind,
  416. "source_id": job.source_id,
  417. "source_name": job.source_name,
  418. "printer_id": job.printer_id,
  419. "printer_name": job.printer_name,
  420. }
  421. for job in list(self._queued_jobs)
  422. ]
  423. active_jobs: list[dict[str, Any]] = []
  424. for active in self._active_jobs.values():
  425. upload_progress_pct = None
  426. if active.upload_total_bytes and active.upload_total_bytes > 0 and active.upload_bytes is not None:
  427. upload_progress_pct = round(
  428. max(0.0, min(100.0, (active.upload_bytes / active.upload_total_bytes) * 100.0)), 1
  429. )
  430. active_jobs.append(
  431. {
  432. "job_id": active.job.id,
  433. "kind": active.job.kind,
  434. "source_id": active.job.source_id,
  435. "source_name": active.job.source_name,
  436. "printer_id": active.job.printer_id,
  437. "printer_name": active.job.printer_name,
  438. "message": active.message,
  439. "upload_bytes": active.upload_bytes,
  440. "upload_total_bytes": active.upload_total_bytes,
  441. "upload_progress_pct": upload_progress_pct,
  442. }
  443. )
  444. active_jobs.sort(key=lambda item: int(item["job_id"]))
  445. active_job = active_jobs[0] if active_jobs else None
  446. return {
  447. "total": self._batch_total,
  448. "dispatched": dispatched,
  449. "processing": processing,
  450. "completed": self._batch_completed,
  451. "failed": self._batch_failed,
  452. "dispatched_jobs": dispatched_jobs,
  453. "active_jobs": active_jobs,
  454. "active_job": active_job,
  455. "recent_event": recent_event,
  456. }
  457. async def _process_job(self, job: PrintDispatchJob):
  458. if job.kind == "reprint_archive":
  459. await self._run_reprint_archive(job)
  460. return
  461. if job.kind == "print_library_file":
  462. await self._run_print_library_file(job)
  463. return
  464. raise RuntimeError(f"Unknown dispatch job kind: {job.kind}")
  465. async def _run_reprint_archive(self, job: PrintDispatchJob):
  466. from backend.app.main import register_expected_print
  467. async with async_session() as db:
  468. service = ArchiveService(db)
  469. archive = await service.get_archive(job.source_id)
  470. if not archive:
  471. raise RuntimeError("Archive not found")
  472. printer = await db.scalar(select(Printer).where(Printer.id == job.printer_id))
  473. if not printer:
  474. raise RuntimeError("Printer not found")
  475. printer_name = printer.name
  476. printer_ip = printer.ip_address
  477. printer_access_code = printer.access_code
  478. printer_model = printer.model
  479. archive_filename = archive.filename
  480. if not printer_manager.is_connected(job.printer_id):
  481. raise RuntimeError("Printer is not connected")
  482. file_path = settings.base_dir / archive.file_path
  483. if not file_path.exists():
  484. raise RuntimeError("Archive file not found")
  485. base_name = archive.filename
  486. if base_name.endswith(".gcode.3mf"):
  487. base_name = base_name[:-10]
  488. elif base_name.endswith(".3mf"):
  489. base_name = base_name[:-4]
  490. remote_filename = f"{base_name}.3mf"
  491. remote_path = f"/{remote_filename}"
  492. ftp_retry_enabled, ftp_retry_count, ftp_retry_delay, ftp_timeout = await get_ftp_retry_settings()
  493. self._raise_if_cancel_requested(job)
  494. await self._set_active_message(job, f"Preparing upload to {printer_name}...")
  495. await delete_file_async(
  496. printer_ip,
  497. printer_access_code,
  498. remote_path,
  499. socket_timeout=ftp_timeout,
  500. printer_model=printer_model,
  501. )
  502. self._raise_if_cancel_requested(job)
  503. def upload_progress_callback(_uploaded: int, _total: int):
  504. if self._is_cancel_requested(job.id):
  505. raise DispatchJobCancelled(f"Dispatch job {job.id} cancelled during upload")
  506. try:
  507. await self._set_active_message(job, f"Uploading {archive_filename} to {printer_name}...")
  508. loop = asyncio.get_running_loop()
  509. progress_state = {"last_emit": 0.0, "last_bytes": 0}
  510. def upload_progress_callback(uploaded: int, total: int):
  511. if self._is_cancel_requested(job.id):
  512. raise DispatchJobCancelled(f"Dispatch job {job.id} cancelled during upload")
  513. now = time.monotonic()
  514. should_emit = (
  515. uploaded >= total
  516. or now - progress_state["last_emit"] >= 0.2
  517. or uploaded - progress_state["last_bytes"] >= 256 * 1024
  518. )
  519. if should_emit:
  520. progress_state["last_emit"] = now
  521. progress_state["last_bytes"] = uploaded
  522. loop.call_soon_threadsafe(
  523. lambda u=uploaded, t=total: asyncio.create_task(self._set_active_upload_progress(job, u, t))
  524. )
  525. if ftp_retry_enabled:
  526. uploaded = await with_ftp_retry(
  527. upload_file_async,
  528. printer_ip,
  529. printer_access_code,
  530. file_path,
  531. remote_path,
  532. progress_callback=upload_progress_callback,
  533. socket_timeout=ftp_timeout,
  534. printer_model=printer_model,
  535. max_retries=ftp_retry_count,
  536. retry_delay=ftp_retry_delay,
  537. operation_name=f"Upload for reprint to {printer_name}",
  538. non_retry_exceptions=(DispatchJobCancelled,),
  539. )
  540. else:
  541. uploaded = await upload_file_async(
  542. printer_ip,
  543. printer_access_code,
  544. file_path,
  545. remote_path,
  546. progress_callback=upload_progress_callback,
  547. socket_timeout=ftp_timeout,
  548. printer_model=printer_model,
  549. )
  550. if uploaded:
  551. await self._set_active_upload_progress(job, 1, 1)
  552. if not uploaded:
  553. raise RuntimeError(
  554. "Failed to upload file to printer. Check if SD card is inserted and properly formatted (FAT32/exFAT)."
  555. )
  556. register_expected_print(job.printer_id, remote_filename, job.source_id)
  557. plate_id = self._resolve_plate_id(file_path, job.options.get("plate_id"))
  558. self._raise_if_cancel_requested(job)
  559. await self._set_active_message(job, f"Starting print on {printer_name}...")
  560. started = printer_manager.start_print(
  561. job.printer_id,
  562. remote_filename,
  563. plate_id,
  564. ams_mapping=job.options.get("ams_mapping"),
  565. timelapse=job.options.get("timelapse", False),
  566. bed_levelling=job.options.get("bed_levelling", True),
  567. flow_cali=job.options.get("flow_cali", False),
  568. vibration_cali=job.options.get("vibration_cali", False),
  569. layer_inspect=job.options.get("layer_inspect", False),
  570. use_ams=job.options.get("use_ams", True),
  571. )
  572. if not started:
  573. raise RuntimeError("Failed to start print")
  574. if job.requested_by_user_id and job.requested_by_username:
  575. printer_manager.set_current_print_user(
  576. job.printer_id,
  577. job.requested_by_user_id,
  578. job.requested_by_username,
  579. )
  580. except DispatchJobCancelled:
  581. await self._set_active_message(job, f"Cancelled upload on {printer_name}.")
  582. raise
  583. async def _run_print_library_file(self, job: PrintDispatchJob):
  584. from backend.app.main import register_expected_print
  585. async with async_session() as db:
  586. lib_file = await db.scalar(select(LibraryFile).where(LibraryFile.id == job.source_id))
  587. if not lib_file:
  588. raise RuntimeError("File not found")
  589. if not self._is_sliced_file(lib_file.filename):
  590. raise RuntimeError("Not a sliced file. Only .gcode or .gcode.3mf files can be printed.")
  591. file_path = Path(settings.base_dir) / lib_file.file_path
  592. if not file_path.exists():
  593. raise RuntimeError("File not found on disk")
  594. printer = await db.scalar(select(Printer).where(Printer.id == job.printer_id))
  595. if not printer:
  596. raise RuntimeError("Printer not found")
  597. printer_name = printer.name
  598. printer_ip = printer.ip_address
  599. printer_access_code = printer.access_code
  600. printer_model = printer.model
  601. library_filename = lib_file.filename
  602. if not printer_manager.is_connected(job.printer_id):
  603. raise RuntimeError("Printer is not connected")
  604. await self._set_active_message(job, f"Creating archive for {lib_file.filename}...")
  605. archive_service = ArchiveService(db)
  606. archive = await archive_service.archive_print(
  607. printer_id=job.printer_id,
  608. source_file=file_path,
  609. )
  610. if not archive:
  611. raise RuntimeError("Failed to create archive")
  612. await db.flush()
  613. base_name = lib_file.filename
  614. if base_name.endswith(".gcode.3mf"):
  615. base_name = base_name[:-10]
  616. elif base_name.endswith(".3mf"):
  617. base_name = base_name[:-4]
  618. remote_filename = f"{base_name}.3mf"
  619. remote_path = f"/{remote_filename}"
  620. ftp_retry_enabled, ftp_retry_count, ftp_retry_delay, ftp_timeout = await get_ftp_retry_settings()
  621. self._raise_if_cancel_requested(job)
  622. await self._set_active_message(job, f"Preparing upload to {printer_name}...")
  623. await delete_file_async(
  624. printer_ip,
  625. printer_access_code,
  626. remote_path,
  627. socket_timeout=ftp_timeout,
  628. printer_model=printer_model,
  629. )
  630. self._raise_if_cancel_requested(job)
  631. def upload_progress_callback(_uploaded: int, _total: int):
  632. if self._is_cancel_requested(job.id):
  633. raise DispatchJobCancelled(f"Dispatch job {job.id} cancelled during upload")
  634. try:
  635. await self._set_active_message(job, f"Uploading {library_filename} to {printer_name}...")
  636. loop = asyncio.get_running_loop()
  637. progress_state = {"last_emit": 0.0, "last_bytes": 0}
  638. def upload_progress_callback(uploaded: int, total: int):
  639. if self._is_cancel_requested(job.id):
  640. raise DispatchJobCancelled(f"Dispatch job {job.id} cancelled during upload")
  641. now = time.monotonic()
  642. should_emit = (
  643. uploaded >= total
  644. or now - progress_state["last_emit"] >= 0.2
  645. or uploaded - progress_state["last_bytes"] >= 256 * 1024
  646. )
  647. if should_emit:
  648. progress_state["last_emit"] = now
  649. progress_state["last_bytes"] = uploaded
  650. loop.call_soon_threadsafe(
  651. lambda u=uploaded, t=total: asyncio.create_task(self._set_active_upload_progress(job, u, t))
  652. )
  653. if ftp_retry_enabled:
  654. uploaded = await with_ftp_retry(
  655. upload_file_async,
  656. printer_ip,
  657. printer_access_code,
  658. file_path,
  659. remote_path,
  660. progress_callback=upload_progress_callback,
  661. socket_timeout=ftp_timeout,
  662. printer_model=printer_model,
  663. max_retries=ftp_retry_count,
  664. retry_delay=ftp_retry_delay,
  665. operation_name=f"Upload for print to {printer_name}",
  666. non_retry_exceptions=(DispatchJobCancelled,),
  667. )
  668. else:
  669. uploaded = await upload_file_async(
  670. printer_ip,
  671. printer_access_code,
  672. file_path,
  673. remote_path,
  674. progress_callback=upload_progress_callback,
  675. socket_timeout=ftp_timeout,
  676. printer_model=printer_model,
  677. )
  678. if uploaded:
  679. await self._set_active_upload_progress(job, 1, 1)
  680. if not uploaded:
  681. await db.rollback()
  682. raise RuntimeError(
  683. "Failed to upload file to printer. Check if SD card is inserted and properly formatted (FAT32/exFAT)."
  684. )
  685. register_expected_print(job.printer_id, remote_filename, archive.id)
  686. plate_id = self._resolve_plate_id(file_path, job.options.get("plate_id"))
  687. self._raise_if_cancel_requested(job)
  688. await self._set_active_message(job, f"Starting print on {printer_name}...")
  689. started = printer_manager.start_print(
  690. job.printer_id,
  691. remote_filename,
  692. plate_id,
  693. ams_mapping=job.options.get("ams_mapping"),
  694. timelapse=job.options.get("timelapse", False),
  695. bed_levelling=job.options.get("bed_levelling", True),
  696. flow_cali=job.options.get("flow_cali", False),
  697. vibration_cali=job.options.get("vibration_cali", False),
  698. layer_inspect=job.options.get("layer_inspect", False),
  699. use_ams=job.options.get("use_ams", True),
  700. )
  701. if not started:
  702. await db.rollback()
  703. raise RuntimeError("Failed to start print")
  704. await db.commit()
  705. except DispatchJobCancelled:
  706. await db.rollback()
  707. await self._set_active_message(job, f"Cancelled upload on {printer_name}.")
  708. raise
  709. @staticmethod
  710. def _resolve_plate_id(file_path: Path, requested_plate_id: int | None) -> int:
  711. if requested_plate_id is not None:
  712. return requested_plate_id
  713. plate_id = 1
  714. try:
  715. with zipfile.ZipFile(file_path, "r") as zf:
  716. for name in zf.namelist():
  717. if name.startswith("Metadata/plate_") and name.endswith(".gcode"):
  718. plate_str = name[15:-6]
  719. plate_id = int(plate_str)
  720. break
  721. except (ValueError, zipfile.BadZipFile, OSError):
  722. pass
  723. return plate_id
  724. @staticmethod
  725. def _is_sliced_file(filename: str) -> bool:
  726. lower = filename.lower()
  727. return lower.endswith(".gcode") or lower.endswith(".gcode.3mf")
  728. background_dispatch = BackgroundDispatchService()