# print_queue.py (44 KB)
  1. """API routes for print queue management."""
  2. import json
  3. import logging
  4. import zipfile
  5. from datetime import datetime, timezone
  6. from pathlib import Path
  7. import defusedxml.ElementTree as ET
  8. from fastapi import APIRouter, Depends, HTTPException, Query
  9. from sqlalchemy import and_, func, or_, select
  10. from sqlalchemy.ext.asyncio import AsyncSession
  11. from sqlalchemy.orm import selectinload
  12. from backend.app.core.auth import RequirePermissionIfAuthEnabled, require_ownership_permission
  13. from backend.app.core.config import settings
  14. from backend.app.core.database import get_db
  15. from backend.app.core.permissions import Permission
  16. from backend.app.models.archive import PrintArchive
  17. from backend.app.models.library import LibraryFile
  18. from backend.app.models.print_batch import PrintBatch
  19. from backend.app.models.print_queue import PrintQueueItem
  20. from backend.app.models.printer import Printer
  21. from backend.app.models.project import Project
  22. from backend.app.models.user import User
  23. from backend.app.schemas.print_queue import (
  24. PrintBatchResponse,
  25. PrintQueueBulkUpdate,
  26. PrintQueueBulkUpdateResponse,
  27. PrintQueueItemCreate,
  28. PrintQueueItemResponse,
  29. PrintQueueItemUpdate,
  30. PrintQueueReorder,
  31. )
  32. from backend.app.services.filament_deficit import compute_deficit_for_queue_item
  33. from backend.app.services.notification_service import notification_service
  34. from backend.app.utils.printer_models import normalize_printer_model, normalize_printer_model_id
  35. from backend.app.utils.threemf_tools import extract_filament_usage_from_3mf
  # Module-level logger, named after this module's import path.
  logger = logging.getLogger(__name__)

  # Router that collects the endpoints defined in this module; every route is served under "/queue".
  router = APIRouter(prefix="/queue", tags=["queue"])
  38. def _extract_filament_types_from_3mf(file_path: Path, plate_id: int | None = None) -> list[str]:
  39. """Extract unique filament types from a 3MF file.
  40. Args:
  41. file_path: Path to the 3MF file
  42. plate_id: Optional plate index to filter for (for multi-plate files)
  43. Returns:
  44. List of unique filament types (e.g., ["PLA", "PETG"])
  45. """
  46. types: set[str] = set()
  47. try:
  48. with zipfile.ZipFile(file_path, "r") as zf:
  49. if "Metadata/slice_info.config" not in zf.namelist():
  50. return []
  51. content = zf.read("Metadata/slice_info.config").decode()
  52. root = ET.fromstring(content)
  53. if plate_id is not None:
  54. # Find the plate element with matching index
  55. for plate_elem in root.findall(".//plate"):
  56. plate_index = None
  57. for meta in plate_elem.findall("metadata"):
  58. if meta.get("key") == "index":
  59. try:
  60. plate_index = int(meta.get("value", "0"))
  61. except ValueError:
  62. pass # Skip plate with unparseable index
  63. break
  64. if plate_index == plate_id:
  65. for filament_elem in plate_elem.findall("filament"):
  66. filament_type = filament_elem.get("type", "")
  67. used_g = filament_elem.get("used_g", "0")
  68. try:
  69. used_grams = float(used_g)
  70. except (ValueError, TypeError):
  71. used_grams = 0
  72. if used_grams > 0 and filament_type:
  73. types.add(filament_type)
  74. break
  75. else:
  76. # No plate_id specified - extract all filaments with used_g > 0
  77. for filament_elem in root.findall(".//filament"):
  78. filament_type = filament_elem.get("type", "")
  79. used_g = filament_elem.get("used_g", "0")
  80. try:
  81. used_grams = float(used_g)
  82. except (ValueError, TypeError):
  83. used_grams = 0
  84. if used_grams > 0 and filament_type:
  85. types.add(filament_type)
  86. except Exception as e:
  87. logger.warning("Failed to extract filament types from %s: %s", file_path, e)
  88. return sorted(types)
  89. def _extract_print_time_from_3mf(file_path: Path, plate_id: int | None = None) -> int | None:
  90. """Extract print time (prediction) from a 3MF file.
  91. Args:
  92. file_path: Path to the 3MF file
  93. plate_id: Optional plate index to filter for (for multi-plate files)
  94. Returns:
  95. Print time in seconds, or None if not found
  96. """
  97. try:
  98. with zipfile.ZipFile(file_path, "r") as zf:
  99. if "Metadata/slice_info.config" not in zf.namelist():
  100. return None
  101. content = zf.read("Metadata/slice_info.config").decode()
  102. root = ET.fromstring(content)
  103. if plate_id is not None:
  104. for plate_elem in root.findall(".//plate"):
  105. plate_index = None
  106. for meta in plate_elem.findall("metadata"):
  107. if meta.get("key") == "index":
  108. try:
  109. plate_index = int(meta.get("value", "0"))
  110. except ValueError:
  111. pass # Skip plate with unparseable index
  112. break
  113. if plate_index == plate_id:
  114. for meta in plate_elem.findall("metadata"):
  115. if meta.get("key") == "prediction":
  116. try:
  117. return int(meta.get("value", "0"))
  118. except ValueError:
  119. return None
  120. break
  121. else:
  122. plate_elem = root.find(".//plate")
  123. if plate_elem is not None:
  124. for meta in plate_elem.findall("metadata"):
  125. if meta.get("key") == "prediction":
  126. try:
  127. return int(meta.get("value", "0"))
  128. except ValueError:
  129. return None
  130. except Exception as e:
  131. logger.warning("Failed to extract print time from %s: %s", file_path, e)
  132. return None
  def _decode_json_column(raw):
      """Decode a JSON-encoded text column; empty or malformed values become None."""
      if not raw:
          return None
      try:
          return json.loads(raw)
      except json.JSONDecodeError:
          return None


  def _apply_plate_figures(response, source_path, plate_id):
      """Overwrite print time / filament weight on the response with per-plate values.

      Does nothing when the source file is not on disk. Each figure is only
      replaced when the plate actually yields one (a time that is not None,
      a summed weight above zero).
      """
      if not source_path.exists():
          return
      plate_time = _extract_print_time_from_3mf(source_path, plate_id)
      plate_weight = sum(usage["used_g"] for usage in extract_filament_usage_from_3mf(source_path, plate_id))
      if plate_time is not None:
          response.print_time_seconds = plate_time
      if plate_weight > 0:
          response.filament_used_grams = plate_weight


  def _enrich_response(item: PrintQueueItem) -> PrintQueueItemResponse:
      """Build the API response for a queue item, including archive/printer/library details.

      The item's ``archive``, ``printer``, ``library_file``, ``created_by`` and
      ``batch`` relationships are read here, so they must already be loaded.
      """
      # Columns copied to the response unchanged.
      passthrough = (
          "id",
          "printer_id",
          "target_model",
          "target_location",
          "waiting_reason",
          "archive_id",
          "library_file_id",
          "position",
          "scheduled_time",
          "require_previous_success",
          "auto_off_after",
          "manual_start",
          "plate_id",
          "bed_levelling",
          "flow_cali",
          "vibration_cali",
          "layer_inspect",
          "timelapse",
          "use_ams",
          "status",
          "started_at",
          "completed_at",
          "error_message",
          "created_at",
          "created_by_id",  # User tracking (Issue #206)
          "batch_id",  # Batch grouping
          "been_jumped",  # SJF scheduling
          "gcode_injection",  # Auto-print G-code injection
      )
      payload = {column: getattr(item, column) for column in passthrough}

      # JSON text columns are decoded before the response model is built.
      payload["ams_mapping"] = _decode_json_column(item.ams_mapping)
      payload["required_filament_types"] = _decode_json_column(item.required_filament_types)
      payload["filament_overrides"] = _decode_json_column(item.filament_overrides)
      payload["filament_short"] = bool(item.filament_short)
      payload["created_by_username"] = item.created_by.username if item.created_by else None
      payload["batch_name"] = item.batch.name if item.batch else None

      response = PrintQueueItemResponse(**payload)

      # Print details that come either from the archive row or from library-file metadata.
      print_detail_fields = (
          "print_time_seconds",
          "filament_used_grams",
          "filament_type",
          "filament_color",
          "layer_height",
          "nozzle_diameter",
          "sliced_for_model",
      )

      archive = item.archive
      if archive:
          if archive.deleted_at is not None:
              # Soft-deleted archive: the files are gone from disk but the row
              # is kept (its filament/cost contribution still feeds stats, #1343).
              # Leave the archive-derived fields unset so the queue page does not
              # hammer the thumbnail / plates endpoints with 404s (#1348
              # follow-up); the flag lets the UI show a "source deleted" badge.
              response.archive_deleted = True
          else:
              response.archive_name = archive.print_name or archive.filename
              response.archive_thumbnail = archive.thumbnail_path
              for field in print_detail_fields:
                  setattr(response, field, getattr(archive, field))
              if item.plate_id:
                  _apply_plate_figures(response, settings.base_dir / archive.file_path, item.plate_id)

      library_file = item.library_file
      if library_file:
          metadata = library_file.file_metadata
          display_name = metadata.get("print_name") if metadata else None
          response.library_file_name = display_name or library_file.filename
          response.library_file_thumbnail = library_file.thumbnail_path

          # Library metadata only supplies the print details when there is no archive.
          if not item.archive and metadata:
              for field in print_detail_fields:
                  setattr(response, field, metadata.get(field))

          if item.plate_id:
              library_path = Path(library_file.file_path)
              if not library_path.is_absolute():
                  library_path = settings.base_dir / library_file.file_path
              _apply_plate_figures(response, library_path, item.plate_id)

      if item.printer:
          response.printer_name = item.printer.name

      return response
  @router.get("/", response_model=list[PrintQueueItemResponse])
  async def list_queue(
      printer_id: int | None = Query(None, description="Filter by printer (-1 for unassigned)"),
      status: str | None = Query(None, description="Filter by status"),
      target_model: str | None = Query(
          None, description="Filter by target model (also includes model-based items when combined with printer_id)"
      ),
      db: AsyncSession = Depends(get_db),
      _: User | None = RequirePermissionIfAuthEnabled(Permission.QUEUE_READ),
  ):
      """Return queue items, optionally narrowed by printer, target model and status.

      Results are ordered with unassigned items first, then by queue position.
      """
      filters = []

      if printer_id == -1:
          # Sentinel value: only items that have no printer assigned.
          filters.append(PrintQueueItem.printer_id.is_(None))
      elif printer_id is not None:
          # Prefer the explicit model; otherwise fall back to the printer's model in the
          # database so model-based "Any X" items are still returned when the frontend
          # does not send target_model (e.g. printer.model is NULL on the client side).
          model_name = target_model
          if not model_name:
              model_lookup = await db.execute(select(Printer.model).where(Printer.id == printer_id))
              model_name = model_lookup.scalar_one_or_none()

          on_this_printer = PrintQueueItem.printer_id == printer_id
          if model_name:
              # Printer-specific items plus unassigned items targeting this model.
              any_printer_of_model = and_(
                  PrintQueueItem.printer_id.is_(None),
                  func.lower(PrintQueueItem.target_model) == model_name.lower(),
              )
              filters.append(or_(on_this_printer, any_printer_of_model))
          else:
              filters.append(on_this_printer)
      elif target_model:
          filters.append(func.lower(PrintQueueItem.target_model) == target_model.lower())

      if status:
          filters.append(PrintQueueItem.status == status)

      query = (
          select(PrintQueueItem)
          .options(
              selectinload(PrintQueueItem.archive),
              selectinload(PrintQueueItem.printer),
              selectinload(PrintQueueItem.library_file),
              selectinload(PrintQueueItem.created_by),
              selectinload(PrintQueueItem.batch),
          )
          .order_by(PrintQueueItem.printer_id.nulls_first(), PrintQueueItem.position)
      )
      for clause in filters:
          query = query.where(clause)

      rows = (await db.execute(query)).scalars().all()
      return [_enrich_response(row) for row in rows]
  def _queue_source_path(archive, library_file) -> Path | None:
      """Return the on-disk path of the file a queue item prints from.

      The archive wins when both sources are given. Archive paths are joined
      onto ``settings.base_dir``; a library path is used as-is when absolute
      and joined onto ``settings.base_dir`` otherwise. Returns None when
      neither source is given.
      """
      if archive:
          return settings.base_dir / archive.file_path
      if library_file:
          lib_path = Path(library_file.file_path)
          return lib_path if lib_path.is_absolute() else settings.base_dir / library_file.file_path
      return None


  @router.post("/", response_model=PrintQueueItemResponse)
  async def add_to_queue(
      data: PrintQueueItemCreate,
      db: AsyncSession = Depends(get_db),
      current_user: User | None = RequirePermissionIfAuthEnabled(Permission.QUEUE_CREATE),
  ):
      """Add an item to the print queue.

      Validates the request, creates a batch when more than one copy is
      requested, inserts one pending queue item per copy, and then relays the
      event to MQTT and the notification service on a best-effort basis.

      Args:
          data: The queue item to create (source file, target, print options, quantity).
          db: Database session.
          current_user: The authenticated user, or None.

      Returns:
          The enriched response for the first created item.

      Raises:
          HTTPException: 400 when no source is given, when both printer_id and
              target_model are given, or when the printer / archive / library
              file does not exist or the model has no active printer; 404 when
              project_id does not exist.
      """
      # Normalize target_model (e.g., "Bambu Lab X1E" / "C13" -> "X1E")
      target_model_norm = None
      if data.target_model:
          target_model_norm = (
              normalize_printer_model(data.target_model)
              or normalize_printer_model_id(data.target_model)
              or data.target_model
          )

      # ---- Validation: everything is checked before anything is written ----

      # Validate that either archive_id or library_file_id is provided
      if not data.archive_id and not data.library_file_id:
          raise HTTPException(400, "Either archive_id or library_file_id must be provided")

      # Cannot specify both printer_id and target_model
      if data.printer_id and target_model_norm:
          raise HTTPException(400, "Cannot specify both printer_id and target_model")

      # Validate printer exists (if assigned)
      if data.printer_id is not None:
          result = await db.execute(select(Printer).where(Printer.id == data.printer_id))
          if not result.scalar_one_or_none():
              raise HTTPException(400, "Printer not found")

      # Validate target_model has active printers
      if target_model_norm:
          result = await db.execute(
              select(Printer).where(Printer.model == target_model_norm).where(Printer.is_active == True)  # noqa: E712
          )
          if not result.scalars().first():
              raise HTTPException(400, f"No active printers for model: {target_model_norm}")

      # Validate archive exists (if provided) and keep it for filament extraction
      archive = None
      if data.archive_id:
          result = await db.execute(select(PrintArchive).where(PrintArchive.id == data.archive_id))
          archive = result.scalar_one_or_none()
          if not archive:
              raise HTTPException(400, "Archive not found")

      # Validate library file exists (if provided) and keep it for filament extraction
      library_file = None
      if data.library_file_id:
          result = await db.execute(LibraryFile.active().where(LibraryFile.id == data.library_file_id))
          library_file = result.scalar_one_or_none()
          if not library_file:
              raise HTTPException(400, "Library file not found")

      # Validate the project before any insert so a bogus ID yields 404, not an
      # FK-constraint 500. This has to run before the batch row below is flushed.
      if data.project_id is not None:
          project_result = await db.execute(select(Project).where(Project.id == data.project_id))
          if not project_result.scalar_one_or_none():
              raise HTTPException(status_code=404, detail="Project not found")

      # ---- Derive values from the source file ----

      source_path = _queue_source_path(archive, library_file)

      # Extract filament types for model-based assignment (used by scheduler for validation)
      required_filament_types = None
      if target_model_norm and source_path is not None and source_path.exists():
          filament_types = _extract_filament_types_from_3mf(source_path, data.plate_id)
          if filament_types:
              required_filament_types = json.dumps(filament_types)
              logger.info("Extracted filament types for model-based queue: %s", filament_types)

      # Filament overrides are only stored for model-based items.
      filament_overrides_json = None
      if data.filament_overrides and target_model_norm:
          filament_overrides_json = json.dumps(data.filament_overrides)
          # The scheduler validates against required_filament_types, so the override
          # types are added to it. This is a union: types extracted from the file are
          # kept, because overrides may only cover some slots.
          override_types = {o["type"] for o in data.filament_overrides if "type" in o}
          if override_types:
              existing_types = set(json.loads(required_filament_types)) if required_filament_types else set()
              required_filament_types = json.dumps(sorted(existing_types | override_types))

      # Resolve print_time_seconds for SJF scheduling (cached on the item at creation)
      cached_print_time = None
      if archive:
          cached_print_time = archive.print_time_seconds
      elif library_file and library_file.file_metadata:
          cached_print_time = library_file.file_metadata.get("print_time_seconds")
      # NOTE(review): the plate lookup is assumed to apply to library files even when
      # they carry no metadata — confirm against the original indentation.
      if data.plate_id and source_path is not None and source_path.exists():
          plate_time = _extract_print_time_from_3mf(source_path, data.plate_id)
          if plate_time is not None:
              cached_print_time = plate_time

      # ---- Writes ----

      quantity = max(1, data.quantity)
      creator_id = current_user.id if current_user else None

      # Create a batch when more than one copy is requested
      batch_id = None
      if quantity > 1:
          # Derive batch name from source file
          batch_name_base = "Batch"
          if archive:
              batch_name_base = archive.print_name or archive.filename or "Batch"
          elif library_file:
              if library_file.file_metadata:
                  batch_name_base = library_file.file_metadata.get("print_name") or library_file.filename
              else:
                  batch_name_base = library_file.filename
          batch_name_base = batch_name_base.replace(".gcode.3mf", "").replace(".3mf", "")

          batch = PrintBatch(
              name=f"{batch_name_base} ×{quantity}",
              archive_id=data.archive_id,
              library_file_id=data.library_file_id,
              quantity=quantity,
              status="active",
              created_by_id=creator_id,
          )
          db.add(batch)
          await db.flush()  # Get batch.id before creating items
          batch_id = batch.id

      # Next position: after the last pending item of this printer, or of the
      # unassigned pool for unassigned/model-based items.
      if data.printer_id is not None:
          position_scope = PrintQueueItem.printer_id == data.printer_id
      else:
          position_scope = PrintQueueItem.printer_id.is_(None)
      result = await db.execute(
          select(func.max(PrintQueueItem.position)).where(position_scope).where(PrintQueueItem.status == "pending")
      )
      max_pos = result.scalar() or 0

      ams_mapping_json = json.dumps(data.ams_mapping) if data.ams_mapping else None

      items = []
      for i in range(quantity):
          item = PrintQueueItem(
              printer_id=data.printer_id,
              target_model=target_model_norm,
              target_location=data.target_location,
              required_filament_types=required_filament_types,
              filament_overrides=filament_overrides_json,
              archive_id=data.archive_id,
              library_file_id=data.library_file_id,
              scheduled_time=data.scheduled_time,
              require_previous_success=data.require_previous_success,
              auto_off_after=data.auto_off_after,
              manual_start=data.manual_start,
              ams_mapping=ams_mapping_json,
              plate_id=data.plate_id,
              bed_levelling=data.bed_levelling,
              flow_cali=data.flow_cali,
              vibration_cali=data.vibration_cali,
              layer_inspect=data.layer_inspect,
              timelapse=data.timelapse,
              use_ams=data.use_ams,
              gcode_injection=data.gcode_injection,
              project_id=data.project_id,
              position=max_pos + 1 + i,
              status="pending",
              created_by_id=creator_id,
              batch_id=batch_id,
              print_time_seconds=cached_print_time,
          )
          db.add(item)
          items.append(item)

      await db.commit()

      # Refresh the first item for the response
      item = items[0]
      await db.refresh(item)
      await db.refresh(item, ["archive", "printer", "library_file", "created_by", "batch"])
      # Captured now so the failure handlers below never have to touch ORM state.
      item_id = item.id

      source_name = f"archive {data.archive_id}" if data.archive_id else f"library file {data.library_file_id}"
      target_desc = data.printer_id or (f"model {target_model_norm}" if target_model_norm else "unassigned")
      qty_desc = f" (×{quantity})" if quantity > 1 else ""
      logger.info("Added %s to queue for %s%s", source_name, target_desc, qty_desc)

      # MQTT relay - publish queue job added
      try:
          from backend.app.services.mqtt_relay import mqtt_relay

          # Fall back to the library file name so library-based jobs are not relayed
          # with an empty filename (the notification below uses the same fallback).
          if item.archive:
              relay_filename = item.archive.filename
          elif item.library_file:
              relay_filename = item.library_file.filename
          else:
              relay_filename = ""

          await mqtt_relay.on_queue_job_added(
              job_id=item.id,
              filename=relay_filename,
              printer_id=item.printer_id,
              printer_name=item.printer.name if item.printer else None,
          )
      except Exception:
          # Don't fail queue add if MQTT fails, but leave a trace of the failure.
          logger.warning("MQTT relay failed for queue job %s", item_id, exc_info=True)

      # Send notification for job added
      try:
          job_name = (
              item.archive.filename
              if item.archive
              else item.library_file.filename
              if item.library_file
              else f"Job #{item.id}"
          )
          job_name = job_name.replace(".gcode.3mf", "").replace(".3mf", "")
          if quantity > 1:
              job_name = f"{job_name} ×{quantity}"
          target = (
              item.printer.name if item.printer else (f"Any {item.target_model}" if target_model_norm else "Unassigned")
          )
          await notification_service.on_queue_job_added(
              job_name=job_name,
              target=target,
              db=db,
              printer_id=item.printer_id,
              printer_name=item.printer.name if item.printer else None,
          )
      except Exception:
          # Don't fail queue add if notification fails, but leave a trace of the failure.
          logger.warning("Queue-job-added notification failed for queue job %s", item_id, exc_info=True)

      return _enrich_response(item)
  @router.patch("/bulk", response_model=PrintQueueBulkUpdateResponse)
  async def bulk_update_queue_items(
      data: PrintQueueBulkUpdate,
      db: AsyncSession = Depends(get_db),
      auth_result: tuple[User | None, bool] = Depends(
          require_ownership_permission(
              Permission.QUEUE_UPDATE_ALL,
              Permission.QUEUE_UPDATE_OWN,
          )
      ),
  ):
      """Apply the same field values to several queue items at once.

      Only pending items are changed. Items in any other status, and items the
      caller does not own (unless the caller may modify all items), are
      counted as skipped instead.

      Raises:
          HTTPException: 400 when no item IDs or no fields are given, or when
              the new printer_id does not exist.
      """
      user, can_modify_all = auth_result

      if not data.item_ids:
          raise HTTPException(400, "No item IDs provided")

      # Only the fields the client actually sent, without the ID list itself.
      changes = data.model_dump(exclude={"item_ids"}, exclude_unset=True)
      if not changes:
          raise HTTPException(400, "No fields to update")

      # A new, non-null printer_id must point at an existing printer.
      new_printer_id = changes.get("printer_id")
      if new_printer_id is not None:
          printer_lookup = await db.execute(select(Printer).where(Printer.id == new_printer_id))
          if not printer_lookup.scalar_one_or_none():
              raise HTTPException(400, "Printer not found")

      selected = await db.execute(select(PrintQueueItem).where(PrintQueueItem.id.in_(data.item_ids)))

      updated = 0
      skipped = 0
      for queue_item in selected.scalars().all():
          # Ownership is only consulted for pending items, and only when the
          # caller lacks the modify-all permission.
          editable = queue_item.status == "pending" and (can_modify_all or queue_item.created_by_id == user.id)
          if not editable:
              skipped += 1
              continue

          for field_name, new_value in changes.items():
              setattr(queue_item, field_name, new_value)
          updated += 1

      await db.commit()
      logger.info("Bulk updated %s queue items, skipped %s", updated, skipped)

      skipped_note = f", skipped {skipped} non-pending/not-owned" if skipped else ""
      return PrintQueueBulkUpdateResponse(
          updated_count=updated,
          skipped_count=skipped,
          message=f"Updated {updated} items" + skipped_note,
      )
  # --- Batch endpoints ---
  @router.get("/batches", response_model=list[PrintBatchResponse])
  async def list_batches(
      status: str | None = Query(None, description="Filter by status (active, completed, cancelled)"),
      db: AsyncSession = Depends(get_db),
      current_user: User | None = RequirePermissionIfAuthEnabled(Permission.QUEUE_READ),
  ):
      """Return print batches, newest first, each with its progress counts."""
      stmt = select(PrintBatch).order_by(PrintBatch.created_at.desc())
      if status:
          stmt = stmt.where(PrintBatch.status == status)

      batches = (await db.execute(stmt)).scalars().all()
      # Responses are built one after another on the shared session.
      return [await _build_batch_response(db, batch) for batch in batches]
  @router.get("/batches/{batch_id}", response_model=PrintBatchResponse)
  async def get_batch(
      batch_id: int,
      db: AsyncSession = Depends(get_db),
      current_user: User | None = RequirePermissionIfAuthEnabled(Permission.QUEUE_READ),
  ):
      """Return one print batch with its progress counts.

      Raises:
          HTTPException: 404 when no batch has the given ID.
      """
      lookup = await db.execute(select(PrintBatch).where(PrintBatch.id == batch_id))
      found = lookup.scalar_one_or_none()
      if found:
          return await _build_batch_response(db, found)
      raise HTTPException(404, "Batch not found")
  @router.delete("/batches/{batch_id}")
  async def cancel_batch(
      batch_id: int,
      db: AsyncSession = Depends(get_db),
      current_user: User | None = RequirePermissionIfAuthEnabled(Permission.QUEUE_DELETE_ALL),
  ):
      """Mark a batch as cancelled and cancel its queue items that are still pending.

      Items in any other status are left untouched.

      Raises:
          HTTPException: 404 when no batch has the given ID.
      """
      batch = (await db.execute(select(PrintBatch).where(PrintBatch.id == batch_id))).scalar_one_or_none()
      if not batch:
          raise HTTPException(404, "Batch not found")

      still_pending = (
          (
              await db.execute(
                  select(PrintQueueItem).where(
                      and_(PrintQueueItem.batch_id == batch_id, PrintQueueItem.status == "pending")
                  )
              )
          )
          .scalars()
          .all()
      )
      for queue_item in still_pending:
          queue_item.status = "cancelled"
      cancelled_count = len(still_pending)

      batch.status = "cancelled"
      await db.commit()

      return {"message": f"Batch cancelled, {cancelled_count} pending items cancelled"}
  638. async def _build_batch_response(db: AsyncSession, batch: PrintBatch) -> PrintBatchResponse:
  639. """Build a batch response with derived counts from queue items."""
  640. # Count queue items by status
  641. result = await db.execute(
  642. select(PrintQueueItem.status, func.count(PrintQueueItem.id))
  643. .where(PrintQueueItem.batch_id == batch.id)
  644. .group_by(PrintQueueItem.status)
  645. )
  646. status_counts = {row[0]: row[1] for row in result.fetchall()}
  647. # Load created_by for username
  648. created_by_username = None
  649. if batch.created_by_id:
  650. result = await db.execute(select(User).where(User.id == batch.created_by_id))
  651. user = result.scalar_one_or_none()
  652. if user:
  653. created_by_username = user.username
  654. return PrintBatchResponse(
  655. id=batch.id,
  656. name=batch.name,
  657. archive_id=batch.archive_id,
  658. library_file_id=batch.library_file_id,
  659. quantity=batch.quantity,
  660. status=batch.status,
  661. created_at=batch.created_at,
  662. created_by_id=batch.created_by_id,
  663. created_by_username=created_by_username,
  664. pending_count=status_counts.get("pending", 0),
  665. printing_count=status_counts.get("printing", 0),
  666. completed_count=status_counts.get("completed", 0),
  667. failed_count=status_counts.get("failed", 0),
  668. cancelled_count=status_counts.get("cancelled", 0),
  669. )
  670. @router.get("/{item_id}", response_model=PrintQueueItemResponse)
  671. async def get_queue_item(
  672. item_id: int,
  673. db: AsyncSession = Depends(get_db),
  674. _: User | None = RequirePermissionIfAuthEnabled(Permission.QUEUE_READ),
  675. ):
  676. """Get a specific queue item."""
  677. result = await db.execute(
  678. select(PrintQueueItem)
  679. .options(
  680. selectinload(PrintQueueItem.archive),
  681. selectinload(PrintQueueItem.printer),
  682. selectinload(PrintQueueItem.library_file),
  683. selectinload(PrintQueueItem.created_by),
  684. selectinload(PrintQueueItem.batch),
  685. )
  686. .where(PrintQueueItem.id == item_id)
  687. )
  688. item = result.scalar_one_or_none()
  689. if not item:
  690. raise HTTPException(404, "Queue item not found")
  691. return _enrich_response(item)
  692. @router.patch("/{item_id}", response_model=PrintQueueItemResponse)
  693. async def update_queue_item(
  694. item_id: int,
  695. data: PrintQueueItemUpdate,
  696. db: AsyncSession = Depends(get_db),
  697. auth_result: tuple[User | None, bool] = Depends(
  698. require_ownership_permission(
  699. Permission.QUEUE_UPDATE_ALL,
  700. Permission.QUEUE_UPDATE_OWN,
  701. )
  702. ),
  703. ):
  704. """Update a queue item."""
  705. user, can_modify_all = auth_result
  706. result = await db.execute(select(PrintQueueItem).where(PrintQueueItem.id == item_id))
  707. item = result.scalar_one_or_none()
  708. if not item:
  709. raise HTTPException(404, "Queue item not found")
  710. # Ownership check
  711. if not can_modify_all:
  712. if item.created_by_id != user.id:
  713. raise HTTPException(403, "You can only update your own queue items")
  714. if item.status != "pending":
  715. raise HTTPException(400, "Can only update pending items")
  716. update_data = data.model_dump(exclude_unset=True)
  717. # Normalize target_model if being updated
  718. if "target_model" in update_data and update_data["target_model"]:
  719. update_data["target_model"] = (
  720. normalize_printer_model(update_data["target_model"])
  721. or normalize_printer_model_id(update_data["target_model"])
  722. or update_data["target_model"]
  723. )
  724. # Cannot specify both printer_id and target_model
  725. new_printer_id = update_data.get("printer_id", item.printer_id)
  726. new_target_model = update_data.get("target_model", item.target_model)
  727. if new_printer_id and new_target_model:
  728. raise HTTPException(400, "Cannot specify both printer_id and target_model")
  729. # Validate new printer_id if being changed (and not None)
  730. if "printer_id" in update_data and update_data["printer_id"] is not None:
  731. result = await db.execute(select(Printer).where(Printer.id == update_data["printer_id"]))
  732. if not result.scalar_one_or_none():
  733. raise HTTPException(400, "Printer not found")
  734. # Validate target_model has active printers
  735. if "target_model" in update_data and update_data["target_model"]:
  736. result = await db.execute(
  737. select(Printer).where(Printer.model == update_data["target_model"]).where(Printer.is_active == True) # noqa: E712
  738. )
  739. if not result.scalars().first():
  740. raise HTTPException(400, f"No active printers for model: {update_data['target_model']}")
  741. # Serialize ams_mapping to JSON for TEXT column storage
  742. if "ams_mapping" in update_data:
  743. update_data["ams_mapping"] = json.dumps(update_data["ams_mapping"]) if update_data["ams_mapping"] else None
  744. # Serialize filament_overrides to JSON for TEXT column storage
  745. if "filament_overrides" in update_data:
  746. update_data["filament_overrides"] = (
  747. json.dumps(update_data["filament_overrides"]) if update_data["filament_overrides"] else None
  748. )
  749. for field, value in update_data.items():
  750. setattr(item, field, value)
  751. await db.commit()
  752. await db.refresh(item, ["archive", "printer", "library_file", "created_by", "batch"])
  753. logger.info("Updated queue item %s", item_id)
  754. return _enrich_response(item)
  755. @router.delete("/{item_id}")
  756. async def delete_queue_item(
  757. item_id: int,
  758. db: AsyncSession = Depends(get_db),
  759. auth_result: tuple[User | None, bool] = Depends(
  760. require_ownership_permission(
  761. Permission.QUEUE_DELETE_ALL,
  762. Permission.QUEUE_DELETE_OWN,
  763. )
  764. ),
  765. ):
  766. """Remove an item from the queue."""
  767. user, can_modify_all = auth_result
  768. result = await db.execute(select(PrintQueueItem).where(PrintQueueItem.id == item_id))
  769. item = result.scalar_one_or_none()
  770. if not item:
  771. raise HTTPException(404, "Queue item not found")
  772. # Ownership check
  773. if not can_modify_all:
  774. if item.created_by_id != user.id:
  775. raise HTTPException(403, "You can only delete your own queue items")
  776. if item.status == "printing":
  777. raise HTTPException(400, "Cannot delete item that is currently printing")
  778. await db.delete(item)
  779. await db.commit()
  780. logger.info("Deleted queue item %s", item_id)
  781. return {"message": "Queue item deleted"}
  782. @router.post("/reorder")
  783. async def reorder_queue(
  784. data: PrintQueueReorder,
  785. db: AsyncSession = Depends(get_db),
  786. _: User | None = RequirePermissionIfAuthEnabled(Permission.QUEUE_UPDATE_ALL),
  787. ):
  788. """Bulk update positions for queue items."""
  789. for reorder_item in data.items:
  790. result = await db.execute(select(PrintQueueItem).where(PrintQueueItem.id == reorder_item.id))
  791. item = result.scalar_one_or_none()
  792. if item and item.status == "pending":
  793. item.position = reorder_item.position
  794. await db.commit()
  795. logger.info("Reordered %s queue items", len(data.items))
  796. return {"message": f"Reordered {len(data.items)} items"}
  797. @router.post("/{item_id}/cancel")
  798. async def cancel_queue_item(
  799. item_id: int,
  800. db: AsyncSession = Depends(get_db),
  801. auth_result: tuple[User | None, bool] = Depends(
  802. require_ownership_permission(
  803. Permission.QUEUE_UPDATE_ALL,
  804. Permission.QUEUE_UPDATE_OWN,
  805. )
  806. ),
  807. ):
  808. """Cancel a pending queue item."""
  809. user, can_modify_all = auth_result
  810. result = await db.execute(select(PrintQueueItem).where(PrintQueueItem.id == item_id))
  811. item = result.scalar_one_or_none()
  812. if not item:
  813. raise HTTPException(404, "Queue item not found")
  814. # Ownership check
  815. if not can_modify_all:
  816. if item.created_by_id != user.id:
  817. raise HTTPException(403, "You can only cancel your own queue items")
  818. if item.status not in ("pending",):
  819. raise HTTPException(400, f"Cannot cancel item with status '{item.status}'")
  820. item.status = "cancelled"
  821. item.completed_at = datetime.now(timezone.utc)
  822. await db.commit()
  823. logger.info("Cancelled queue item %s", item_id)
  824. return {"message": "Queue item cancelled"}
  825. @router.post("/{item_id}/stop")
  826. async def stop_queue_item(
  827. item_id: int,
  828. db: AsyncSession = Depends(get_db),
  829. _: User | None = RequirePermissionIfAuthEnabled(Permission.QUEUE_UPDATE_ALL),
  830. ):
  831. """Stop an actively printing queue item."""
  832. import asyncio
  833. from backend.app.models.smart_plug import SmartPlug
  834. from backend.app.services.printer_manager import printer_manager
  835. from backend.app.services.tasmota import tasmota_service
  836. result = await db.execute(select(PrintQueueItem).where(PrintQueueItem.id == item_id))
  837. item = result.scalar_one_or_none()
  838. if not item:
  839. raise HTTPException(404, "Queue item not found")
  840. if item.status != "printing":
  841. raise HTTPException(400, f"Can only stop items that are printing, current status: '{item.status}'")
  842. # Capture values we need for background task
  843. printer_id = item.printer_id
  844. auto_off_after = item.auto_off_after
  845. # Try to send stop command to printer
  846. stop_sent = False
  847. try:
  848. stop_sent = printer_manager.stop_print(printer_id)
  849. if not stop_sent:
  850. logger.warning("stop_print returned False for printer %s - printer may not be connected", printer_id)
  851. except Exception as e:
  852. logger.error("Error sending stop command for queue item %s: %s", item_id, e)
  853. # Mark this printer as user-stopped BEFORE the first await so that if the
  854. # MQTT on_print_complete callback fires during the db.commit() yield the flag
  855. # is already set and the "failed" status will be correctly overridden to
  856. # "cancelled" (preventing a spurious "print failed" notification).
  857. try:
  858. from backend.app.main import mark_printer_stopped_by_user
  859. mark_printer_stopped_by_user(printer_id)
  860. except Exception as _mark_err:
  861. logger.warning("Failed to mark printer %s as user-stopped: %s", printer_id, _mark_err)
  862. # Update queue item status regardless - if printer is off, print is already stopped
  863. item.status = "cancelled"
  864. item.completed_at = datetime.now(timezone.utc)
  865. item.error_message = "Stopped by user" if stop_sent else "Stopped by user (printer was offline)"
  866. await db.commit()
  867. # Get smart plug info if auto-off is enabled
  868. plug_ip = None
  869. if auto_off_after:
  870. result = await db.execute(select(SmartPlug).where(SmartPlug.printer_id == printer_id))
  871. plug = result.scalar_one_or_none()
  872. if plug and plug.enabled:
  873. plug_ip = plug.ip_address
  874. logger.info("Stopped printing queue item %s (stop command sent: %s)", item_id, stop_sent)
  875. # Schedule background task for cooldown + power off
  876. if plug_ip:
  877. async def cooldown_and_poweroff():
  878. logger.info("Auto-off: Waiting for printer %s to cool down before power off...", printer_id)
  879. await printer_manager.wait_for_cooldown(printer_id, target_temp=50.0, timeout=600)
  880. # Re-fetch plug since we're in a new async context
  881. from backend.app.core.database import async_session
  882. async with async_session() as new_db:
  883. result = await new_db.execute(select(SmartPlug).where(SmartPlug.printer_id == printer_id))
  884. plug = result.scalar_one_or_none()
  885. if plug and plug.enabled:
  886. logger.info("Auto-off: Powering off printer %s", printer_id)
  887. await tasmota_service.turn_off(plug)
  888. asyncio.create_task(cooldown_and_poweroff())
  889. return {"message": "Print stopped" if stop_sent else "Queue item cancelled (printer was offline)"}
  890. @router.post("/{item_id}/start")
  891. async def start_queue_item(
  892. item_id: int,
  893. skip_filament_check: bool = Query(default=False),
  894. db: AsyncSession = Depends(get_db),
  895. _: User | None = RequirePermissionIfAuthEnabled(Permission.QUEUE_UPDATE_OWN),
  896. ):
  897. """Manually start a staged (manual_start) queue item.
  898. Clears the manual_start flag so the scheduler picks it up. When
  899. ``skip_filament_check`` is false (the default) the live filament
  900. deficit (#1496) is checked first — if the assigned spool can't satisfy
  901. a slot's required grams, the route returns ``409`` with the deficit
  902. payload so the caller can show a confirm dialog and retry with
  903. ``skip_filament_check=true``.
  904. """
  905. result = await db.execute(
  906. select(PrintQueueItem)
  907. .options(
  908. selectinload(PrintQueueItem.archive),
  909. selectinload(PrintQueueItem.printer),
  910. selectinload(PrintQueueItem.library_file),
  911. selectinload(PrintQueueItem.batch),
  912. )
  913. .where(PrintQueueItem.id == item_id)
  914. )
  915. item = result.scalar_one_or_none()
  916. if not item:
  917. raise HTTPException(404, "Queue item not found")
  918. if item.status != "pending":
  919. raise HTTPException(400, f"Can only start pending items, current status: '{item.status}'")
  920. # Live deficit check — re-evaluated against current spool state, so a
  921. # spool swap between scheduler flagging and the user clicking ▶ clears
  922. # the block automatically.
  923. if not skip_filament_check:
  924. deficit = await compute_deficit_for_queue_item(db, item)
  925. if deficit:
  926. raise HTTPException(
  927. status_code=409,
  928. detail={
  929. "code": "insufficient_filament",
  930. "deficit": [d.to_dict() for d in deficit],
  931. },
  932. )
  933. # Print Anyway / no deficit: clear the flags and let the scheduler dispatch.
  934. item.manual_start = False
  935. item.filament_short = False
  936. await db.commit()
  937. await db.refresh(item, ["archive", "printer", "library_file", "created_by", "batch"])
  938. logger.info(
  939. "Manually started queue item %s (cleared manual_start; skip_filament_check=%s)",
  940. item_id,
  941. skip_filament_check,
  942. )
  943. return _enrich_response(item)