| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318 |
- """API routes for File Manager (Library) functionality."""
- import base64
- import binascii
- import hashlib
- import logging
- import os
- import re
- import shutil
- import uuid
- import zipfile
- from pathlib import Path
- from fastapi import APIRouter, Depends, File, HTTPException, Query, Response, UploadFile
- from fastapi.responses import FileResponse as FastAPIFileResponse
- from sqlalchemy import func, select
- from sqlalchemy.ext.asyncio import AsyncSession
- from sqlalchemy.orm import selectinload
- from backend.app.core.auth import (
- require_ownership_permission,
- require_permission_if_auth_enabled,
- )
- from backend.app.core.config import settings as app_settings
- from backend.app.core.database import get_db
- from backend.app.core.permissions import Permission
- from backend.app.models.archive import PrintArchive
- from backend.app.models.library import LibraryFile, LibraryFolder
- from backend.app.models.print_queue import PrintQueueItem
- from backend.app.models.project import Project
- from backend.app.models.user import User
- from backend.app.schemas.library import (
- AddToQueueError,
- AddToQueueRequest,
- AddToQueueResponse,
- AddToQueueResult,
- BatchThumbnailRequest,
- BatchThumbnailResponse,
- BatchThumbnailResult,
- BulkDeleteRequest,
- BulkDeleteResponse,
- FileDuplicate,
- FileListResponse,
- FileMoveRequest,
- FilePrintRequest,
- FileResponse as FileResponseSchema,
- FileUpdate,
- FileUploadResponse,
- FolderCreate,
- FolderResponse,
- FolderTreeItem,
- FolderUpdate,
- ZipExtractError,
- ZipExtractResponse,
- ZipExtractResult,
- )
- from backend.app.services.archive import ThreeMFParser
- from backend.app.services.stl_thumbnail import generate_stl_thumbnail
- from backend.app.utils.threemf_tools import extract_nozzle_mapping_from_3mf
- logger = logging.getLogger(__name__)
- router = APIRouter(prefix="/library", tags=["library"])
- def get_library_dir() -> Path:
- """Get the library storage directory."""
- base_dir = Path(app_settings.archive_dir)
- library_dir = base_dir / "library"
- library_dir.mkdir(parents=True, exist_ok=True)
- return library_dir
- def get_library_files_dir() -> Path:
- """Get the directory for library files."""
- files_dir = get_library_dir() / "files"
- files_dir.mkdir(parents=True, exist_ok=True)
- return files_dir
- def get_library_thumbnails_dir() -> Path:
- """Get the directory for library thumbnails."""
- thumbnails_dir = get_library_dir() / "thumbnails"
- thumbnails_dir.mkdir(parents=True, exist_ok=True)
- return thumbnails_dir
- def to_relative_path(absolute_path: Path | str) -> str:
- """Convert an absolute path to a path relative to base_dir for storage."""
- if not absolute_path:
- return ""
- abs_path = Path(absolute_path)
- base_dir = Path(app_settings.base_dir)
- try:
- return str(abs_path.relative_to(base_dir))
- except ValueError:
- # Path is not under base_dir, return as-is (shouldn't happen normally)
- return str(abs_path)
- def to_absolute_path(relative_path: str | None) -> Path | None:
- """Convert a relative path (from database) to an absolute path for file operations."""
- if not relative_path:
- return None
- # Handle already-absolute paths (for backwards compatibility during migration)
- path = Path(relative_path)
- if path.is_absolute():
- return path
- return Path(app_settings.base_dir) / relative_path
- def calculate_file_hash(file_path: Path) -> str:
- """Calculate SHA256 hash of a file."""
- sha256_hash = hashlib.sha256()
- with open(file_path, "rb") as f:
- for byte_block in iter(lambda: f.read(4096), b""):
- sha256_hash.update(byte_block)
- return sha256_hash.hexdigest()
- def extract_gcode_thumbnail(file_path: Path) -> bytes | None:
- """Extract embedded thumbnail from gcode file.
- Supports PrusaSlicer/BambuStudio format:
- ; thumbnail begin WxH SIZE
- ; base64data...
- ; thumbnail end
- """
- try:
- thumbnail_data = None
- in_thumbnail = False
- thumbnail_lines = []
- best_size = 0
- with open(file_path, errors="ignore") as f:
- # Only read first 50KB for performance (thumbnails are at the start)
- content = f.read(50000)
- for line in content.split("\n"):
- line = line.strip()
- # Check for thumbnail start
- if line.startswith("; thumbnail begin"):
- in_thumbnail = True
- thumbnail_lines = []
- # Parse dimensions: "; thumbnail begin 300x300 12345"
- match = re.search(r"(\d+)x(\d+)", line)
- if match:
- width = int(match.group(1))
- # Prefer larger thumbnails (up to 300px)
- if width > best_size and width <= 300:
- best_size = width
- continue
- # Check for thumbnail end
- if line.startswith("; thumbnail end"):
- if in_thumbnail and thumbnail_lines:
- try:
- # Decode the base64 data
- b64_data = "".join(thumbnail_lines)
- decoded = base64.b64decode(b64_data)
- # Only keep if this is the best size or first valid thumbnail
- if thumbnail_data is None or best_size > 0:
- thumbnail_data = decoded
- except (binascii.Error, ValueError):
- pass # Skip thumbnail with invalid base64 data
- in_thumbnail = False
- thumbnail_lines = []
- continue
- # Collect thumbnail data
- if in_thumbnail and line.startswith(";"):
- # Remove the leading "; " or ";"
- data_line = line[1:].strip()
- if data_line:
- thumbnail_lines.append(data_line)
- return thumbnail_data
- except Exception as e:
- logger.warning("Failed to extract gcode thumbnail: %s", e)
- return None
- def create_image_thumbnail(file_path: Path, thumbnails_dir: Path, max_size: int = 256) -> str | None:
- """Create a thumbnail from an image file.
- For small images, copies directly. For larger images, resizes.
- Returns the thumbnail path or None on failure.
- """
- try:
- from PIL import Image
- thumb_filename = f"{uuid.uuid4().hex}.png"
- thumb_path = thumbnails_dir / thumb_filename
- with Image.open(file_path) as img:
- # Convert to RGB if necessary (for PNG with transparency, etc.)
- if img.mode in ("RGBA", "LA", "P"):
- # Create white background for transparency
- background = Image.new("RGB", img.size, (255, 255, 255))
- if img.mode == "P":
- img = img.convert("RGBA")
- background.paste(img, mask=img.split()[-1] if img.mode == "RGBA" else None)
- img = background
- elif img.mode != "RGB":
- img = img.convert("RGB")
- # Resize if larger than max_size
- if img.width > max_size or img.height > max_size:
- img.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
- img.save(thumb_path, "PNG", optimize=True)
- return str(thumb_path)
- except ImportError:
- # PIL not installed, just copy the file if it's small enough
- logger.warning("PIL not installed, copying image as thumbnail")
- try:
- file_size = file_path.stat().st_size
- if file_size < 500000: # Less than 500KB
- thumb_filename = f"{uuid.uuid4().hex}{file_path.suffix}"
- thumb_path = thumbnails_dir / thumb_filename
- shutil.copy2(file_path, thumb_path)
- return str(thumb_path)
- except OSError:
- pass # File inaccessible; fall through to return None
- return None
- except Exception as e:
- logger.warning("Failed to create image thumbnail: %s", e)
- return None
- # Supported image extensions for thumbnails
- IMAGE_EXTENSIONS = {".png", ".jpg", ".jpeg", ".gif", ".webp", ".bmp", ".tiff", ".tif"}
- # ============ Folder Endpoints ============
- @router.get("/folders", response_model=list[FolderTreeItem])
- @router.get("/folders/", response_model=list[FolderTreeItem])
- async def list_folders(
- response: Response,
- db: AsyncSession = Depends(get_db),
- _: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_READ)),
- ):
- """Get all folders as a tree structure."""
- # Prevent browser caching of folder list
- response.headers["Cache-Control"] = "no-cache, no-store, must-revalidate"
- # Get all folders with project and archive joins
- result = await db.execute(
- select(LibraryFolder, Project.name, PrintArchive.print_name)
- .outerjoin(Project, LibraryFolder.project_id == Project.id)
- .outerjoin(PrintArchive, LibraryFolder.archive_id == PrintArchive.id)
- .order_by(LibraryFolder.name)
- )
- rows = result.all()
- # Get file counts per folder
- file_counts_result = await db.execute(
- select(LibraryFile.folder_id, func.count(LibraryFile.id))
- .where(LibraryFile.folder_id.isnot(None))
- .group_by(LibraryFile.folder_id)
- )
- file_counts = dict(file_counts_result.all())
- # Build tree structure
- folder_map = {}
- root_folders = []
- for folder, project_name, archive_name in rows:
- folder_item = FolderTreeItem(
- id=folder.id,
- name=folder.name,
- parent_id=folder.parent_id,
- project_id=folder.project_id,
- archive_id=folder.archive_id,
- project_name=project_name,
- archive_name=archive_name,
- file_count=file_counts.get(folder.id, 0),
- children=[],
- )
- folder_map[folder.id] = folder_item
- # Link children to parents
- for folder, _, _ in rows:
- folder_item = folder_map[folder.id]
- if folder.parent_id is None:
- root_folders.append(folder_item)
- elif folder.parent_id in folder_map:
- folder_map[folder.parent_id].children.append(folder_item)
- return root_folders
- @router.get("/folders/by-project/{project_id}", response_model=list[FolderResponse])
- async def get_folders_by_project(
- project_id: int,
- db: AsyncSession = Depends(get_db),
- _: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_READ)),
- ):
- """Get all folders linked to a specific project."""
- result = await db.execute(
- select(LibraryFolder, Project.name)
- .outerjoin(Project, LibraryFolder.project_id == Project.id)
- .where(LibraryFolder.project_id == project_id)
- .order_by(LibraryFolder.name)
- )
- rows = result.all()
- folders = []
- for folder, project_name in rows:
- # Get file count
- file_count_result = await db.execute(
- select(func.count(LibraryFile.id)).where(LibraryFile.folder_id == folder.id)
- )
- file_count = file_count_result.scalar() or 0
- folders.append(
- FolderResponse(
- id=folder.id,
- name=folder.name,
- parent_id=folder.parent_id,
- project_id=folder.project_id,
- archive_id=folder.archive_id,
- project_name=project_name,
- archive_name=None,
- file_count=file_count,
- created_at=folder.created_at,
- updated_at=folder.updated_at,
- )
- )
- return folders
- @router.get("/folders/by-archive/{archive_id}", response_model=list[FolderResponse])
- async def get_folders_by_archive(
- archive_id: int,
- db: AsyncSession = Depends(get_db),
- _: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_READ)),
- ):
- """Get all folders linked to a specific archive."""
- result = await db.execute(
- select(LibraryFolder, PrintArchive.print_name)
- .outerjoin(PrintArchive, LibraryFolder.archive_id == PrintArchive.id)
- .where(LibraryFolder.archive_id == archive_id)
- .order_by(LibraryFolder.name)
- )
- rows = result.all()
- folders = []
- for folder, archive_name in rows:
- # Get file count
- file_count_result = await db.execute(
- select(func.count(LibraryFile.id)).where(LibraryFile.folder_id == folder.id)
- )
- file_count = file_count_result.scalar() or 0
- folders.append(
- FolderResponse(
- id=folder.id,
- name=folder.name,
- parent_id=folder.parent_id,
- project_id=folder.project_id,
- archive_id=folder.archive_id,
- project_name=None,
- archive_name=archive_name,
- file_count=file_count,
- created_at=folder.created_at,
- updated_at=folder.updated_at,
- )
- )
- return folders
- @router.post("/folders", response_model=FolderResponse)
- @router.post("/folders/", response_model=FolderResponse)
- async def create_folder(
- data: FolderCreate,
- db: AsyncSession = Depends(get_db),
- _: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_UPLOAD)),
- ):
- """Create a new folder."""
- # Verify parent exists if specified
- if data.parent_id is not None:
- parent_result = await db.execute(select(LibraryFolder).where(LibraryFolder.id == data.parent_id))
- if not parent_result.scalar_one_or_none():
- raise HTTPException(status_code=404, detail="Parent folder not found")
- # Verify project exists if specified
- project_name = None
- if data.project_id is not None:
- project_result = await db.execute(select(Project).where(Project.id == data.project_id))
- project = project_result.scalar_one_or_none()
- if not project:
- raise HTTPException(status_code=404, detail="Project not found")
- project_name = project.name
- # Verify archive exists if specified
- archive_name = None
- if data.archive_id is not None:
- archive_result = await db.execute(select(PrintArchive).where(PrintArchive.id == data.archive_id))
- archive = archive_result.scalar_one_or_none()
- if not archive:
- raise HTTPException(status_code=404, detail="Archive not found")
- archive_name = archive.print_name
- folder = LibraryFolder(
- name=data.name,
- parent_id=data.parent_id,
- project_id=data.project_id,
- archive_id=data.archive_id,
- )
- db.add(folder)
- await db.commit()
- await db.refresh(folder)
- return FolderResponse(
- id=folder.id,
- name=folder.name,
- parent_id=folder.parent_id,
- project_id=folder.project_id,
- archive_id=folder.archive_id,
- project_name=project_name,
- archive_name=archive_name,
- file_count=0,
- created_at=folder.created_at,
- updated_at=folder.updated_at,
- )
- @router.get("/folders/{folder_id}", response_model=FolderResponse)
- async def get_folder(
- folder_id: int,
- db: AsyncSession = Depends(get_db),
- _: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_READ)),
- ):
- """Get a folder by ID."""
- result = await db.execute(
- select(LibraryFolder, Project.name, PrintArchive.print_name)
- .outerjoin(Project, LibraryFolder.project_id == Project.id)
- .outerjoin(PrintArchive, LibraryFolder.archive_id == PrintArchive.id)
- .where(LibraryFolder.id == folder_id)
- )
- row = result.one_or_none()
- if not row:
- raise HTTPException(status_code=404, detail="Folder not found")
- folder, project_name, archive_name = row
- # Get file count
- file_count_result = await db.execute(select(func.count(LibraryFile.id)).where(LibraryFile.folder_id == folder_id))
- file_count = file_count_result.scalar() or 0
- return FolderResponse(
- id=folder.id,
- name=folder.name,
- parent_id=folder.parent_id,
- project_id=folder.project_id,
- archive_id=folder.archive_id,
- project_name=project_name,
- archive_name=archive_name,
- file_count=file_count,
- created_at=folder.created_at,
- updated_at=folder.updated_at,
- )
- @router.put("/folders/{folder_id}", response_model=FolderResponse)
- async def update_folder(
- folder_id: int,
- data: FolderUpdate,
- db: AsyncSession = Depends(get_db),
- _: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_UPDATE_ALL)),
- ):
- """Update a folder.
- Note: Folders require library:update_all permission since they don't have
- ownership tracking.
- """
- result = await db.execute(select(LibraryFolder).where(LibraryFolder.id == folder_id))
- folder = result.scalar_one_or_none()
- if not folder:
- raise HTTPException(status_code=404, detail="Folder not found")
- if data.name is not None:
- folder.name = data.name
- if data.parent_id is not None:
- # Prevent circular reference
- if data.parent_id == folder_id:
- raise HTTPException(status_code=400, detail="Folder cannot be its own parent")
- # Check for circular reference in ancestors
- if data.parent_id != 0: # 0 means move to root
- current_id = data.parent_id
- while current_id is not None:
- if current_id == folder_id:
- raise HTTPException(status_code=400, detail="Cannot move folder into its own subtree")
- parent_result = await db.execute(select(LibraryFolder.parent_id).where(LibraryFolder.id == current_id))
- current_id = parent_result.scalar()
- folder.parent_id = data.parent_id
- else:
- folder.parent_id = None
- # Update project_id (0 to unlink)
- if data.project_id is not None:
- if data.project_id == 0:
- folder.project_id = None
- else:
- # Verify project exists
- project_result = await db.execute(select(Project).where(Project.id == data.project_id))
- if not project_result.scalar_one_or_none():
- raise HTTPException(status_code=404, detail="Project not found")
- folder.project_id = data.project_id
- # Update archive_id (0 to unlink)
- if data.archive_id is not None:
- if data.archive_id == 0:
- folder.archive_id = None
- else:
- # Verify archive exists
- archive_result = await db.execute(select(PrintArchive).where(PrintArchive.id == data.archive_id))
- if not archive_result.scalar_one_or_none():
- raise HTTPException(status_code=404, detail="Archive not found")
- folder.archive_id = data.archive_id
- await db.commit()
- await db.refresh(folder)
- # Get file count and names
- file_count_result = await db.execute(select(func.count(LibraryFile.id)).where(LibraryFile.folder_id == folder_id))
- file_count = file_count_result.scalar() or 0
- # Get project and archive names
- project_name = None
- archive_name = None
- if folder.project_id:
- project_result = await db.execute(select(Project.name).where(Project.id == folder.project_id))
- project_name = project_result.scalar()
- if folder.archive_id:
- archive_result = await db.execute(select(PrintArchive.print_name).where(PrintArchive.id == folder.archive_id))
- archive_name = archive_result.scalar()
- return FolderResponse(
- id=folder.id,
- name=folder.name,
- parent_id=folder.parent_id,
- project_id=folder.project_id,
- archive_id=folder.archive_id,
- project_name=project_name,
- archive_name=archive_name,
- file_count=file_count,
- created_at=folder.created_at,
- updated_at=folder.updated_at,
- )
- @router.delete("/folders/{folder_id}")
- async def delete_folder(
- folder_id: int,
- db: AsyncSession = Depends(get_db),
- _: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_DELETE_ALL)),
- ):
- """Delete a folder and all its contents (cascade).
- Note: Folders require library:delete_all permission since they don't have
- ownership tracking.
- """
- result = await db.execute(select(LibraryFolder).where(LibraryFolder.id == folder_id))
- folder = result.scalar_one_or_none()
- if not folder:
- raise HTTPException(status_code=404, detail="Folder not found")
- # Get all files in this folder and subfolders to delete from disk
- async def get_all_file_ids(fid: int) -> list[int]:
- """Recursively get all file IDs in a folder tree."""
- file_ids = []
- # Get files in this folder
- files_result = await db.execute(
- select(LibraryFile.id, LibraryFile.file_path, LibraryFile.thumbnail_path).where(
- LibraryFile.folder_id == fid
- )
- )
- for file_id, file_path, thumb_path in files_result.all():
- file_ids.append(file_id)
- # Delete actual files
- try:
- if file_path and os.path.exists(file_path):
- os.remove(file_path)
- if thumb_path and os.path.exists(thumb_path):
- os.remove(thumb_path)
- except OSError as e:
- logger.warning("Failed to delete file: %s", e)
- # Get child folders and recurse
- children_result = await db.execute(select(LibraryFolder.id).where(LibraryFolder.parent_id == fid))
- for (child_id,) in children_result.all():
- file_ids.extend(await get_all_file_ids(child_id))
- return file_ids
- await get_all_file_ids(folder_id)
- # Delete folder (cascade will handle files and subfolders)
- await db.delete(folder)
- return {"status": "success", "message": "Folder deleted"}
- # ============ File Endpoints ============
- @router.get("/files", response_model=list[FileListResponse])
- @router.get("/files/", response_model=list[FileListResponse])
- async def list_files(
- response: Response,
- folder_id: int | None = None,
- include_root: bool = True,
- db: AsyncSession = Depends(get_db),
- _: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_READ)),
- ):
- """List files, optionally filtered by folder.
- Args:
- folder_id: Filter by folder ID. If None and include_root=True, returns root files.
- include_root: If True and folder_id is None, returns files at root level.
- If False and folder_id is None, returns all files.
- """
- query = select(LibraryFile).options(selectinload(LibraryFile.created_by))
- if folder_id is not None:
- query = query.where(LibraryFile.folder_id == folder_id)
- elif include_root:
- query = query.where(LibraryFile.folder_id.is_(None))
- query = query.order_by(LibraryFile.filename)
- result = await db.execute(query)
- files = result.scalars().all()
- # Get duplicate counts
- hash_counts = {}
- if files:
- hashes = [f.file_hash for f in files if f.file_hash]
- if hashes:
- dup_result = await db.execute(
- select(LibraryFile.file_hash, func.count(LibraryFile.id))
- .where(LibraryFile.file_hash.in_(hashes))
- .group_by(LibraryFile.file_hash)
- )
- hash_counts = {h: c - 1 for h, c in dup_result.all()} # -1 to exclude self
- # Prevent browser caching of file list
- response.headers["Cache-Control"] = "no-cache, no-store, must-revalidate"
- file_list = []
- for f in files:
- # Extract key metadata for display
- print_name = None
- print_time = None
- filament_grams = None
- sliced_for_model = None
- if f.file_metadata:
- print_name = f.file_metadata.get("print_name")
- print_time = f.file_metadata.get("print_time_seconds")
- filament_grams = f.file_metadata.get("filament_used_grams")
- sliced_for_model = f.file_metadata.get("sliced_for_model")
- file_list.append(
- FileListResponse(
- id=f.id,
- folder_id=f.folder_id,
- filename=f.filename,
- file_type=f.file_type,
- file_size=f.file_size,
- thumbnail_path=f.thumbnail_path,
- print_count=f.print_count,
- duplicate_count=hash_counts.get(f.file_hash, 0) if f.file_hash else 0,
- created_by_id=f.created_by_id,
- created_by_username=f.created_by.username if f.created_by else None,
- created_at=f.created_at,
- print_name=print_name,
- print_time_seconds=print_time,
- filament_used_grams=filament_grams,
- sliced_for_model=sliced_for_model,
- )
- )
- return file_list
- @router.post("/files", response_model=FileUploadResponse)
- @router.post("/files/", response_model=FileUploadResponse)
- async def upload_file(
- file: UploadFile = File(...),
- folder_id: int | None = None,
- generate_stl_thumbnails: bool = Query(default=True),
- db: AsyncSession = Depends(get_db),
- current_user: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_UPLOAD)),
- ):
- """Upload a file to the library."""
- try:
- if not file.filename:
- raise HTTPException(status_code=400, detail="Filename is required")
- filename = file.filename
- ext = os.path.splitext(filename)[1].lower()
- # Handle files without extension
- file_type = ext[1:] if ext else "unknown"
- # Verify folder exists if specified
- if folder_id is not None:
- folder_result = await db.execute(select(LibraryFolder).where(LibraryFolder.id == folder_id))
- if not folder_result.scalar_one_or_none():
- raise HTTPException(status_code=404, detail="Folder not found")
- # Generate unique filename for storage
- unique_filename = f"{uuid.uuid4().hex}{ext}"
- file_path = get_library_files_dir() / unique_filename
- # Save file
- content = await file.read()
- with open(file_path, "wb") as f:
- f.write(content)
- # Calculate hash
- file_hash = calculate_file_hash(file_path)
- # Check for duplicates
- dup_result = await db.execute(select(LibraryFile.id).where(LibraryFile.file_hash == file_hash).limit(1))
- duplicate_of = dup_result.scalar()
- # Extract metadata and thumbnail
- metadata = {}
- thumbnail_path = None
- thumbnails_dir = get_library_thumbnails_dir()
- if ext == ".3mf":
- try:
- parser = ThreeMFParser(str(file_path))
- raw_metadata = parser.parse()
- # Extract thumbnail before cleaning metadata
- thumbnail_data = raw_metadata.get("_thumbnail_data")
- thumbnail_ext = raw_metadata.get("_thumbnail_ext", ".png")
- # Save thumbnail if extracted
- if thumbnail_data:
- thumb_filename = f"{uuid.uuid4().hex}{thumbnail_ext}"
- thumb_path = thumbnails_dir / thumb_filename
- with open(thumb_path, "wb") as f:
- f.write(thumbnail_data)
- thumbnail_path = str(thumb_path)
- # Clean metadata - remove non-JSON-serializable data (bytes, etc.)
- def clean_metadata(obj):
- if isinstance(obj, dict):
- return {
- k: clean_metadata(v)
- for k, v in obj.items()
- if not isinstance(v, bytes) and k not in ("_thumbnail_data", "_thumbnail_ext")
- }
- elif isinstance(obj, list):
- return [clean_metadata(i) for i in obj if not isinstance(i, bytes)]
- elif isinstance(obj, bytes):
- return None
- return obj
- metadata = clean_metadata(raw_metadata)
- except Exception as e:
- logger.warning("Failed to parse 3MF: %s", e)
- elif ext == ".gcode":
- # Extract embedded thumbnail from gcode
- try:
- thumbnail_data = extract_gcode_thumbnail(file_path)
- if thumbnail_data:
- thumb_filename = f"{uuid.uuid4().hex}.png"
- thumb_path = thumbnails_dir / thumb_filename
- with open(thumb_path, "wb") as f:
- f.write(thumbnail_data)
- thumbnail_path = str(thumb_path)
- except Exception as e:
- logger.warning("Failed to extract gcode thumbnail: %s", e)
- elif ext.lower() in IMAGE_EXTENSIONS:
- # For image files, create a thumbnail from the image itself
- thumbnail_path = create_image_thumbnail(file_path, thumbnails_dir)
- elif ext == ".stl":
- # Generate STL thumbnail if enabled
- if generate_stl_thumbnails:
- thumbnail_path = generate_stl_thumbnail(file_path, thumbnails_dir)
- # Create database entry (store relative paths for portability)
- library_file = LibraryFile(
- folder_id=folder_id,
- filename=filename,
- file_path=to_relative_path(file_path),
- file_type=file_type,
- file_size=len(content),
- file_hash=file_hash,
- thumbnail_path=to_relative_path(thumbnail_path) if thumbnail_path else None,
- file_metadata=metadata if metadata else None,
- created_by_id=current_user.id if current_user else None,
- )
- db.add(library_file)
- await db.commit()
- await db.refresh(library_file)
- return FileUploadResponse(
- id=library_file.id,
- filename=library_file.filename,
- file_type=library_file.file_type,
- file_size=library_file.file_size,
- thumbnail_path=library_file.thumbnail_path,
- duplicate_of=duplicate_of,
- metadata=library_file.file_metadata,
- )
- except HTTPException:
- raise
- except Exception as e:
- logger.error("Upload failed for %s: %s", file.filename, e, exc_info=True)
- raise HTTPException(status_code=500, detail=f"Upload failed: {str(e)}")
- @router.post("/files/extract-zip", response_model=ZipExtractResponse)
- async def extract_zip_file(
- file: UploadFile = File(...),
- folder_id: int | None = Query(default=None),
- preserve_structure: bool = Query(default=True),
- create_folder_from_zip: bool = Query(default=False),
- generate_stl_thumbnails: bool = Query(default=True),
- db: AsyncSession = Depends(get_db),
- current_user: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_UPLOAD)),
- ):
- """Upload and extract a ZIP file to the library.
- Args:
- file: The ZIP file to extract
- folder_id: Target folder ID (None = root)
- preserve_structure: If True, recreate folder structure from ZIP; if False, extract all files flat
- create_folder_from_zip: If True, create a folder named after the ZIP file and extract into it
- generate_stl_thumbnails: If True, generate thumbnails for STL files
- """
- import tempfile
- if not file.filename or not file.filename.lower().endswith(".zip"):
- raise HTTPException(status_code=400, detail="Only ZIP files are supported")
- # Verify target folder exists if specified
- if folder_id is not None:
- folder_result = await db.execute(select(LibraryFolder).where(LibraryFolder.id == folder_id))
- if not folder_result.scalar_one_or_none():
- raise HTTPException(status_code=404, detail="Target folder not found")
- # Save ZIP to temp file
- try:
- with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as tmp:
- content = await file.read()
- tmp.write(content)
- tmp_path = tmp.name
- except Exception as e:
- raise HTTPException(status_code=500, detail=f"Failed to save ZIP file: {str(e)}")
- extracted_files: list[ZipExtractResult] = []
- errors: list[ZipExtractError] = []
- folders_created = 0
- folder_cache: dict[str, int] = {} # path -> folder_id
- # If create_folder_from_zip is True, create a folder named after the ZIP file
- zip_folder_id = folder_id
- logger.info(
- f"ZIP extraction: create_folder_from_zip={create_folder_from_zip}, folder_id={folder_id}, filename={file.filename}"
- )
- if create_folder_from_zip and file.filename:
- # Remove .zip extension to get folder name
- zip_folder_name = file.filename[:-4] if file.filename.lower().endswith(".zip") else file.filename
- # Check if folder already exists
- existing = await db.execute(
- select(LibraryFolder).where(
- LibraryFolder.name == zip_folder_name,
- LibraryFolder.parent_id == folder_id if folder_id else LibraryFolder.parent_id.is_(None),
- )
- )
- existing_folder = existing.scalar_one_or_none()
- if existing_folder:
- zip_folder_id = existing_folder.id
- logger.info("Reusing existing folder '%s' with id=%s", zip_folder_name, zip_folder_id)
- else:
- # Create folder
- new_folder = LibraryFolder(name=zip_folder_name, parent_id=folder_id)
- db.add(new_folder)
- await db.flush()
- await db.commit() # Commit folder creation immediately
- zip_folder_id = new_folder.id
- folders_created += 1
- logger.info("Created new folder '%s' with id=%s", zip_folder_name, zip_folder_id)
- try:
- with zipfile.ZipFile(tmp_path, "r") as zf:
- # Filter out directories and hidden/system files
- file_list = [
- name
- for name in zf.namelist()
- if not name.endswith("/")
- and not name.startswith("__MACOSX")
- and not os.path.basename(name).startswith(".")
- ]
- for zip_path in file_list:
- try:
- # Determine target folder (use zip_folder_id as base if create_folder_from_zip was used)
- target_folder_id = zip_folder_id
- if preserve_structure:
- # Get directory path from ZIP
- dir_path = os.path.dirname(zip_path)
- if dir_path:
- # Create folder structure
- parts = dir_path.split("/")
- current_parent = zip_folder_id
- current_path = ""
- for part in parts:
- if not part:
- continue
- current_path = f"{current_path}/{part}" if current_path else part
- if current_path in folder_cache:
- current_parent = folder_cache[current_path]
- else:
- # Check if folder exists
- existing = await db.execute(
- select(LibraryFolder).where(
- LibraryFolder.name == part,
- LibraryFolder.parent_id == current_parent
- if current_parent
- else LibraryFolder.parent_id.is_(None),
- )
- )
- existing_folder = existing.scalar_one_or_none()
- if existing_folder:
- current_parent = existing_folder.id
- else:
- # Create folder
- new_folder = LibraryFolder(name=part, parent_id=current_parent)
- db.add(new_folder)
- await db.flush()
- current_parent = new_folder.id
- folders_created += 1
- folder_cache[current_path] = current_parent
- target_folder_id = current_parent
- # Extract file
- filename = os.path.basename(zip_path)
- ext = os.path.splitext(filename)[1].lower()
- file_type = ext[1:] if ext else "unknown"
- # Generate unique filename for storage
- unique_filename = f"{uuid.uuid4().hex}{ext}"
- file_path = get_library_files_dir() / unique_filename
- # Extract and save file
- file_content = zf.read(zip_path)
- with open(file_path, "wb") as f:
- f.write(file_content)
- # Calculate hash
- file_hash = calculate_file_hash(file_path)
- # Extract metadata and thumbnail for 3MF files
- metadata = {}
- thumbnail_path = None
- thumbnails_dir = get_library_thumbnails_dir()
- if ext == ".3mf":
- try:
- parser = ThreeMFParser(str(file_path))
- raw_metadata = parser.parse()
- thumbnail_data = raw_metadata.get("_thumbnail_data")
- thumbnail_ext = raw_metadata.get("_thumbnail_ext", ".png")
- if thumbnail_data:
- thumb_filename = f"{uuid.uuid4().hex}{thumbnail_ext}"
- thumb_path = thumbnails_dir / thumb_filename
- with open(thumb_path, "wb") as f:
- f.write(thumbnail_data)
- thumbnail_path = str(thumb_path)
- def clean_metadata(obj):
- if isinstance(obj, dict):
- return {
- k: clean_metadata(v)
- for k, v in obj.items()
- if not isinstance(v, bytes) and k not in ("_thumbnail_data", "_thumbnail_ext")
- }
- elif isinstance(obj, list):
- return [clean_metadata(i) for i in obj if not isinstance(i, bytes)]
- elif isinstance(obj, bytes):
- return None
- return obj
- metadata = clean_metadata(raw_metadata)
- except Exception as e:
- logger.warning("Failed to parse 3MF from ZIP: %s", e)
- elif ext == ".gcode":
- try:
- thumbnail_data = extract_gcode_thumbnail(file_path)
- if thumbnail_data:
- thumb_filename = f"{uuid.uuid4().hex}.png"
- thumb_path = thumbnails_dir / thumb_filename
- with open(thumb_path, "wb") as f:
- f.write(thumbnail_data)
- thumbnail_path = str(thumb_path)
- except Exception as e:
- logger.warning("Failed to extract gcode thumbnail from ZIP: %s", e)
- elif ext.lower() in IMAGE_EXTENSIONS:
- thumbnail_path = create_image_thumbnail(file_path, thumbnails_dir)
- elif ext == ".stl":
- # Generate STL thumbnail if enabled
- if generate_stl_thumbnails:
- thumbnail_path = generate_stl_thumbnail(file_path, thumbnails_dir)
- # Create database entry (store relative paths for portability)
- library_file = LibraryFile(
- folder_id=target_folder_id,
- filename=filename,
- file_path=to_relative_path(file_path),
- file_type=file_type,
- file_size=len(file_content),
- file_hash=file_hash,
- thumbnail_path=to_relative_path(thumbnail_path) if thumbnail_path else None,
- file_metadata=metadata if metadata else None,
- created_by_id=current_user.id if current_user else None,
- )
- db.add(library_file)
- await db.flush()
- await db.refresh(library_file)
- extracted_files.append(
- ZipExtractResult(
- filename=filename,
- file_id=library_file.id,
- folder_id=target_folder_id,
- )
- )
- # Commit after each file to release database lock
- # This prevents long-running transactions from blocking other requests
- await db.commit()
- except Exception as e:
- logger.error("Failed to extract %s: %s", zip_path, e)
- errors.append(ZipExtractError(filename=os.path.basename(zip_path), error=str(e)))
- # Rollback the failed file but continue with others
- await db.rollback()
- return ZipExtractResponse(
- extracted=len(extracted_files),
- folders_created=folders_created,
- files=extracted_files,
- errors=errors,
- )
- except zipfile.BadZipFile:
- raise HTTPException(status_code=400, detail="Invalid or corrupted ZIP file")
- except Exception as e:
- logger.error("ZIP extraction failed: %s", e, exc_info=True)
- raise HTTPException(status_code=500, detail=f"ZIP extraction failed: {str(e)}")
- finally:
- # Clean up temp file
- try:
- os.unlink(tmp_path)
- except OSError:
- pass # Best-effort temp file cleanup; ignore if already removed
- # ============ STL Thumbnail Batch Generation ============
- @router.post("/generate-stl-thumbnails", response_model=BatchThumbnailResponse)
- async def batch_generate_stl_thumbnails(
- request: BatchThumbnailRequest,
- db: AsyncSession = Depends(get_db),
- _: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_UPDATE_ALL)),
- ):
- """Generate thumbnails for STL files in batch.
- Note: Requires library:update_all permission since this is a batch operation
- that may affect files owned by different users.
- Can generate thumbnails for:
- - Specific file IDs (file_ids)
- - All STL files in a folder (folder_id)
- - All STL files missing thumbnails (all_missing=True)
- """
- thumbnails_dir = get_library_thumbnails_dir()
- results: list[BatchThumbnailResult] = []
- # Build query based on request
- query = select(LibraryFile).where(LibraryFile.file_type == "stl")
- if request.file_ids:
- # Specific files
- query = query.where(LibraryFile.id.in_(request.file_ids))
- elif request.folder_id is not None:
- # All STL files in a specific folder
- query = query.where(LibraryFile.folder_id == request.folder_id)
- if not request.all_missing:
- # If not specifically asking for missing thumbnails, get all
- pass
- else:
- query = query.where(LibraryFile.thumbnail_path.is_(None))
- elif request.all_missing:
- # All STL files without thumbnails
- query = query.where(LibraryFile.thumbnail_path.is_(None))
- else:
- # No criteria specified - return empty
- return BatchThumbnailResponse(
- processed=0,
- succeeded=0,
- failed=0,
- results=[],
- )
- result = await db.execute(query)
- stl_files = result.scalars().all()
- succeeded = 0
- failed = 0
- for stl_file in stl_files:
- file_path = to_absolute_path(stl_file.file_path)
- if not file_path or not file_path.exists():
- results.append(
- BatchThumbnailResult(
- file_id=stl_file.id,
- filename=stl_file.filename,
- success=False,
- error="File not found on disk",
- )
- )
- failed += 1
- continue
- try:
- thumbnail_path = generate_stl_thumbnail(file_path, thumbnails_dir)
- if thumbnail_path:
- # Update database with relative path
- stl_file.thumbnail_path = to_relative_path(thumbnail_path)
- await db.flush()
- results.append(
- BatchThumbnailResult(
- file_id=stl_file.id,
- filename=stl_file.filename,
- success=True,
- )
- )
- succeeded += 1
- else:
- results.append(
- BatchThumbnailResult(
- file_id=stl_file.id,
- filename=stl_file.filename,
- success=False,
- error="Thumbnail generation failed",
- )
- )
- failed += 1
- except Exception as e:
- logger.error("Failed to generate thumbnail for %s: %s", stl_file.filename, e)
- results.append(
- BatchThumbnailResult(
- file_id=stl_file.id,
- filename=stl_file.filename,
- success=False,
- error=str(e),
- )
- )
- failed += 1
- await db.commit()
- return BatchThumbnailResponse(
- processed=len(stl_files),
- succeeded=succeeded,
- failed=failed,
- results=results,
- )
- # ============ Queue Operations ============
- # NOTE: These routes must be defined BEFORE /files/{file_id} to avoid path parameter conflicts
- def is_sliced_file(filename: str) -> bool:
- """Check if a file is a sliced (printable) file.
- Sliced files are:
- - .gcode files
- - .3mf files that contain '.gcode.' in the name (e.g., filename.gcode.3mf)
- """
- lower = filename.lower()
- return lower.endswith(".gcode") or ".gcode." in lower
- @router.post("/files/add-to-queue", response_model=AddToQueueResponse)
- async def add_files_to_queue(
- request: AddToQueueRequest,
- db: AsyncSession = Depends(get_db),
- _: User | None = Depends(require_permission_if_auth_enabled(Permission.QUEUE_CREATE)),
- ):
- """Add library files to the print queue.
- Only sliced files (.gcode or .gcode.3mf) can be added to the queue.
- The archive will be created automatically when the print starts.
- """
- added: list[AddToQueueResult] = []
- errors: list[AddToQueueError] = []
- # Get all requested files
- result = await db.execute(select(LibraryFile).where(LibraryFile.id.in_(request.file_ids)))
- files = {f.id: f for f in result.scalars().all()}
- # Get max position for queue ordering
- pos_result = await db.execute(select(func.coalesce(func.max(PrintQueueItem.position), 0)))
- max_position = pos_result.scalar() or 0
- for file_id in request.file_ids:
- lib_file = files.get(file_id)
- if not lib_file:
- errors.append(AddToQueueError(file_id=file_id, filename="(not found)", error="File not found"))
- continue
- # Validate file is sliced
- if not is_sliced_file(lib_file.filename):
- errors.append(
- AddToQueueError(
- file_id=file_id,
- filename=lib_file.filename,
- error="Not a sliced file. Only .gcode or .gcode.3mf files can be printed.",
- )
- )
- continue
- try:
- # Verify file exists on disk
- file_path = Path(app_settings.base_dir) / lib_file.file_path
- if not file_path.exists():
- errors.append(
- AddToQueueError(file_id=file_id, filename=lib_file.filename, error="File not found on disk")
- )
- continue
- # Create queue item referencing library file (archive created at print start)
- max_position += 1
- queue_item = PrintQueueItem(
- printer_id=None, # Unassigned
- library_file_id=file_id,
- position=max_position,
- status="pending",
- )
- db.add(queue_item)
- await db.flush() # Get queue_item.id
- added.append(
- AddToQueueResult(
- file_id=file_id,
- filename=lib_file.filename,
- queue_item_id=queue_item.id,
- )
- )
- except Exception as e:
- logger.exception("Error adding file %s to queue", file_id)
- errors.append(AddToQueueError(file_id=file_id, filename=lib_file.filename, error=str(e)))
- await db.commit()
- return AddToQueueResponse(added=added, errors=errors)
- @router.get("/files/{file_id}/plates")
- async def get_library_file_plates(
- file_id: int,
- db: AsyncSession = Depends(get_db),
- _: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_READ)),
- ):
- """Get available plates from a multi-plate 3MF library file.
- Returns a list of plates with their index, name, thumbnail availability,
- and filament requirements. For single-plate exports, returns a single plate.
- """
- import json
- import defusedxml.ElementTree as ET
- # Get the library file
- result = await db.execute(select(LibraryFile).where(LibraryFile.id == file_id))
- lib_file = result.scalar_one_or_none()
- if not lib_file:
- raise HTTPException(status_code=404, detail="File not found")
- file_path = Path(app_settings.base_dir) / lib_file.file_path
- if not file_path.exists():
- raise HTTPException(status_code=404, detail="File not found on disk")
- # Only 3MF files have plates
- if not lib_file.filename.lower().endswith(".3mf"):
- return {"file_id": file_id, "filename": lib_file.filename, "plates": [], "is_multi_plate": False}
- plates = []
- try:
- with zipfile.ZipFile(file_path, "r") as zf:
- namelist = zf.namelist()
- # Find all plate gcode files to determine available plates
- gcode_files = [n for n in namelist if n.startswith("Metadata/plate_") and n.endswith(".gcode")]
- # If no gcode is present (source-only or unsliced), fall back to plate JSON/PNG
- plate_indices: list[int] = []
- if gcode_files:
- # Extract plate indices from gcode filenames
- for gf in gcode_files:
- try:
- plate_str = gf[15:-6] # Remove "Metadata/plate_" and ".gcode"
- plate_indices.append(int(plate_str))
- except ValueError:
- pass # Skip gcode file with non-numeric plate index
- else:
- plate_json_files = [n for n in namelist if n.startswith("Metadata/plate_") and n.endswith(".json")]
- plate_png_files = [
- n
- for n in namelist
- if n.startswith("Metadata/plate_")
- and n.endswith(".png")
- and "_small" not in n
- and "no_light" not in n
- ]
- plate_name_candidates = plate_json_files + plate_png_files
- plate_re = re.compile(r"^Metadata/plate_(\d+)\.(json|png)$")
- seen_indices: set[int] = set()
- for name in plate_name_candidates:
- match = plate_re.match(name)
- if match:
- try:
- index = int(match.group(1))
- except ValueError:
- continue
- if index in seen_indices:
- continue
- seen_indices.add(index)
- plate_indices.append(index)
- if not plate_indices:
- # No plate metadata found
- return {"file_id": file_id, "filename": lib_file.filename, "plates": [], "is_multi_plate": False}
- plate_indices.sort()
- # Parse model_settings.config for plate names + object assignments
- plate_names = {}
- plate_object_ids: dict[int, list[str]] = {}
- object_names_by_id: dict[str, str] = {}
- if "Metadata/model_settings.config" in namelist:
- try:
- model_content = zf.read("Metadata/model_settings.config").decode()
- model_root = ET.fromstring(model_content)
- for obj_elem in model_root.findall(".//object"):
- obj_id = obj_elem.get("id")
- if not obj_id:
- continue
- name_meta = obj_elem.find("metadata[@key='name']")
- obj_name = name_meta.get("value") if name_meta is not None else None
- if obj_name:
- object_names_by_id[obj_id] = obj_name
- for plate_elem in model_root.findall(".//plate"):
- plater_id = None
- plater_name = None
- for meta in plate_elem.findall("metadata"):
- key = meta.get("key")
- value = meta.get("value")
- if key == "plater_id" and value:
- try:
- plater_id = int(value)
- except ValueError:
- pass # Ignore plate with non-numeric plater_id
- elif key == "plater_name" and value:
- plater_name = value.strip()
- if plater_id is not None and plater_name:
- plate_names[plater_id] = plater_name
- if plater_id is not None:
- for instance_elem in plate_elem.findall("model_instance"):
- for inst_meta in instance_elem.findall("metadata"):
- if inst_meta.get("key") == "object_id":
- obj_id = inst_meta.get("value")
- if not obj_id:
- continue
- plate_object_ids.setdefault(plater_id, [])
- if obj_id not in plate_object_ids[plater_id]:
- plate_object_ids[plater_id].append(obj_id)
- except Exception:
- pass # model_settings.config is optional; skip if missing or malformed
- # Parse slice_info.config for plate metadata
- plate_metadata = {}
- if "Metadata/slice_info.config" in namelist:
- content = zf.read("Metadata/slice_info.config").decode()
- root = ET.fromstring(content)
- for plate_elem in root.findall(".//plate"):
- plate_info = {"filaments": [], "prediction": None, "weight": None, "name": None, "objects": []}
- plate_index = None
- for meta in plate_elem.findall("metadata"):
- key = meta.get("key")
- value = meta.get("value")
- if key == "index" and value:
- try:
- plate_index = int(value)
- except ValueError:
- pass # Ignore plate with non-numeric index
- elif key == "prediction" and value:
- try:
- plate_info["prediction"] = int(value)
- except ValueError:
- pass # Leave prediction as None if not a valid integer
- elif key == "weight" and value:
- try:
- plate_info["weight"] = float(value)
- except ValueError:
- pass # Leave weight as None if not a valid number
- # Get filaments used in this plate
- for filament_elem in plate_elem.findall("filament"):
- filament_id = filament_elem.get("id")
- filament_type = filament_elem.get("type", "")
- filament_color = filament_elem.get("color", "")
- used_g = filament_elem.get("used_g", "0")
- used_m = filament_elem.get("used_m", "0")
- try:
- used_grams = float(used_g)
- except (ValueError, TypeError):
- used_grams = 0
- if used_grams > 0 and filament_id:
- plate_info["filaments"].append(
- {
- "slot_id": int(filament_id),
- "type": filament_type,
- "color": filament_color,
- "used_grams": round(used_grams, 1),
- "used_meters": float(used_m) if used_m else 0,
- }
- )
- plate_info["filaments"].sort(key=lambda x: x["slot_id"])
- # Collect object names
- for obj_elem in plate_elem.findall("object"):
- obj_name = obj_elem.get("name")
- if obj_name and obj_name not in plate_info["objects"]:
- plate_info["objects"].append(obj_name)
- # Set plate name
- if plate_index is not None:
- custom_name = plate_names.get(plate_index)
- if custom_name:
- plate_info["name"] = custom_name
- elif plate_info["objects"]:
- plate_info["name"] = plate_info["objects"][0]
- plate_metadata[plate_index] = plate_info
- # Parse plate_*.json for object lists when slice_info is missing
- plate_json_objects: dict[int, list[str]] = {}
- for name in namelist:
- match = re.match(r"^Metadata/plate_(\d+)\.json$", name)
- if not match:
- continue
- try:
- plate_index = int(match.group(1))
- except ValueError:
- continue
- try:
- payload = json.loads(zf.read(name).decode())
- bbox_objects = payload.get("bbox_objects", [])
- names: list[str] = []
- for obj in bbox_objects:
- obj_name = obj.get("name") if isinstance(obj, dict) else None
- if obj_name and obj_name not in names:
- names.append(obj_name)
- if names:
- plate_json_objects[plate_index] = names
- except Exception:
- continue
- # Build plate list
- for idx in plate_indices:
- meta = plate_metadata.get(idx, {})
- has_thumbnail = f"Metadata/plate_{idx}.png" in namelist
- objects = meta.get("objects", [])
- if not objects:
- objects = plate_json_objects.get(idx, [])
- if not objects and plate_object_ids.get(idx):
- objects = [
- object_names_by_id.get(obj_id, f"Object {obj_id}") for obj_id in plate_object_ids.get(idx, [])
- ]
- plate_name = meta.get("name")
- if not plate_name:
- plate_name = plate_names.get(idx)
- if not plate_name and objects:
- plate_name = objects[0]
- plates.append(
- {
- "index": idx,
- "name": plate_name,
- "objects": objects,
- "object_count": len(objects),
- "has_thumbnail": has_thumbnail,
- "thumbnail_url": f"/api/v1/library/files/{file_id}/plate-thumbnail/{idx}"
- if has_thumbnail
- else None,
- "print_time_seconds": meta.get("prediction"),
- "filament_used_grams": meta.get("weight"),
- "filaments": meta.get("filaments", []),
- }
- )
- except Exception as e:
- logger.warning("Failed to parse plates from library file %s: %s", file_id, e)
- return {
- "file_id": file_id,
- "filename": lib_file.filename,
- "plates": plates,
- "is_multi_plate": len(plates) > 1,
- }
- @router.get("/files/{file_id}/plate-thumbnail/{plate_index}")
- async def get_library_file_plate_thumbnail(
- file_id: int,
- plate_index: int,
- db: AsyncSession = Depends(get_db),
- ):
- """Get the thumbnail image for a specific plate from a library file."""
- from starlette.responses import Response
- result = await db.execute(select(LibraryFile).where(LibraryFile.id == file_id))
- lib_file = result.scalar_one_or_none()
- if not lib_file:
- raise HTTPException(status_code=404, detail="File not found")
- file_path = Path(app_settings.base_dir) / lib_file.file_path
- if not file_path.exists():
- raise HTTPException(status_code=404, detail="File not found on disk")
- try:
- with zipfile.ZipFile(file_path, "r") as zf:
- thumb_path = f"Metadata/plate_{plate_index}.png"
- if thumb_path in zf.namelist():
- data = zf.read(thumb_path)
- return Response(content=data, media_type="image/png")
- except Exception:
- pass # Archive unreadable or thumbnail missing; fall through to 404
- raise HTTPException(status_code=404, detail=f"Thumbnail for plate {plate_index} not found")
- @router.get("/files/{file_id}/filament-requirements")
- async def get_library_file_filament_requirements(
- file_id: int,
- plate_id: int | None = None,
- db: AsyncSession = Depends(get_db),
- _: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_READ)),
- ):
- """Get filament requirements from a library file.
- Parses the 3MF file to extract filament slot IDs, types, colors, and usage.
- This enables AMS slot assignment when printing from the file manager.
- Args:
- file_id: The library file ID
- plate_id: Optional plate index to get filaments for a specific plate
- """
- import defusedxml.ElementTree as ET
- # Get the library file
- result = await db.execute(select(LibraryFile).where(LibraryFile.id == file_id))
- lib_file = result.scalar_one_or_none()
- if not lib_file:
- raise HTTPException(status_code=404, detail="File not found")
- # Get the full file path
- file_path = Path(app_settings.base_dir) / lib_file.file_path
- if not file_path.exists():
- raise HTTPException(status_code=404, detail="File not found on disk")
- # Only 3MF files have parseable filament info
- if not lib_file.filename.lower().endswith(".3mf"):
- return {"file_id": file_id, "filename": lib_file.filename, "plate_id": plate_id, "filaments": []}
- filaments = []
- try:
- with zipfile.ZipFile(file_path, "r") as zf:
- # Parse slice_info.config for filament requirements
- if "Metadata/slice_info.config" in zf.namelist():
- content = zf.read("Metadata/slice_info.config").decode()
- root = ET.fromstring(content)
- if plate_id is not None:
- # Find filaments for specific plate
- for plate_elem in root.findall(".//plate"):
- # Check if this is the requested plate
- plate_index = None
- for meta in plate_elem.findall("metadata"):
- if meta.get("key") == "index":
- try:
- plate_index = int(meta.get("value", ""))
- except ValueError:
- pass # Skip plate with non-numeric index value
- break
- if plate_index == plate_id:
- # Extract filaments from this plate
- for filament_elem in plate_elem.findall("filament"):
- filament_id = filament_elem.get("id")
- filament_type = filament_elem.get("type", "")
- filament_color = filament_elem.get("color", "")
- used_g = filament_elem.get("used_g", "0")
- used_m = filament_elem.get("used_m", "0")
- tray_info_idx = filament_elem.get("tray_info_idx", "")
- try:
- used_grams = float(used_g)
- except (ValueError, TypeError):
- used_grams = 0
- if used_grams > 0 and filament_id:
- filaments.append(
- {
- "slot_id": int(filament_id),
- "type": filament_type,
- "color": filament_color,
- "used_grams": round(used_grams, 1),
- "used_meters": float(used_m) if used_m else 0,
- "tray_info_idx": tray_info_idx,
- }
- )
- break
- else:
- # Extract all filaments with used_g > 0 (for single-plate or overview)
- for filament_elem in root.findall(".//filament"):
- filament_id = filament_elem.get("id")
- filament_type = filament_elem.get("type", "")
- filament_color = filament_elem.get("color", "")
- used_g = filament_elem.get("used_g", "0")
- used_m = filament_elem.get("used_m", "0")
- tray_info_idx = filament_elem.get("tray_info_idx", "")
- try:
- used_grams = float(used_g)
- except (ValueError, TypeError):
- used_grams = 0
- if used_grams > 0 and filament_id:
- filaments.append(
- {
- "slot_id": int(filament_id),
- "type": filament_type,
- "color": filament_color,
- "used_grams": round(used_grams, 1),
- "used_meters": float(used_m) if used_m else 0,
- "tray_info_idx": tray_info_idx,
- }
- )
- # Sort by slot ID
- filaments.sort(key=lambda x: x["slot_id"])
- # Enrich with nozzle mapping for dual-nozzle printers
- nozzle_mapping = extract_nozzle_mapping_from_3mf(zf)
- if nozzle_mapping:
- for filament in filaments:
- filament["nozzle_id"] = nozzle_mapping.get(filament["slot_id"])
- except Exception as e:
- logger.warning("Failed to parse filament requirements from library file %s: %s", file_id, e)
- return {
- "file_id": file_id,
- "filename": lib_file.filename,
- "plate_id": plate_id,
- "filaments": filaments,
- }
- @router.post("/files/{file_id}/print")
- async def print_library_file(
- file_id: int,
- printer_id: int,
- body: FilePrintRequest | None = None,
- db: AsyncSession = Depends(get_db),
- _: User | None = Depends(require_permission_if_auth_enabled(Permission.PRINTERS_CONTROL)),
- ):
- """Dispatch a library file for send/start on a printer.
- The actual send/start work is handled asynchronously by background
- dispatch so the UI can continue immediately.
- Only sliced files (.gcode or .gcode.3mf) can be printed.
- """
- from backend.app.models.printer import Printer
- from backend.app.services.background_dispatch import DispatchEnqueueRejected, background_dispatch
- from backend.app.services.printer_manager import printer_manager
- # Use defaults if no body provided
- if body is None:
- body = FilePrintRequest()
- # Get the library file
- result = await db.execute(select(LibraryFile).where(LibraryFile.id == file_id))
- lib_file = result.scalar_one_or_none()
- if not lib_file:
- raise HTTPException(status_code=404, detail="File not found")
- # Validate file is sliced
- if not is_sliced_file(lib_file.filename):
- raise HTTPException(
- status_code=400,
- detail="Not a sliced file. Only .gcode or .gcode.3mf files can be printed.",
- )
- # Get the full file path
- file_path = Path(app_settings.base_dir) / lib_file.file_path
- if not file_path.exists():
- raise HTTPException(status_code=404, detail="File not found on disk")
- # Get printer
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(status_code=404, detail="Printer not found")
- # Check printer is connected
- if not printer_manager.is_connected(printer_id):
- raise HTTPException(status_code=400, detail="Printer is not connected")
- plate_name = body.plate_name
- if not plate_name and body.plate_id is not None:
- plate_name = f"Plate {body.plate_id}"
- dispatch_source_name = lib_file.filename
- if plate_name:
- dispatch_source_name = f"{lib_file.filename} • {plate_name}"
- try:
- dispatch_result = await background_dispatch.dispatch_print_library_file(
- file_id=file_id,
- filename=dispatch_source_name,
- printer_id=printer_id,
- printer_name=printer.name,
- options=body.model_dump(exclude_none=True),
- requested_by_user_id=None,
- requested_by_username=None,
- )
- except DispatchEnqueueRejected as e:
- raise HTTPException(status_code=409, detail=str(e)) from e
- return {
- "status": "dispatched",
- "printer_id": printer_id,
- "archive_id": None,
- "filename": lib_file.filename,
- "dispatch_job_id": dispatch_result["dispatch_job_id"],
- "dispatch_position": dispatch_result["dispatch_position"],
- }
- # ============ File Detail Endpoints ============
- @router.get("/files/{file_id}", response_model=FileResponseSchema)
- async def get_file(
- file_id: int,
- db: AsyncSession = Depends(get_db),
- _: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_READ)),
- ):
- """Get a file by ID with full details."""
- result = await db.execute(
- select(LibraryFile).options(selectinload(LibraryFile.created_by)).where(LibraryFile.id == file_id)
- )
- file = result.scalar_one_or_none()
- if not file:
- raise HTTPException(status_code=404, detail="File not found")
- # Get folder name
- folder_name = None
- if file.folder_id:
- folder_result = await db.execute(select(LibraryFolder.name).where(LibraryFolder.id == file.folder_id))
- folder_name = folder_result.scalar()
- # Get project name
- project_name = None
- if file.project_id:
- project_result = await db.execute(select(Project.name).where(Project.id == file.project_id))
- project_name = project_result.scalar()
- # Get duplicates
- duplicates = []
- duplicate_count = 0
- if file.file_hash:
- dup_result = await db.execute(
- select(LibraryFile, LibraryFolder.name)
- .outerjoin(LibraryFolder, LibraryFile.folder_id == LibraryFolder.id)
- .where(LibraryFile.file_hash == file.file_hash, LibraryFile.id != file.id)
- )
- for dup_file, dup_folder_name in dup_result.all():
- duplicates.append(
- FileDuplicate(
- id=dup_file.id,
- filename=dup_file.filename,
- folder_id=dup_file.folder_id,
- folder_name=dup_folder_name,
- created_at=dup_file.created_at,
- )
- )
- duplicate_count = len(duplicates)
- # Extract key metadata fields
- print_name = None
- print_time = None
- filament_grams = None
- sliced_for_model = None
- if file.file_metadata:
- print_name = file.file_metadata.get("print_name")
- print_time = file.file_metadata.get("print_time_seconds")
- filament_grams = file.file_metadata.get("filament_used_grams")
- sliced_for_model = file.file_metadata.get("sliced_for_model")
- return FileResponseSchema(
- id=file.id,
- folder_id=file.folder_id,
- folder_name=folder_name,
- project_id=file.project_id,
- project_name=project_name,
- filename=file.filename,
- file_path=file.file_path,
- file_type=file.file_type,
- file_size=file.file_size,
- file_hash=file.file_hash,
- thumbnail_path=file.thumbnail_path,
- metadata=file.file_metadata,
- print_count=file.print_count,
- last_printed_at=file.last_printed_at,
- notes=file.notes,
- duplicates=duplicates if duplicates else None,
- duplicate_count=duplicate_count,
- created_by_id=file.created_by_id,
- created_by_username=file.created_by.username if file.created_by else None,
- created_at=file.created_at,
- updated_at=file.updated_at,
- print_name=print_name,
- print_time_seconds=print_time,
- filament_used_grams=filament_grams,
- sliced_for_model=sliced_for_model,
- )
- @router.put("/files/{file_id}", response_model=FileResponseSchema)
- async def update_file(
- file_id: int,
- data: FileUpdate,
- db: AsyncSession = Depends(get_db),
- auth_result: tuple[User | None, bool] = Depends(
- require_ownership_permission(
- Permission.LIBRARY_UPDATE_ALL,
- Permission.LIBRARY_UPDATE_OWN,
- )
- ),
- ):
- """Update a file's metadata."""
- user, can_modify_all = auth_result
- result = await db.execute(select(LibraryFile).where(LibraryFile.id == file_id))
- file = result.scalar_one_or_none()
- if not file:
- raise HTTPException(status_code=404, detail="File not found")
- # Ownership check
- if not can_modify_all:
- if file.created_by_id != user.id:
- raise HTTPException(status_code=403, detail="You can only update your own files")
- if data.filename is not None:
- # Validate filename doesn't contain path separators
- if "/" in data.filename or "\\" in data.filename:
- raise HTTPException(status_code=400, detail="Filename cannot contain path separators")
- file.filename = data.filename
- # Also update print_name in file_metadata so the display name matches
- if file.file_metadata and "print_name" in file.file_metadata:
- file.file_metadata = {**file.file_metadata, "print_name": data.filename}
- if data.folder_id is not None:
- if data.folder_id == 0:
- file.folder_id = None
- else:
- # Verify folder exists
- folder_result = await db.execute(select(LibraryFolder).where(LibraryFolder.id == data.folder_id))
- if not folder_result.scalar_one_or_none():
- raise HTTPException(status_code=404, detail="Folder not found")
- file.folder_id = data.folder_id
- if data.project_id is not None:
- if data.project_id == 0:
- file.project_id = None
- else:
- # Verify project exists
- project_result = await db.execute(select(Project).where(Project.id == data.project_id))
- if not project_result.scalar_one_or_none():
- raise HTTPException(status_code=404, detail="Project not found")
- file.project_id = data.project_id
- if data.notes is not None:
- file.notes = data.notes if data.notes else None
- await db.commit()
- await db.refresh(file)
- # Return full response (reuse get_file logic)
- return await get_file(file_id, db)
- @router.delete("/files/{file_id}")
- async def delete_file(
- file_id: int,
- db: AsyncSession = Depends(get_db),
- auth_result: tuple[User | None, bool] = Depends(
- require_ownership_permission(
- Permission.LIBRARY_DELETE_ALL,
- Permission.LIBRARY_DELETE_OWN,
- )
- ),
- ):
- """Delete a file."""
- user, can_modify_all = auth_result
- result = await db.execute(select(LibraryFile).where(LibraryFile.id == file_id))
- file = result.scalar_one_or_none()
- if not file:
- raise HTTPException(status_code=404, detail="File not found")
- # Ownership check
- if not can_modify_all:
- if file.created_by_id != user.id:
- raise HTTPException(status_code=403, detail="You can only delete your own files")
- # Delete actual files
- try:
- abs_file_path = to_absolute_path(file.file_path)
- abs_thumb_path = to_absolute_path(file.thumbnail_path)
- if abs_file_path and abs_file_path.exists():
- abs_file_path.unlink()
- if abs_thumb_path and abs_thumb_path.exists():
- abs_thumb_path.unlink()
- except OSError as e:
- logger.warning("Failed to delete file from disk: %s", e)
- await db.delete(file)
- return {"status": "success", "message": "File deleted"}
- # ============ File Content Endpoints ============
- @router.get("/files/{file_id}/download")
- async def download_file(
- file_id: int,
- db: AsyncSession = Depends(get_db),
- _: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_READ)),
- ):
- """Download a file."""
- result = await db.execute(select(LibraryFile).where(LibraryFile.id == file_id))
- file = result.scalar_one_or_none()
- if not file:
- raise HTTPException(status_code=404, detail="File not found")
- abs_path = to_absolute_path(file.file_path)
- if not abs_path or not abs_path.exists():
- raise HTTPException(status_code=404, detail="File not found on disk")
- return FastAPIFileResponse(
- str(abs_path),
- filename=file.filename,
- media_type="application/octet-stream",
- )
- @router.post("/files/{file_id}/slicer-token")
- async def create_library_slicer_token(
- file_id: int,
- db: AsyncSession = Depends(get_db),
- _: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_READ)),
- ):
- """Create a short-lived download token for opening files in slicer applications.
- Slicer protocol handlers (bambustudioopen://, orcaslicer://) cannot send
- auth headers, so they use this token in the URL path instead.
- """
- from backend.app.core.auth import create_slicer_download_token
- result = await db.execute(select(LibraryFile).where(LibraryFile.id == file_id))
- file = result.scalar_one_or_none()
- if not file:
- raise HTTPException(status_code=404, detail="File not found")
- token = create_slicer_download_token("library", file_id)
- return {"token": token}
- @router.get("/files/{file_id}/dl/{token}/{filename}")
- async def download_library_file_for_slicer(
- file_id: int,
- token: str,
- filename: str,
- db: AsyncSession = Depends(get_db),
- ):
- """Download a library file using a slicer download token.
- Token-authenticated (no auth headers needed). The token is short-lived
- and single-use, created by POST /files/{file_id}/slicer-token.
- Filename is at the end of the URL so slicers can detect the file format.
- """
- from backend.app.core.auth import verify_slicer_download_token
- if not verify_slicer_download_token(token, "library", file_id):
- raise HTTPException(status_code=403, detail="Invalid or expired download token")
- result = await db.execute(select(LibraryFile).where(LibraryFile.id == file_id))
- file = result.scalar_one_or_none()
- if not file:
- raise HTTPException(status_code=404, detail="File not found")
- abs_path = to_absolute_path(file.file_path)
- if not abs_path or not abs_path.exists():
- raise HTTPException(status_code=404, detail="File not found on disk")
- return FastAPIFileResponse(
- str(abs_path),
- filename=file.filename,
- media_type="application/octet-stream",
- )
- @router.get("/files/{file_id}/thumbnail")
- async def get_thumbnail(file_id: int, db: AsyncSession = Depends(get_db)):
- """Get a file's thumbnail."""
- result = await db.execute(select(LibraryFile).where(LibraryFile.id == file_id))
- file = result.scalar_one_or_none()
- if not file:
- raise HTTPException(status_code=404, detail="File not found")
- abs_thumb_path = to_absolute_path(file.thumbnail_path)
- if not abs_thumb_path or not abs_thumb_path.exists():
- raise HTTPException(status_code=404, detail="Thumbnail not found")
- # Detect media type from extension
- thumb_ext = abs_thumb_path.suffix.lower()
- media_types = {
- ".png": "image/png",
- ".jpg": "image/jpeg",
- ".jpeg": "image/jpeg",
- ".gif": "image/gif",
- ".webp": "image/webp",
- }
- media_type = media_types.get(thumb_ext, "image/png")
- return FastAPIFileResponse(str(abs_thumb_path), media_type=media_type)
- @router.get("/files/{file_id}/gcode")
- async def get_gcode(
- file_id: int,
- db: AsyncSession = Depends(get_db),
- _: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_READ)),
- ):
- """Get gcode for a file (for preview)."""
- result = await db.execute(select(LibraryFile).where(LibraryFile.id == file_id))
- file = result.scalar_one_or_none()
- if not file:
- raise HTTPException(status_code=404, detail="File not found")
- abs_path = to_absolute_path(file.file_path)
- if not abs_path or not abs_path.exists():
- raise HTTPException(status_code=404, detail="File not found on disk")
- if file.file_type == "gcode":
- return FastAPIFileResponse(str(abs_path), media_type="text/plain")
- elif file.file_type == "3mf":
- # Extract gcode from 3mf
- try:
- with zipfile.ZipFile(str(abs_path), "r") as zf:
- # Find gcode file
- gcode_files = [n for n in zf.namelist() if n.endswith(".gcode")]
- if not gcode_files:
- raise HTTPException(status_code=404, detail="No gcode found in 3MF file")
- gcode_content = zf.read(gcode_files[0])
- from fastapi.responses import Response
- return Response(content=gcode_content, media_type="text/plain")
- except zipfile.BadZipFile:
- raise HTTPException(status_code=400, detail="Invalid 3MF file")
- else:
- raise HTTPException(status_code=400, detail="Unsupported file type")
- # ============ Bulk Operations ============
- @router.post("/files/move")
- async def move_files(
- data: FileMoveRequest,
- db: AsyncSession = Depends(get_db),
- auth_result: tuple[User | None, bool] = Depends(
- require_ownership_permission(
- Permission.LIBRARY_UPDATE_ALL,
- Permission.LIBRARY_UPDATE_OWN,
- )
- ),
- ):
- """Move multiple files to a folder.
- Files not owned by the user are skipped (unless user has *_all permission).
- """
- user, can_modify_all = auth_result
- # Verify folder exists if specified
- if data.folder_id is not None:
- folder_result = await db.execute(select(LibraryFolder).where(LibraryFolder.id == data.folder_id))
- if not folder_result.scalar_one_or_none():
- raise HTTPException(status_code=404, detail="Folder not found")
- # Update files
- moved = 0
- skipped = 0
- for file_id in data.file_ids:
- result = await db.execute(select(LibraryFile).where(LibraryFile.id == file_id))
- file = result.scalar_one_or_none()
- if file:
- # Ownership check
- if not can_modify_all and file.created_by_id != user.id:
- skipped += 1
- continue
- file.folder_id = data.folder_id
- moved += 1
- return {"status": "success", "moved": moved, "skipped": skipped}
- @router.post("/bulk-delete", response_model=BulkDeleteResponse)
- async def bulk_delete(
- data: BulkDeleteRequest,
- db: AsyncSession = Depends(get_db),
- auth_result: tuple[User | None, bool] = Depends(
- require_ownership_permission(
- Permission.LIBRARY_DELETE_ALL,
- Permission.LIBRARY_DELETE_OWN,
- )
- ),
- ):
- """Delete multiple files and/or folders.
- Files not owned by the user are skipped (unless user has *_all permission).
- """
- user, can_modify_all = auth_result
- deleted_files = 0
- deleted_folders = 0
- skipped_files = 0
- # Delete files first
- for file_id in data.file_ids:
- result = await db.execute(select(LibraryFile).where(LibraryFile.id == file_id))
- file = result.scalar_one_or_none()
- if file:
- # Ownership check
- if not can_modify_all and file.created_by_id != user.id:
- skipped_files += 1
- continue
- try:
- abs_file_path = to_absolute_path(file.file_path)
- abs_thumb_path = to_absolute_path(file.thumbnail_path)
- if abs_file_path and abs_file_path.exists():
- abs_file_path.unlink()
- if abs_thumb_path and abs_thumb_path.exists():
- abs_thumb_path.unlink()
- except OSError as e:
- logger.warning("Failed to delete file from disk: %s", e)
- await db.delete(file)
- deleted_files += 1
- # Delete folders (cascade will handle contents)
- # Note: Folders don't have ownership tracking currently, require *_all permission
- for folder_id in data.folder_ids:
- if not can_modify_all:
- # Users without *_all permission cannot delete folders
- continue
- result = await db.execute(select(LibraryFolder).where(LibraryFolder.id == folder_id))
- folder = result.scalar_one_or_none()
- if folder:
- # Count files that will be deleted
- file_count_result = await db.execute(
- select(func.count(LibraryFile.id)).where(LibraryFile.folder_id == folder_id)
- )
- deleted_files += file_count_result.scalar() or 0
- await db.delete(folder)
- deleted_folders += 1
- return BulkDeleteResponse(deleted_files=deleted_files, deleted_folders=deleted_folders)
- # ============ Stats Endpoint ============
- @router.get("/stats")
- async def get_library_stats(
- db: AsyncSession = Depends(get_db),
- _: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_READ)),
- ):
- """Get library statistics."""
- # Total files
- total_files_result = await db.execute(select(func.count(LibraryFile.id)))
- total_files = total_files_result.scalar() or 0
- # Total folders
- total_folders_result = await db.execute(select(func.count(LibraryFolder.id)))
- total_folders = total_folders_result.scalar() or 0
- # Total size
- total_size_result = await db.execute(select(func.sum(LibraryFile.file_size)))
- total_size = total_size_result.scalar() or 0
- # Files by type
- type_result = await db.execute(
- select(LibraryFile.file_type, func.count(LibraryFile.id)).group_by(LibraryFile.file_type)
- )
- files_by_type = dict(type_result.all())
- # Total prints
- total_prints_result = await db.execute(select(func.sum(LibraryFile.print_count)))
- total_prints = total_prints_result.scalar() or 0
- # Disk space info
- library_dir = get_library_dir()
- try:
- disk_stat = shutil.disk_usage(library_dir)
- disk_free_bytes = disk_stat.free
- disk_total_bytes = disk_stat.total
- disk_used_bytes = disk_stat.used
- except OSError:
- disk_free_bytes = 0
- disk_total_bytes = 0
- disk_used_bytes = 0
- return {
- "total_files": total_files,
- "total_folders": total_folders,
- "total_size_bytes": total_size,
- "files_by_type": files_by_type,
- "total_prints": total_prints,
- "disk_free_bytes": disk_free_bytes,
- "disk_total_bytes": disk_total_bytes,
- "disk_used_bytes": disk_used_bytes,
- }
|