| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
27722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762
77727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130313131323133313431353136313731383139314031413142314331443145314631473148314931503151315231533154315531563157315831593160316131623163316431653166316731683169317031713172317331743175317631773178317931803181318231833184318531863187318831893190319131923193319431953196319731983199320032013202320332043205320632073208320932103211321232133214321532163217321832193220322132223223322432253226322732283229323032313232323332343235323632373238323932403241324232433244324532463247324832493250325132523253325432553256325732583259326032613262326332643265326632673268326932703271327232733274327532763
277327832793280328132823283328432853286328732883289329032913292329332943295329632973298329933003301330233033304330533063307330833093310331133123313331433153316331733183319332033213322332333243325332633273328332933303331333233333334333533363337333833393340334133423343334433453346334733483349335033513352335333543355335633573358335933603361336233633364336533663367336833693370337133723373337433753376337733783379338033813382338333843385338633873388338933903391339233933394339533963397339833993400340134023403340434053406340734083409341034113412341334143415341634173418341934203421342234233424342534263427342834293430343134323433343434353436343734383439344034413442344334443445344634473448344934503451345234533454345534563457345834593460346134623463346434653466346734683469347034713472347334743475347634773478347934803481348234833484348534863487348834893490349134923493349434953496349734983499350035013502350335043505350635073508350935103511351235133514351535163517351835193520352135223523352435253526352735283529353035313532353335343535353635373538353935403541354235433544354535463547354835493550355135523553355435553556355735583559356035613562356335643565356635673568356935703571357235733574357535763577357835793580358135823583358435853586358735883589 |
- """API routes for File Manager (Library) functionality."""
- import base64
- import binascii
- import contextlib
- import hashlib
- import logging
- import os
- import re
- import shutil
- import uuid
- import zipfile
- from datetime import datetime, timezone
- from pathlib import Path
- from fastapi import APIRouter, Depends, File, HTTPException, Query, Response, UploadFile
- from fastapi.responses import FileResponse as FastAPIFileResponse
- from sqlalchemy import func, select
- from sqlalchemy.ext.asyncio import AsyncSession
- from sqlalchemy.orm import selectinload
- from backend.app.core.auth import (
- RequireCameraStreamTokenIfAuthEnabled,
- require_ownership_permission,
- require_permission_if_auth_enabled,
- )
- from backend.app.core.config import settings as app_settings
- from backend.app.core.database import get_db
- from backend.app.core.permissions import Permission
- from backend.app.models.archive import PrintArchive
- from backend.app.models.library import LibraryFile, LibraryFolder
- from backend.app.models.print_queue import PrintQueueItem
- from backend.app.models.project import Project
- from backend.app.models.user import User
- from backend.app.schemas.library import (
- AddToQueueError,
- AddToQueueRequest,
- AddToQueueResponse,
- AddToQueueResult,
- BatchThumbnailRequest,
- BatchThumbnailResponse,
- BatchThumbnailResult,
- BulkDeleteRequest,
- BulkDeleteResponse,
- ExternalFolderCreate,
- FileDuplicate,
- FileListResponse,
- FileMoveRequest,
- FilePrintRequest,
- FileResponse as FileResponseSchema,
- FileUpdate,
- FileUploadResponse,
- FolderCreate,
- FolderResponse,
- FolderTreeItem,
- FolderUpdate,
- ZipExtractError,
- ZipExtractResponse,
- ZipExtractResult,
- )
- from backend.app.schemas.slicer import SliceRequest, SliceResponse
- from backend.app.services.archive import ThreeMFParser
- from backend.app.services.stl_thumbnail import generate_stl_thumbnail
- from backend.app.utils.threemf_tools import extract_nozzle_mapping_from_3mf
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Router for every endpoint declared in this module; all routes live under /library.
router = APIRouter(tags=["library"], prefix="/library")
def get_library_dir() -> Path:
    """Return ``<archive_dir>/library``, creating it (and any parents) if missing."""
    library_root = Path(app_settings.archive_dir) / "library"
    library_root.mkdir(parents=True, exist_ok=True)
    return library_root
def get_library_files_dir() -> Path:
    """Return the ``files`` directory inside the library dir, creating it if missing."""
    target = get_library_dir() / "files"
    target.mkdir(parents=True, exist_ok=True)
    return target
def get_library_thumbnails_dir() -> Path:
    """Return the ``thumbnails`` directory inside the library dir, creating it if missing."""
    target = get_library_dir() / "thumbnails"
    target.mkdir(parents=True, exist_ok=True)
    return target
- def to_relative_path(absolute_path: Path | str) -> str:
- """Convert an absolute path to a path relative to base_dir for storage."""
- if not absolute_path:
- return ""
- abs_path = Path(absolute_path)
- base_dir = Path(app_settings.base_dir)
- try:
- return str(abs_path.relative_to(base_dir))
- except ValueError:
- # Path is not under base_dir, return as-is (shouldn't happen normally)
- return str(abs_path)
- def to_absolute_path(relative_path: str | None) -> Path | None:
- """Convert a relative path (from database) to an absolute path for file operations."""
- if not relative_path:
- return None
- path = Path(relative_path)
- # Handle already-absolute paths verbatim (backwards compatibility during migration).
- # Legacy DB rows may store absolute paths that predate the base_dir layout; the
- # traversal guard below only applies to relative paths coming from user input.
- if path.is_absolute():
- return path.resolve()
- base = Path(app_settings.base_dir).resolve()
- resolved = (base / relative_path).resolve()
- # Guard against path traversal — resolved path must stay inside base_dir.
- # Use is_relative_to() to avoid the /data/app vs /data/app_evil prefix confusion
- # that a plain startswith(str(base)) check would miss.
- if not resolved.is_relative_to(base):
- raise ValueError(f"Path escapes base directory: {relative_path!r}")
- return resolved
def calculate_file_hash(file_path: Path) -> str:
    """Return the hex-encoded SHA-256 digest of the file at ``file_path``.

    The file is read in 4096-byte chunks so it is never held in memory whole.
    """
    digest = hashlib.sha256()
    with open(file_path, "rb") as stream:
        while chunk := stream.read(4096):
            digest.update(chunk)
    return digest.hexdigest()
def _resolve_upload_destination(target_folder: LibraryFolder | None, filename: str) -> tuple[Path, bool]:
    """Resolve the on-disk destination for an uploaded file.

    Non-external target (or no target): returns
    ``(<library_files_dir>/<uuid><ext>, False)``.

    Writable external target: returns ``(<external_path>/<filename>, True)``,
    preserving the real filename so the file is recognisable on the mount.
    See #1112 — previously uploads to writable external folders were silently
    misrouted to the internal library dir.

    Raises:
        HTTPException: 403 for a read-only external folder; 400 for a
            missing, inaccessible or non-writable external path, or for a
            filename that resolves outside (or to) the external directory;
            409 when a file of that name already exists on the mount.
    """
    if target_folder is None or not target_folder.is_external:
        ext = os.path.splitext(filename)[1].lower()
        return get_library_files_dir() / f"{uuid.uuid4().hex}{ext}", False

    if target_folder.external_readonly:
        raise HTTPException(status_code=403, detail="Cannot upload to a read-only external folder")
    if not target_folder.external_path:
        raise HTTPException(status_code=400, detail="External folder has no configured path")

    ext_dir = Path(target_folder.external_path)
    if not ext_dir.exists() or not ext_dir.is_dir():
        raise HTTPException(
            status_code=400,
            detail=f"External path is not accessible: {target_folder.external_path}",
        )
    if not os.access(ext_dir, os.W_OK):
        raise HTTPException(
            status_code=400,
            detail=f"External path is not writable: {target_folder.external_path}",
        )

    # Guard against path traversal via a pathological filename: join, resolve,
    # then verify the destination is still inside the external dir. A name
    # such as "" or "." resolves to the directory itself; reject it here
    # instead of reporting a misleading "already exists" collision below.
    ext_root = ext_dir.resolve()
    dest = (ext_dir / filename).resolve()
    if dest == ext_root or not dest.is_relative_to(ext_root):
        raise HTTPException(status_code=400, detail="Invalid filename")

    if dest.exists():
        raise HTTPException(
            status_code=409,
            detail=f"A file named {filename!r} already exists in the external folder",
        )
    return dest, True
- def _stored_file_path(abs_path: Path, is_external: bool) -> str:
- """Produce the value to persist in ``LibraryFile.file_path``.
- External files store the absolute mount path directly (same as scan does),
- so ``to_absolute_path`` round-trips through its ``is_absolute()`` fast
- path. Managed files store a path relative to ``base_dir`` for portability.
- """
- return str(abs_path) if is_external else to_relative_path(abs_path)
- class _MoveSkip(Exception):
- """Signalled by ``_move_file_bytes`` to skip a file with a user-visible reason.
- Carries an optional `code` for machine-friendly grouping (the
- front-end can localise it) and a fallback English `reason` for logs.
- """
- def __init__(self, code: str, reason: str):
- super().__init__(reason)
- self.code = code
- self.reason = reason
def _resolve_source_disk_path(file: LibraryFile) -> Path | None:
    """Return the absolute on-disk path for an existing LibraryFile.

    External files use ``file.file_path`` directly; managed files go through
    ``to_absolute_path``. Returns ``None`` when no path is recorded.
    """
    if not file.is_external:
        return to_absolute_path(file.file_path)
    if file.file_path:
        return Path(file.file_path)
    return None
def _move_file_bytes(file: LibraryFile, target_folder: LibraryFolder | None) -> str:
    """Physically relocate ``file``'s bytes so they match ``target_folder``.

    Used by the move endpoint when source and target straddle the
    managed↔external boundary (#1112 follow-up — the earlier implementation
    only updated the DB row's ``folder_id`` and never moved the bytes).

    The bytes are copied first and the source is unlinked afterwards, so a
    failed copy never loses the source; a failed unlink leaves both copies on
    disk and is only logged. The DB row is not touched here — the caller
    persists the returned value.

    Returns:
        The new ``file_path`` value: absolute for external targets, relative
        for managed targets (via ``_stored_file_path``).

    Raises:
        _MoveSkip: source missing, target read-only / misconfigured /
            inaccessible / unwritable, unsafe filename, name collision, or
            copy failure.
    """
    source = _resolve_source_disk_path(file)
    if not source or not source.exists():
        raise _MoveSkip("source_missing", "source file missing on disk")

    to_external = target_folder is not None and target_folder.is_external

    # Step 1: work out (and validate) where the bytes should land.
    if to_external:
        if target_folder.external_readonly:
            # Already blocked at top level, but defence-in-depth.
            raise _MoveSkip("target_readonly", "target external folder is read-only")
        if not target_folder.external_path:
            raise _MoveSkip("target_misconfigured", "target external folder has no path")

        mount_dir = Path(target_folder.external_path)
        if not (mount_dir.exists() and mount_dir.is_dir()):
            raise _MoveSkip("target_inaccessible", f"target path not accessible: {mount_dir}")
        if not os.access(mount_dir, os.W_OK):
            raise _MoveSkip("target_unwritable", f"target path not writable: {mount_dir}")

        destination = (mount_dir / file.filename).resolve()
        try:
            # The resolved destination must still sit inside the mount dir.
            destination.relative_to(mount_dir.resolve())
        except ValueError:
            raise _MoveSkip("invalid_filename", f"unsafe filename: {file.filename!r}") from None
        if destination.exists():
            raise _MoveSkip("name_collision", f"a file named {file.filename!r} already exists in target")
    else:
        # Managed target (root or non-external folder): use a fresh UUID name
        # so we can't collide with another file sharing the same filename.
        suffix = source.suffix.lower()
        destination = get_library_files_dir() / f"{uuid.uuid4().hex}{suffix}"

    # Step 2: copy; on failure remove any partial destination so a retry can succeed.
    try:
        shutil.copy2(source, destination)
    except OSError as e:
        with contextlib.suppress(OSError):
            destination.unlink(missing_ok=True)
        raise _MoveSkip("copy_failed", f"copy failed: {e}") from e

    # Step 3: drop the original. A failure here leaves an orphan on disk, but
    # the returned path still points at the completed copy.
    try:
        source.unlink(missing_ok=True)
    except OSError as e:
        logger.warning(
            "Move: copied %s → %s but couldn't remove source: %s",
            source,
            destination,
            e,
        )

    return _stored_file_path(destination, is_external=to_external)
- def _clean_3mf_metadata(obj):
- """Strip bytes and thumbnail-carrier keys so the payload is JSON-storable.
- Shared by ``upload_file`` and :func:`save_3mf_bytes_to_library` — the
- ``ThreeMFParser`` output embeds the thumbnail bytes under
- ``_thumbnail_data``/``_thumbnail_ext`` and may also include raw bytes in
- other fields, none of which can be JSON-encoded.
- """
- if isinstance(obj, dict):
- return {
- k: _clean_3mf_metadata(v)
- for k, v in obj.items()
- if not isinstance(v, bytes) and k not in ("_thumbnail_data", "_thumbnail_ext")
- }
- if isinstance(obj, list):
- return [_clean_3mf_metadata(i) for i in obj if not isinstance(i, bytes)]
- if isinstance(obj, bytes):
- return None
- return obj
async def save_3mf_bytes_to_library(
    db: AsyncSession,
    *,
    file_bytes: bytes,
    filename: str,
    folder_id: int | None = None,
    source_type: str | None = None,
    source_url: str | None = None,
    owner_id: int | None = None,
) -> tuple[LibraryFile, bool]:
    """Save a 3MF blob into the library and return ``(library_file, was_existing)``.

    Used by routes that receive a 3MF in-process rather than as a multipart
    upload (currently: MakerWorld import; reusable for any future source that
    fetches bytes server-side).

    Deduplicates by ``source_url`` when provided: if an active LibraryFile
    with the same ``source_url`` exists, that row is returned untouched and
    the bytes are NOT re-saved (MakerWorld signed URLs change each download,
    so hash-based dedupe alone would miss re-imports).

    Otherwise the bytes are written under a UUID filename in the library
    files dir, ``.3mf`` files are parsed with :class:`ThreeMFParser` for
    metadata and an embedded thumbnail, and a new row is committed. Paths are
    stored relative (via ``to_relative_path``) so the library is portable.

    If the commit fails, the files written by this call are removed again
    (best effort) and the exception is re-raised.
    """
    # Source-URL-based dedupe: return the existing row untouched.
    if source_url:
        existing = await db.execute(LibraryFile.active().where(LibraryFile.source_url == source_url).limit(1))
        existing_row = existing.scalar_one_or_none()
        if existing_row is not None:
            return existing_row, True

    # Persist bytes under a UUID-scoped filename; keep the original extension
    # so downstream logic (ThreeMFParser, thumbnail viewer) works.
    ext = os.path.splitext(filename)[1].lower() or ".3mf"
    file_path = get_library_files_dir() / f"{uuid.uuid4().hex}{ext}"
    file_path.write_bytes(file_bytes)
    # Hash the in-memory bytes directly — same digest as hashing the file we
    # just wrote, without reading it back from disk.
    file_hash = hashlib.sha256(file_bytes).hexdigest()

    # Extract metadata + thumbnail from the 3MF.
    metadata: dict | None = None
    thumbnail_path: Path | None = None
    if ext == ".3mf":
        try:
            raw_metadata = ThreeMFParser(str(file_path)).parse()
            thumb_data = raw_metadata.get("_thumbnail_data")
            if thumb_data:
                thumb_ext = raw_metadata.get("_thumbnail_ext", ".png")
                written_thumb = get_library_thumbnails_dir() / f"{uuid.uuid4().hex}{thumb_ext}"
                written_thumb.write_bytes(thumb_data)
                # Only record the thumbnail once it is fully on disk.
                thumbnail_path = written_thumb
            metadata = _clean_3mf_metadata(raw_metadata) or None
        except Exception as exc:
            # Matches the multipart upload route's behaviour — a bad 3MF should
            # still land in the library so the user can see / delete it rather
            # than failing the whole request.
            logger.warning("Failed to parse 3MF %s: %s", filename, exc)

    library_file = LibraryFile(
        folder_id=folder_id,
        filename=filename,
        file_path=to_relative_path(file_path),
        # A bare "." suffix (e.g. "model.") leaves nothing after the dot.
        file_type=ext[1:] or "unknown",
        file_size=len(file_bytes),
        file_hash=file_hash,
        thumbnail_path=to_relative_path(thumbnail_path) if thumbnail_path else None,
        file_metadata=metadata,
        source_type=source_type,
        source_url=source_url,
        created_by_id=owner_id,
    )
    db.add(library_file)
    try:
        await db.commit()
    except Exception:
        # No row was stored, so the files written above would be orphans.
        for orphan in (file_path, thumbnail_path):
            if orphan is not None:
                with contextlib.suppress(OSError):
                    orphan.unlink(missing_ok=True)
        raise
    await db.refresh(library_file)
    return library_file, False
- def extract_gcode_thumbnail(file_path: Path) -> bytes | None:
- """Extract embedded thumbnail from gcode file.
- Supports PrusaSlicer/BambuStudio format:
- ; thumbnail begin WxH SIZE
- ; base64data...
- ; thumbnail end
- """
- try:
- thumbnail_data = None
- in_thumbnail = False
- thumbnail_lines = []
- best_size = 0
- with open(file_path, errors="ignore") as f:
- # Only read first 50KB for performance (thumbnails are at the start)
- content = f.read(50000)
- for line in content.split("\n"):
- line = line.strip()
- # Check for thumbnail start
- if line.startswith("; thumbnail begin"):
- in_thumbnail = True
- thumbnail_lines = []
- # Parse dimensions: "; thumbnail begin 300x300 12345"
- match = re.search(r"(\d+)x(\d+)", line)
- if match:
- width = int(match.group(1))
- # Prefer larger thumbnails (up to 300px)
- if width > best_size and width <= 300:
- best_size = width
- continue
- # Check for thumbnail end
- if line.startswith("; thumbnail end"):
- if in_thumbnail and thumbnail_lines:
- try:
- # Decode the base64 data
- b64_data = "".join(thumbnail_lines)
- decoded = base64.b64decode(b64_data)
- # Only keep if this is the best size or first valid thumbnail
- if thumbnail_data is None or best_size > 0:
- thumbnail_data = decoded
- except (binascii.Error, ValueError):
- pass # Skip thumbnail with invalid base64 data
- in_thumbnail = False
- thumbnail_lines = []
- continue
- # Collect thumbnail data
- if in_thumbnail and line.startswith(";"):
- # Remove the leading "; " or ";"
- data_line = line[1:].strip()
- if data_line:
- thumbnail_lines.append(data_line)
- return thumbnail_data
- except Exception as e:
- logger.warning("Failed to extract gcode thumbnail: %s", e)
- return None
def create_image_thumbnail(file_path: Path, thumbnails_dir: Path, max_size: int = 256) -> str | None:
    """Create a PNG thumbnail for an image file.

    The image is flattened onto a white background when it carries
    transparency, converted to RGB, shrunk to fit within
    ``max_size`` x ``max_size`` if it is larger, and saved under a fresh UUID
    name in ``thumbnails_dir``.

    If Pillow is not installed, the original file is copied as the thumbnail
    instead, but only when it is smaller than 500000 bytes.

    Returns:
        The thumbnail path as a string, or ``None`` on any failure.
    """
    try:
        from PIL import Image

        thumb_filename = f"{uuid.uuid4().hex}.png"
        thumb_path = thumbnails_dir / thumb_filename
        with Image.open(file_path) as img:
            if img.mode in ("RGBA", "LA", "P"):
                # Modes that can carry transparency: normalise to RGBA so the
                # alpha band is always available as the paste mask ("LA" was
                # previously pasted without a mask, ignoring its alpha), then
                # composite onto white.
                if img.mode != "RGBA":
                    img = img.convert("RGBA")
                background = Image.new("RGB", img.size, (255, 255, 255))
                background.paste(img, mask=img.split()[-1])
                img = background
            elif img.mode != "RGB":
                img = img.convert("RGB")

            # Resize only if larger than max_size; thumbnail() keeps aspect ratio.
            if img.width > max_size or img.height > max_size:
                img.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
            img.save(thumb_path, "PNG", optimize=True)
        return str(thumb_path)
    except ImportError:
        # PIL not installed, just copy the file if it's small enough
        logger.warning("PIL not installed, copying image as thumbnail")
        try:
            file_size = file_path.stat().st_size
            if file_size < 500000:  # Less than 500KB
                thumb_filename = f"{uuid.uuid4().hex}{file_path.suffix}"
                thumb_path = thumbnails_dir / thumb_filename
                shutil.copy2(file_path, thumb_path)
                return str(thumb_path)
        except OSError:
            pass  # File inaccessible; fall through to return None
        return None
    except Exception as e:
        # Best-effort helper: a bad image must not fail the caller.
        logger.warning("Failed to create image thumbnail: %s", e)
        return None
# Image file suffixes (lower-case, with leading dot) supported for thumbnails.
IMAGE_EXTENSIONS = {
    ".bmp",
    ".gif",
    ".jpeg",
    ".jpg",
    ".png",
    ".tif",
    ".tiff",
    ".webp",
}
- # ============ Folder Endpoints ============
@router.get("/folders", response_model=list[FolderTreeItem])
@router.get("/folders/", response_model=list[FolderTreeItem])
async def list_folders(
    response: Response,
    db: AsyncSession = Depends(get_db),
    _: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_READ)),
):
    """Return every library folder arranged as a tree of ``FolderTreeItem``.

    Each item carries the linked project/archive names and the number of
    non-deleted files directly inside the folder. Folders whose ``parent_id``
    does not match any returned folder are not attached anywhere.
    """
    # Prevent browser caching of the folder list.
    response.headers["Cache-Control"] = "no-cache, no-store, must-revalidate"

    # All folders, outer-joined to project and archive for their display names.
    folder_query = (
        select(LibraryFolder, Project.name, PrintArchive.print_name)
        .outerjoin(Project, LibraryFolder.project_id == Project.id)
        .outerjoin(PrintArchive, LibraryFolder.archive_id == PrintArchive.id)
        .order_by(LibraryFolder.name)
    )
    rows = (await db.execute(folder_query)).all()

    # folder_id -> count of non-deleted files directly in that folder.
    count_query = (
        select(LibraryFile.folder_id, func.count(LibraryFile.id))
        .where(LibraryFile.folder_id.isnot(None), LibraryFile.deleted_at.is_(None))
        .group_by(LibraryFile.folder_id)
    )
    file_counts = dict((await db.execute(count_query)).all())

    # First pass: one tree node per folder, keyed by id.
    nodes = {
        folder.id: FolderTreeItem(
            id=folder.id,
            name=folder.name,
            parent_id=folder.parent_id,
            project_id=folder.project_id,
            archive_id=folder.archive_id,
            project_name=project_name,
            archive_name=archive_name,
            is_external=folder.is_external,
            external_path=folder.external_path,
            external_readonly=folder.external_readonly,
            file_count=file_counts.get(folder.id, 0),
            children=[],
        )
        for folder, project_name, archive_name in rows
    }

    # Second pass: link nodes once every parent is known to exist.
    roots = []
    for folder, *_names in rows:
        node = nodes[folder.id]
        if folder.parent_id is None:
            roots.append(node)
        elif folder.parent_id in nodes:
            nodes[folder.parent_id].children.append(node)
    return roots
@router.get("/folders/by-project/{project_id}", response_model=list[FolderResponse])
async def get_folders_by_project(
    project_id: int,
    db: AsyncSession = Depends(get_db),
    _: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_READ)),
):
    """Get all folders linked to a specific project.

    Args:
        project_id: ID of the project whose folders are requested.
        db: Database session.

    Returns:
        ``FolderResponse`` objects ordered by folder name, each with the number
        of non-deleted files directly inside it. ``archive_name`` is always None
        here. An empty list is returned when no folder is linked to the project.
    """
    result = await db.execute(
        select(LibraryFolder, Project.name)
        .outerjoin(Project, LibraryFolder.project_id == Project.id)
        .where(LibraryFolder.project_id == project_id)
        .order_by(LibraryFolder.name)
    )
    rows = result.all()
    if not rows:
        return []

    # One grouped count for all folders instead of one COUNT query per folder.
    folder_ids = [folder.id for folder, _project_name in rows]
    counts_result = await db.execute(
        select(LibraryFile.folder_id, func.count(LibraryFile.id))
        .where(
            LibraryFile.folder_id.in_(folder_ids),
            LibraryFile.deleted_at.is_(None),
        )
        .group_by(LibraryFile.folder_id)
    )
    file_counts = dict(counts_result.all())

    return [
        FolderResponse(
            id=folder.id,
            name=folder.name,
            parent_id=folder.parent_id,
            project_id=folder.project_id,
            archive_id=folder.archive_id,
            project_name=project_name,
            archive_name=None,
            is_external=folder.is_external,
            external_path=folder.external_path,
            external_readonly=folder.external_readonly,
            external_show_hidden=folder.external_show_hidden,
            # Folders without files are absent from the grouped result.
            file_count=file_counts.get(folder.id, 0),
            created_at=folder.created_at,
            updated_at=folder.updated_at,
        )
        for folder, project_name in rows
    ]
@router.get("/folders/by-archive/{archive_id}", response_model=list[FolderResponse])
async def get_folders_by_archive(
    archive_id: int,
    db: AsyncSession = Depends(get_db),
    _: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_READ)),
):
    """Get all folders linked to a specific archive.

    Args:
        archive_id: ID of the print archive whose folders are requested.
        db: Database session.

    Returns:
        ``FolderResponse`` objects ordered by folder name, each with the number
        of non-deleted files directly inside it. ``project_name`` is always None
        here. An empty list is returned when no folder is linked to the archive.
    """
    result = await db.execute(
        select(LibraryFolder, PrintArchive.print_name)
        .outerjoin(PrintArchive, LibraryFolder.archive_id == PrintArchive.id)
        .where(LibraryFolder.archive_id == archive_id)
        .order_by(LibraryFolder.name)
    )
    rows = result.all()
    if not rows:
        return []

    # One grouped count for all folders instead of one COUNT query per folder.
    folder_ids = [folder.id for folder, _archive_name in rows]
    counts_result = await db.execute(
        select(LibraryFile.folder_id, func.count(LibraryFile.id))
        .where(
            LibraryFile.folder_id.in_(folder_ids),
            LibraryFile.deleted_at.is_(None),
        )
        .group_by(LibraryFile.folder_id)
    )
    file_counts = dict(counts_result.all())

    return [
        FolderResponse(
            id=folder.id,
            name=folder.name,
            parent_id=folder.parent_id,
            project_id=folder.project_id,
            archive_id=folder.archive_id,
            project_name=None,
            archive_name=archive_name,
            is_external=folder.is_external,
            external_path=folder.external_path,
            external_readonly=folder.external_readonly,
            external_show_hidden=folder.external_show_hidden,
            # Folders without files are absent from the grouped result.
            file_count=file_counts.get(folder.id, 0),
            created_at=folder.created_at,
            updated_at=folder.updated_at,
        )
        for folder, archive_name in rows
    ]
@router.post("/folders", response_model=FolderResponse)
@router.post("/folders/", response_model=FolderResponse)
async def create_folder(
    data: FolderCreate,
    db: AsyncSession = Depends(get_db),
    _: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_UPLOAD)),
):
    """Create a library folder, optionally nested and linked to a project/archive.

    Args:
        data: Folder name plus optional ``parent_id``, ``project_id`` and ``archive_id``.
        db: Database session.

    Returns:
        The stored folder as a ``FolderResponse`` with ``file_count`` of 0.

    Raises:
        HTTPException: 404 when the referenced parent folder, project or
            archive does not exist.
    """
    # Every referenced row must exist before anything is written.
    if data.parent_id is not None:
        parent_lookup = await db.execute(select(LibraryFolder).where(LibraryFolder.id == data.parent_id))
        if parent_lookup.scalar_one_or_none() is None:
            raise HTTPException(status_code=404, detail="Parent folder not found")

    linked_project_name = None
    if data.project_id is not None:
        project_lookup = await db.execute(select(Project).where(Project.id == data.project_id))
        linked_project = project_lookup.scalar_one_or_none()
        if linked_project is None:
            raise HTTPException(status_code=404, detail="Project not found")
        linked_project_name = linked_project.name

    linked_archive_name = None
    if data.archive_id is not None:
        archive_lookup = await db.execute(select(PrintArchive).where(PrintArchive.id == data.archive_id))
        linked_archive = archive_lookup.scalar_one_or_none()
        if linked_archive is None:
            raise HTTPException(status_code=404, detail="Archive not found")
        linked_archive_name = linked_archive.print_name

    new_folder = LibraryFolder(
        name=data.name,
        parent_id=data.parent_id,
        project_id=data.project_id,
        archive_id=data.archive_id,
    )
    db.add(new_folder)
    await db.commit()
    # Reload so generated columns (id, timestamps, defaults) are populated.
    await db.refresh(new_folder)

    return FolderResponse(
        id=new_folder.id,
        name=new_folder.name,
        parent_id=new_folder.parent_id,
        project_id=new_folder.project_id,
        archive_id=new_folder.archive_id,
        project_name=linked_project_name,
        archive_name=linked_archive_name,
        is_external=new_folder.is_external,
        external_path=new_folder.external_path,
        external_readonly=new_folder.external_readonly,
        external_show_hidden=new_folder.external_show_hidden,
        file_count=0,
        created_at=new_folder.created_at,
        updated_at=new_folder.updated_at,
    )
@router.get("/folders/{folder_id}", response_model=FolderResponse)
async def get_folder(
    folder_id: int,
    db: AsyncSession = Depends(get_db),
    _: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_READ)),
):
    """Fetch a single folder together with its linked names and file count.

    Args:
        folder_id: ID of the folder to fetch.
        db: Database session.

    Returns:
        The folder as a ``FolderResponse``; ``file_count`` counts the
        non-deleted files directly inside it.

    Raises:
        HTTPException: 404 when no folder has this ID.
    """
    folder_query = (
        select(LibraryFolder, Project.name, PrintArchive.print_name)
        .outerjoin(Project, LibraryFolder.project_id == Project.id)
        .outerjoin(PrintArchive, LibraryFolder.archive_id == PrintArchive.id)
        .where(LibraryFolder.id == folder_id)
    )
    match = (await db.execute(folder_query)).one_or_none()
    if match is None:
        raise HTTPException(status_code=404, detail="Folder not found")
    folder, project_name, archive_name = match

    count_query = select(func.count(LibraryFile.id)).where(
        LibraryFile.folder_id == folder_id,
        LibraryFile.deleted_at.is_(None),
    )
    active_files = (await db.execute(count_query)).scalar() or 0

    return FolderResponse(
        id=folder.id,
        name=folder.name,
        parent_id=folder.parent_id,
        project_id=folder.project_id,
        archive_id=folder.archive_id,
        project_name=project_name,
        archive_name=archive_name,
        is_external=folder.is_external,
        external_path=folder.external_path,
        external_readonly=folder.external_readonly,
        external_show_hidden=folder.external_show_hidden,
        file_count=active_files,
        created_at=folder.created_at,
        updated_at=folder.updated_at,
    )
@router.put("/folders/{folder_id}", response_model=FolderResponse)
async def update_folder(
    folder_id: int,
    data: FolderUpdate,
    db: AsyncSession = Depends(get_db),
    _: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_UPDATE_ALL)),
):
    """Update a folder.

    Note: Folders require library:update_all permission since they don't have
    ownership tracking.

    Only fields that are not None in ``data`` are applied. For ``parent_id``,
    ``project_id`` and ``archive_id`` the value 0 is a sentinel meaning
    "move to root" / "unlink".

    Args:
        folder_id: ID of the folder to update.
        data: Fields to change.
        db: Database session.

    Returns:
        The updated folder as a ``FolderResponse``.

    Raises:
        HTTPException: 404 when the folder, the new parent, the project or the
            archive does not exist; 400 when the move would make the folder its
            own parent or place it inside its own subtree.
    """
    result = await db.execute(select(LibraryFolder).where(LibraryFolder.id == folder_id))
    folder = result.scalar_one_or_none()
    if not folder:
        raise HTTPException(status_code=404, detail="Folder not found")

    if data.name is not None:
        folder.name = data.name

    if data.parent_id is not None:
        # Prevent circular reference
        if data.parent_id == folder_id:
            raise HTTPException(status_code=400, detail="Folder cannot be its own parent")
        if data.parent_id != 0:  # 0 means move to root
            # The target must exist, mirroring the check in create_folder;
            # otherwise a dangling parent_id would be stored.
            parent_exists = await db.execute(select(LibraryFolder.id).where(LibraryFolder.id == data.parent_id))
            if parent_exists.scalar_one_or_none() is None:
                raise HTTPException(status_code=404, detail="Parent folder not found")

            # Walk up from the new parent; meeting this folder on the way means
            # the move would create a cycle. `visited` stops the walk if the
            # stored hierarchy already contains a loop that excludes this folder.
            visited: set[int] = set()
            current_id = data.parent_id
            while current_id is not None and current_id not in visited:
                if current_id == folder_id:
                    raise HTTPException(status_code=400, detail="Cannot move folder into its own subtree")
                visited.add(current_id)
                parent_result = await db.execute(select(LibraryFolder.parent_id).where(LibraryFolder.id == current_id))
                current_id = parent_result.scalar()
            folder.parent_id = data.parent_id
        else:
            folder.parent_id = None

    # Update project_id (0 to unlink)
    if data.project_id is not None:
        if data.project_id == 0:
            folder.project_id = None
        else:
            # Verify project exists
            project_result = await db.execute(select(Project).where(Project.id == data.project_id))
            if not project_result.scalar_one_or_none():
                raise HTTPException(status_code=404, detail="Project not found")
            folder.project_id = data.project_id

    # Update archive_id (0 to unlink)
    if data.archive_id is not None:
        if data.archive_id == 0:
            folder.archive_id = None
        else:
            # Verify archive exists
            archive_result = await db.execute(select(PrintArchive).where(PrintArchive.id == data.archive_id))
            if not archive_result.scalar_one_or_none():
                raise HTTPException(status_code=404, detail="Archive not found")
            folder.archive_id = data.archive_id

    await db.commit()
    await db.refresh(folder)

    # Number of non-deleted files directly inside the folder
    file_count_result = await db.execute(
        select(func.count(LibraryFile.id)).where(
            LibraryFile.folder_id == folder_id,
            LibraryFile.deleted_at.is_(None),
        )
    )
    file_count = file_count_result.scalar() or 0

    # Resolve display names for whatever the folder is linked to after the update
    project_name = None
    archive_name = None
    if folder.project_id:
        project_result = await db.execute(select(Project.name).where(Project.id == folder.project_id))
        project_name = project_result.scalar()
    if folder.archive_id:
        archive_result = await db.execute(select(PrintArchive.print_name).where(PrintArchive.id == folder.archive_id))
        archive_name = archive_result.scalar()

    return FolderResponse(
        id=folder.id,
        name=folder.name,
        parent_id=folder.parent_id,
        project_id=folder.project_id,
        archive_id=folder.archive_id,
        project_name=project_name,
        archive_name=archive_name,
        is_external=folder.is_external,
        external_path=folder.external_path,
        external_readonly=folder.external_readonly,
        external_show_hidden=folder.external_show_hidden,
        file_count=file_count,
        created_at=folder.created_at,
        updated_at=folder.updated_at,
    )
@router.delete("/folders/{folder_id}")
async def delete_folder(
    folder_id: int,
    db: AsyncSession = Depends(get_db),
    _: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_DELETE_ALL)),
):
    """Delete a folder and all its contents (cascade).

    Note: Folders require library:delete_all permission since they don't have
    ownership tracking.

    Files and thumbnails of managed (non-external) files in the folder tree are
    removed from disk first; the folder row is then deleted and the database
    cascade removes file and subfolder rows.

    Args:
        folder_id: ID of the folder to delete.
        db: Database session.

    Returns:
        A ``{"status": "success", "message": "Folder deleted"}`` dict.

    Raises:
        HTTPException: 404 when no folder has this ID.
    """
    result = await db.execute(select(LibraryFolder).where(LibraryFolder.id == folder_id))
    folder = result.scalar_one_or_none()
    if not folder:
        raise HTTPException(status_code=404, detail="Folder not found")

    # External folders: only remove DB records, never delete files from external path
    is_ext = folder.is_external

    def _remove_from_disk(path) -> None:
        """Best-effort removal of one path; a missing file is not an error."""
        if not path:
            return
        try:
            os.remove(path)
        except FileNotFoundError:
            pass  # Already gone
        except OSError as e:
            logger.warning("Failed to delete file: %s", e)

    # Iterative walk over the folder tree. `visited` keeps the walk finite even
    # if the stored hierarchy contains a loop.
    pending = [folder_id]
    visited: set[int] = set()
    while pending:
        current_id = pending.pop()
        if current_id in visited:
            continue
        visited.add(current_id)

        files_result = await db.execute(
            select(LibraryFile.file_path, LibraryFile.thumbnail_path, LibraryFile.is_external).where(
                LibraryFile.folder_id == current_id
            )
        )
        for file_path, thumb_path, file_is_ext in files_result.all():
            # Only delete non-external files from disk
            if is_ext or file_is_ext:
                continue
            # NOTE(review): paths are used exactly as stored. Uploads store
            # thumbnail paths via to_relative_path(); if those are not relative
            # to the process working directory they may need to_absolute_path()
            # here - confirm.
            # Removed independently so a failure on the file does not leave its
            # thumbnail behind.
            _remove_from_disk(file_path)
            _remove_from_disk(thumb_path)

        children_result = await db.execute(select(LibraryFolder.id).where(LibraryFolder.parent_id == current_id))
        pending.extend(child_id for (child_id,) in children_result.all())

    # Delete folder (cascade will handle files and subfolders)
    await db.delete(folder)
    await db.commit()
    return {"status": "success", "message": "Folder deleted"}
- # ============ External Folder Endpoints ============
- # Blocked system directories that cannot be mounted
- _BLOCKED_PREFIXES = (
- "/proc",
- "/sys",
- "/dev",
- "/run",
- "/boot",
- "/sbin",
- "/bin",
- "/usr/sbin",
- "/usr/bin",
- "/lib",
- "/etc",
- )
- # Supported file extensions for external folder scanning
- _SCANNABLE_EXTENSIONS = {
- ".3mf",
- ".gcode",
- ".gcode.3mf",
- ".stl",
- ".obj",
- ".step",
- ".stp",
- ".png",
- ".jpg",
- ".jpeg",
- ".gif",
- ".webp",
- ".svg",
- }
- def _validate_external_path(path_str: str) -> Path:
- """Validate an external path is safe to mount."""
- path = Path(path_str).resolve()
- if not path.is_absolute():
- raise HTTPException(status_code=400, detail="Path must be absolute")
- for prefix in _BLOCKED_PREFIXES:
- if str(path).startswith(prefix):
- raise HTTPException(status_code=400, detail=f"Cannot mount system directory: {prefix}")
- if not path.exists():
- raise HTTPException(status_code=400, detail=f"Path does not exist: {path}")
- if not path.is_dir():
- raise HTTPException(status_code=400, detail=f"Path is not a directory: {path}")
- # Check readability
- if not os.access(path, os.R_OK):
- raise HTTPException(status_code=400, detail=f"Path is not readable: {path}")
- return path
@router.post("/folders/external", response_model=FolderResponse)
async def create_external_folder(
    data: ExternalFolderCreate,
    db: AsyncSession = Depends(get_db),
    _: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_UPLOAD)),
):
    """Register a host directory as an external library folder.

    Args:
        data: Folder name, host path, optional parent and the
            readonly / show-hidden flags.
        db: Database session.

    Returns:
        The stored folder as a ``FolderResponse`` with ``file_count`` of 0.

    Raises:
        HTTPException: 400 when path validation fails (raised by
            ``_validate_external_path``); 409 when another external folder
            already uses the same resolved path; 404 when the parent folder
            does not exist.
    """
    mount_path = str(_validate_external_path(data.external_path))

    # A host directory may be mounted only once.
    duplicate_query = select(LibraryFolder).where(
        LibraryFolder.is_external.is_(True),
        LibraryFolder.external_path == mount_path,
    )
    duplicate = (await db.execute(duplicate_query)).scalar_one_or_none()
    if duplicate:
        raise HTTPException(status_code=409, detail="An external folder already exists for this path")

    if data.parent_id is not None:
        parent_query = select(LibraryFolder).where(LibraryFolder.id == data.parent_id)
        if (await db.execute(parent_query)).scalar_one_or_none() is None:
            raise HTTPException(status_code=404, detail="Parent folder not found")

    mounted = LibraryFolder(
        name=data.name,
        parent_id=data.parent_id,
        is_external=True,
        external_path=mount_path,
        external_readonly=data.readonly,
        external_show_hidden=data.show_hidden,
    )
    db.add(mounted)
    await db.commit()
    # Reload so generated columns (id, timestamps) are populated.
    await db.refresh(mounted)

    return FolderResponse(
        id=mounted.id,
        name=mounted.name,
        parent_id=mounted.parent_id,
        project_id=None,
        archive_id=None,
        is_external=True,
        external_path=mounted.external_path,
        external_readonly=mounted.external_readonly,
        external_show_hidden=mounted.external_show_hidden,
        file_count=0,
        created_at=mounted.created_at,
        updated_at=mounted.updated_at,
    )
@router.post("/folders/{folder_id}/scan")
async def scan_external_folder(
    folder_id: int,
    db: AsyncSession = Depends(get_db),
    _: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_UPLOAD)),
):
    """Scan an external folder and sync files to the database.

    Discovers new files, removes DB entries for deleted files.
    Does not copy files — stores the external path directly.

    Subdirectories are mirrored as external child folders. Child folders whose
    directory was not seen during the walk are removed once they hold no files
    and no child folders.

    Args:
        folder_id: ID of the external folder to scan.
        db: Database session.

    Returns:
        ``{"status": "success", "added": <int>, "removed": <int>}`` with the
        number of file rows created and deleted.

    Raises:
        HTTPException: 404 when the folder does not exist; 400 when it is not
            an external folder or its path is not an accessible directory.
    """
    result = await db.execute(select(LibraryFolder).where(LibraryFolder.id == folder_id))
    folder = result.scalar_one_or_none()
    if not folder:
        raise HTTPException(status_code=404, detail="Folder not found")
    if not folder.is_external or not folder.external_path:
        raise HTTPException(status_code=400, detail="Not an external folder")
    ext_path = Path(folder.external_path)
    if not ext_path.exists() or not ext_path.is_dir():
        raise HTTPException(status_code=400, detail=f"External path is not accessible: {folder.external_path}")
    # Resolved once; every file is checked against it to reject escaping symlinks.
    resolved_root = ext_path.resolve()

    # Collect all existing child external subfolders (single query)
    child_result = await db.execute(
        select(LibraryFolder).where(
            LibraryFolder.is_external.is_(True),
            LibraryFolder.parent_id.isnot(None),
        )
    )
    all_child_folders = child_result.scalars().all()

    # Group by parent, then walk down from folder_id to find all descendants
    parent_to_children: dict[int, list] = {}
    for cf in all_child_folders:
        parent_to_children.setdefault(cf.parent_id, []).append(cf)
    all_folder_ids = [folder_id]
    descendant_folders = []
    queue = [folder_id]
    while queue:
        pid = queue.pop()
        for child in parent_to_children.get(pid, []):
            all_folder_ids.append(child.id)
            descendant_folders.append(child)
            queue.append(child.id)

    # Get existing DB files across root and all subfolders
    existing_result = await db.execute(
        LibraryFile.active().where(
            LibraryFile.folder_id.in_(all_folder_ids),
            LibraryFile.is_external.is_(True),
        )
    )
    existing_files = {f.file_path: f for f in existing_result.scalars().all()}

    # Build folder cache: relative path -> folder_id (for resolving subfolders).
    # Pre-populated from the descendants found above, keyed by their
    # external_path relative to the scanned root.
    folder_cache: dict[str, int] = {"": folder_id}
    for child in descendant_folders:
        if child.id == folder_id or not child.external_path:
            continue
        try:
            rel = str(Path(child.external_path).relative_to(ext_path))
        except ValueError:
            continue  # Stored path is not below the scanned root
        if rel != ".":
            folder_cache[rel] = child.id

    def _strip_binary(obj):
        """Drop bytes values and the private thumbnail keys so metadata is JSON-safe."""
        if isinstance(obj, dict):
            return {
                k: _strip_binary(v)
                for k, v in obj.items()
                if not isinstance(v, bytes) and k not in ("_thumbnail_data", "_thumbnail_ext")
            }
        elif isinstance(obj, list):
            return [_strip_binary(i) for i in obj if not isinstance(i, bytes)]
        elif isinstance(obj, bytes):
            return None
        return obj

    def _store_thumbnail(data, suffix):
        """Write thumbnail bytes under a random name; return the path to store in the DB."""
        thumb_full = get_library_thumbnails_dir() / f"{uuid.uuid4().hex}{suffix}"
        thumb_full.write_bytes(data)
        return to_relative_path(thumb_full)

    # Scan the directory
    added = 0
    removed = 0
    found_paths: set[str] = set()
    seen_rel_dirs: set[str] = set()
    for dirpath, dirnames, filenames in os.walk(ext_path):
        # Filter hidden directories unless configured (in-place so os.walk skips them)
        if not folder.external_show_hidden:
            dirnames[:] = [d for d in dirnames if not d.startswith(".")]
        rel_dir = str(Path(dirpath).relative_to(ext_path))
        if rel_dir == ".":
            rel_dir = ""
        seen_rel_dirs.add(rel_dir)

        # Resolve or create subfolder chain for this directory
        if rel_dir and rel_dir not in folder_cache:
            current_path = ""
            current_parent = folder_id
            for part in Path(rel_dir).parts:
                current_path = f"{current_path}/{part}".lstrip("/")
                if current_path in folder_cache:
                    current_parent = folder_cache[current_path]
                    continue
                existing_sub = await db.execute(
                    select(LibraryFolder).where(
                        LibraryFolder.name == part,
                        LibraryFolder.parent_id == current_parent,
                        LibraryFolder.is_external.is_(True),
                    )
                )
                existing_folder = existing_sub.scalar_one_or_none()
                if existing_folder:
                    current_parent = existing_folder.id
                else:
                    new_folder = LibraryFolder(
                        name=part,
                        parent_id=current_parent,
                        is_external=True,
                        external_path=str(ext_path / current_path),
                        external_readonly=folder.external_readonly,
                        external_show_hidden=folder.external_show_hidden,
                    )
                    db.add(new_folder)
                    # Flush so the new row gets an id usable as parent below
                    await db.flush()
                    current_parent = new_folder.id
                folder_cache[current_path] = current_parent
        target_folder_id = folder_cache.get(rel_dir, folder_id)

        for filename in filenames:
            # Skip hidden files unless configured
            if not folder.external_show_hidden and filename.startswith("."):
                continue
            filepath = Path(dirpath) / filename
            ext = filepath.suffix.lower()
            # Check for compound extensions like .gcode.3mf
            if ext not in _SCANNABLE_EXTENSIONS:
                compound = "".join(filepath.suffixes[-2:]).lower() if len(filepath.suffixes) >= 2 else ""
                if compound not in _SCANNABLE_EXTENSIONS:
                    continue
            # Resolve symlinks and ensure still under external_path
            try:
                filepath.resolve().relative_to(resolved_root)
            except (ValueError, OSError):
                continue  # Symlink escapes the external dir
            file_path_str = str(filepath)
            found_paths.add(file_path_str)
            if file_path_str in existing_files:
                continue  # Already tracked
            # Get file info
            try:
                stat = filepath.stat()
            except OSError:
                continue
            file_type = ext[1:] if ext else "unknown"
            # For compound extensions, use the meaningful part
            if file_type in ("3mf",) and len(filepath.suffixes) >= 2:
                inner = filepath.suffixes[-2].lower()
                if inner == ".gcode":
                    file_type = "gcode.3mf"

            thumbnail_path = None
            file_metadata = None

            # Extract thumbnail and metadata for 3mf files
            if file_type == "3mf":
                try:
                    raw_metadata = ThreeMFParser(str(filepath)).parse()
                    if raw_metadata:
                        # Extract thumbnail before cleaning metadata
                        thumb_data = raw_metadata.get("_thumbnail_data")
                        thumbnail_ext = raw_metadata.get("_thumbnail_ext", ".png")
                        if thumb_data:
                            thumbnail_path = _store_thumbnail(thumb_data, thumbnail_ext)
                        file_metadata = _strip_binary(raw_metadata)
                except Exception as e:
                    logger.debug("Failed to extract metadata from external 3mf %s: %s", filepath, e)

            # Generate thumbnail for STL files
            if file_type == "stl" and thumbnail_path is None:
                try:
                    thumb_dir = get_library_thumbnails_dir()
                    thumb_result = generate_stl_thumbnail(str(filepath), str(thumb_dir))
                    if thumb_result:
                        thumbnail_path = to_relative_path(Path(thumb_result))
                except Exception as e:
                    logger.debug("Failed to generate STL thumbnail for external %s: %s", filepath, e)

            # Extract gcode thumbnail. Guarded like the other thumbnail paths so
            # a single unreadable file cannot abort the whole scan.
            if file_type == "gcode" and thumbnail_path is None:
                try:
                    thumb_data = extract_gcode_thumbnail(filepath)
                    if thumb_data:
                        thumbnail_path = _store_thumbnail(thumb_data, ".png")
                except Exception as e:
                    logger.debug("Failed to extract gcode thumbnail for external %s: %s", filepath, e)

            # Create thumbnail for image files
            if ext in IMAGE_EXTENSIONS and thumbnail_path is None:
                thumbnail_path_str = create_image_thumbnail(filepath, get_library_thumbnails_dir())
                if thumbnail_path_str:
                    thumbnail_path = to_relative_path(Path(thumbnail_path_str))

            db_file = LibraryFile(
                folder_id=target_folder_id,
                is_external=True,
                filename=filename,
                file_path=file_path_str,
                file_type=file_type,
                file_size=stat.st_size,
                file_hash=None,  # Skip hashing external files for performance
                thumbnail_path=thumbnail_path,
                file_metadata=file_metadata,
            )
            db.add(db_file)
            added += 1

    # Remove DB entries for files that no longer exist on disk
    for path_str, db_file in existing_files.items():
        if path_str in found_paths:
            continue
        # Clean up thumbnail if we generated one
        if db_file.thumbnail_path:
            try:
                abs_thumb = to_absolute_path(db_file.thumbnail_path)
                if abs_thumb and abs_thumb.exists():
                    abs_thumb.unlink()
            except OSError:
                pass  # Best effort; the DB row is removed regardless
        await db.delete(db_file)
        removed += 1

    # Remove empty subfolders whose directories no longer exist on disk
    # Process deepest-first by sorting on path depth (descending)
    subfolder_entries = [(rel, fid) for rel, fid in folder_cache.items() if rel and fid != folder_id]
    subfolder_entries.sort(key=lambda x: x[0].count("/"), reverse=True)
    for rel_path, sub_fid in subfolder_entries:
        if rel_path in seen_rel_dirs:
            continue  # Directory still exists on disk
        # Check if subfolder has any remaining files
        file_count_result = await db.execute(
            select(func.count(LibraryFile.id)).where(
                LibraryFile.folder_id == sub_fid,
                LibraryFile.deleted_at.is_(None),
            )
        )
        if (file_count_result.scalar() or 0) != 0:
            continue
        # Check if it has any remaining child folders
        child_count_result = await db.execute(
            select(func.count(LibraryFolder.id)).where(LibraryFolder.parent_id == sub_fid)
        )
        if (child_count_result.scalar() or 0) != 0:
            continue
        sub_folder_result = await db.execute(select(LibraryFolder).where(LibraryFolder.id == sub_fid))
        sub_folder_obj = sub_folder_result.scalar_one_or_none()
        if sub_folder_obj:
            await db.delete(sub_folder_obj)

    await db.commit()
    return {"status": "success", "added": added, "removed": removed}
- # ============ File Endpoints ============
@router.get("/files", response_model=list[FileListResponse])
@router.get("/files/", response_model=list[FileListResponse])
async def list_files(
    response: Response,
    folder_id: int | None = None,
    project_id: int | None = None,
    include_root: bool = True,
    db: AsyncSession = Depends(get_db),
    _: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_READ)),
):
    """Return library files, narrowed by folder or project when requested.

    Filters are evaluated in order and only the first that applies is used:
    ``folder_id``, then ``project_id``, then ``include_root``.

    Args:
        response: Outgoing response; receives a no-cache header.
        folder_id: Restrict to files directly inside this folder.
        project_id: Restrict to files in any folder linked to this project
            (resolved with a single join).
        include_root: With neither ID given, True returns only files that have
            no folder, False returns all files.
        db: Database session.

    Returns:
        ``FileListResponse`` objects ordered by filename. ``duplicate_count``
        is the number of other non-deleted files sharing the same hash.
    """
    stmt = LibraryFile.active().options(selectinload(LibraryFile.created_by))
    if folder_id is not None:
        stmt = stmt.where(LibraryFile.folder_id == folder_id)
    elif project_id is not None:
        stmt = stmt.join(LibraryFolder, LibraryFile.folder_id == LibraryFolder.id).where(
            LibraryFolder.project_id == project_id
        )
    elif include_root:
        stmt = stmt.where(LibraryFile.folder_id.is_(None))
    stmt = stmt.order_by(LibraryFile.filename)

    files = (await db.execute(stmt)).scalars().all()

    # hash -> number of *other* files with the same hash
    duplicates_by_hash = {}
    known_hashes = [entry.file_hash for entry in files if entry.file_hash]
    if known_hashes:
        dup_rows = await db.execute(
            select(LibraryFile.file_hash, func.count(LibraryFile.id))
            .where(LibraryFile.file_hash.in_(known_hashes), LibraryFile.deleted_at.is_(None))
            .group_by(LibraryFile.file_hash)
        )
        for file_hash, occurrences in dup_rows.all():
            duplicates_by_hash[file_hash] = occurrences - 1  # the file itself is not a duplicate

    # The file list must always be fetched fresh by the browser.
    response.headers["Cache-Control"] = "no-cache, no-store, must-revalidate"

    listing = []
    for entry in files:
        # Metadata fields surfaced for display; all None when no metadata is stored.
        meta = entry.file_metadata if entry.file_metadata else None
        duplicate_count = duplicates_by_hash.get(entry.file_hash, 0) if entry.file_hash else 0
        owner_name = entry.created_by.username if entry.created_by else None
        listing.append(
            FileListResponse(
                id=entry.id,
                folder_id=entry.folder_id,
                is_external=entry.is_external,
                filename=entry.filename,
                file_type=entry.file_type,
                file_size=entry.file_size,
                thumbnail_path=entry.thumbnail_path,
                print_count=entry.print_count,
                duplicate_count=duplicate_count,
                created_by_id=entry.created_by_id,
                created_by_username=owner_name,
                created_at=entry.created_at,
                print_name=meta.get("print_name") if meta else None,
                print_time_seconds=meta.get("print_time_seconds") if meta else None,
                filament_used_grams=meta.get("filament_used_grams") if meta else None,
                sliced_for_model=meta.get("sliced_for_model") if meta else None,
            )
        )
    return listing
@router.post("/files", response_model=FileUploadResponse)
@router.post("/files/", response_model=FileUploadResponse)
async def upload_file(
    file: UploadFile = File(...),
    folder_id: int | None = None,
    generate_stl_thumbnails: bool = Query(default=True),
    db: AsyncSession = Depends(get_db),
    current_user: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_UPLOAD)),
):
    """Upload a file to the library.

    The file is written to disk, hashed, checked against existing non-deleted
    files for a duplicate hash, and a thumbnail / metadata are extracted where
    the type allows (.3mf, .gcode, images, .stl).

    Args:
        file: The uploaded file; a filename is required.
        folder_id: Target folder ID (None = root).
        generate_stl_thumbnails: If True, generate a thumbnail for .stl uploads.
        db: Database session.
        current_user: Authenticated user, recorded as creator when present.

    Returns:
        ``FileUploadResponse`` for the stored file; ``duplicate_of`` is the ID
        of an existing file with the same hash, if any.

    Raises:
        HTTPException: 400 when the filename is missing; 404 when the target
            folder does not exist; 500 for any other failure (the original
            exception is attached as the cause).
    """
    try:
        if not file.filename:
            raise HTTPException(status_code=400, detail="Filename is required")
        filename = file.filename
        ext = os.path.splitext(filename)[1].lower()
        # Handle files without extension
        file_type = ext[1:] if ext else "unknown"

        # Verify folder exists if specified
        target_folder = None
        if folder_id is not None:
            folder_result = await db.execute(select(LibraryFolder).where(LibraryFolder.id == folder_id))
            target_folder = folder_result.scalar_one_or_none()
            if not target_folder:
                raise HTTPException(status_code=404, detail="Folder not found")

        # Writable external folders write through to the mount so the file is
        # visible outside Bambuddy (#1112); everything else lands under the
        # internal library dir with a UUID-scoped filename.
        file_path, is_external_upload = _resolve_upload_destination(target_folder, filename)

        # Save file
        content = await file.read()
        with open(file_path, "wb") as f:
            f.write(content)

        # Calculate hash
        file_hash = calculate_file_hash(file_path)

        # Check for duplicates
        dup_result = await db.execute(
            select(LibraryFile.id).where(LibraryFile.file_hash == file_hash, LibraryFile.deleted_at.is_(None)).limit(1)
        )
        duplicate_of = dup_result.scalar()

        # Extract metadata and thumbnail
        metadata = {}
        thumbnail_path = None
        thumbnails_dir = get_library_thumbnails_dir()

        if ext == ".3mf":
            try:
                parser = ThreeMFParser(str(file_path))
                raw_metadata = parser.parse()
                # Extract thumbnail before cleaning metadata
                thumbnail_data = raw_metadata.get("_thumbnail_data")
                thumbnail_ext = raw_metadata.get("_thumbnail_ext", ".png")
                # Save thumbnail if extracted
                if thumbnail_data:
                    thumb_filename = f"{uuid.uuid4().hex}{thumbnail_ext}"
                    thumb_path = thumbnails_dir / thumb_filename
                    with open(thumb_path, "wb") as f:
                        f.write(thumbnail_data)
                    thumbnail_path = str(thumb_path)

                # Clean metadata - remove non-JSON-serializable data (bytes, etc.)
                def clean_metadata(obj):
                    if isinstance(obj, dict):
                        return {
                            k: clean_metadata(v)
                            for k, v in obj.items()
                            if not isinstance(v, bytes) and k not in ("_thumbnail_data", "_thumbnail_ext")
                        }
                    elif isinstance(obj, list):
                        return [clean_metadata(i) for i in obj if not isinstance(i, bytes)]
                    elif isinstance(obj, bytes):
                        return None
                    return obj

                metadata = clean_metadata(raw_metadata)
            except Exception as e:
                # A broken 3MF is still stored; it just has no metadata/thumbnail
                logger.warning("Failed to parse 3MF: %s", e)
        elif ext == ".gcode":
            # Extract embedded thumbnail from gcode
            try:
                thumbnail_data = extract_gcode_thumbnail(file_path)
                if thumbnail_data:
                    thumb_filename = f"{uuid.uuid4().hex}.png"
                    thumb_path = thumbnails_dir / thumb_filename
                    with open(thumb_path, "wb") as f:
                        f.write(thumbnail_data)
                    thumbnail_path = str(thumb_path)
            except Exception as e:
                logger.warning("Failed to extract gcode thumbnail: %s", e)
        elif ext in IMAGE_EXTENSIONS:
            # For image files, create a thumbnail from the image itself
            thumbnail_path = create_image_thumbnail(file_path, thumbnails_dir)
        elif ext == ".stl":
            # Generate STL thumbnail if enabled
            if generate_stl_thumbnails:
                thumbnail_path = generate_stl_thumbnail(file_path, thumbnails_dir)

        # Create database entry (managed files store relative paths for portability;
        # external files store the absolute mount path — same shape as scan produces)
        library_file = LibraryFile(
            folder_id=folder_id,
            is_external=is_external_upload,
            filename=filename,
            file_path=_stored_file_path(file_path, is_external_upload),
            file_type=file_type,
            file_size=len(content),
            file_hash=file_hash,
            thumbnail_path=to_relative_path(thumbnail_path) if thumbnail_path else None,
            file_metadata=metadata if metadata else None,
            created_by_id=current_user.id if current_user else None,
        )
        db.add(library_file)
        await db.commit()
        await db.refresh(library_file)

        return FileUploadResponse(
            id=library_file.id,
            filename=library_file.filename,
            file_type=library_file.file_type,
            file_size=library_file.file_size,
            thumbnail_path=library_file.thumbnail_path,
            duplicate_of=duplicate_of,
            metadata=library_file.file_metadata,
        )
    except HTTPException:
        # Deliberate 4xx responses raised above pass through unchanged
        raise
    except Exception as e:
        logger.error("Upload failed for %s: %s", file.filename, e, exc_info=True)
        # Chain the cause so the original failure stays attached to the 500
        raise HTTPException(status_code=500, detail=f"Upload failed: {str(e)}") from e
def _zip_extract_remove_quietly(path) -> None:
    """Delete *path*, ignoring the error if it is already gone or cannot be removed."""
    try:
        os.unlink(path)
    except OSError:
        pass  # Best-effort cleanup; ignore if already removed


def _zip_extract_clean_metadata(obj):
    """Return *obj* without bytes values and without the private thumbnail keys.

    Parsed 3MF metadata carries raw thumbnail bytes; those cannot be stored in
    a JSON column, so they are dropped recursively from dicts and lists.
    """
    if isinstance(obj, dict):
        return {
            k: _zip_extract_clean_metadata(v)
            for k, v in obj.items()
            if not isinstance(v, bytes) and k not in ("_thumbnail_data", "_thumbnail_ext")
        }
    if isinstance(obj, list):
        return [_zip_extract_clean_metadata(i) for i in obj if not isinstance(i, bytes)]
    if isinstance(obj, bytes):
        return None
    return obj


def _zip_extract_preview(file_path, ext: str, generate_stl_thumbnails: bool):
    """Produce ``(thumbnail_path, metadata)`` for one extracted file.

    Args:
        file_path: Location of the already-written library file.
        ext: Lower-cased file extension including the leading dot.
        generate_stl_thumbnails: Whether STL files should get a thumbnail.

    Returns:
        A tuple of the thumbnail path (or ``None``) and a metadata dict
        (empty unless a 3MF was parsed successfully).
    """
    metadata = {}
    thumbnail_path = None
    thumbnails_dir = get_library_thumbnails_dir()

    if ext == ".3mf":
        try:
            raw_metadata = ThreeMFParser(str(file_path)).parse()
            thumbnail_data = raw_metadata.get("_thumbnail_data")
            thumbnail_ext = raw_metadata.get("_thumbnail_ext", ".png")
            if thumbnail_data:
                thumb_path = thumbnails_dir / f"{uuid.uuid4().hex}{thumbnail_ext}"
                with open(thumb_path, "wb") as f:
                    f.write(thumbnail_data)
                thumbnail_path = str(thumb_path)
            metadata = _zip_extract_clean_metadata(raw_metadata)
        except Exception as e:
            # A 3MF that cannot be parsed is still stored, just without metadata
            logger.warning("Failed to parse 3MF from ZIP: %s", e)
    elif ext == ".gcode":
        try:
            thumbnail_data = extract_gcode_thumbnail(file_path)
            if thumbnail_data:
                thumb_path = thumbnails_dir / f"{uuid.uuid4().hex}.png"
                with open(thumb_path, "wb") as f:
                    f.write(thumbnail_data)
                thumbnail_path = str(thumb_path)
        except Exception as e:
            logger.warning("Failed to extract gcode thumbnail from ZIP: %s", e)
    elif ext in IMAGE_EXTENSIONS:
        thumbnail_path = create_image_thumbnail(file_path, thumbnails_dir)
    elif ext == ".stl":
        # Generate STL thumbnail if enabled
        if generate_stl_thumbnails:
            thumbnail_path = generate_stl_thumbnail(file_path, thumbnails_dir)

    return thumbnail_path, metadata


async def _zip_extract_resolve_folder(
    db: AsyncSession,
    dir_path: str,
    base_folder_id: int | None,
    folder_cache: dict[str, int],
    created_paths: list[str],
) -> int | None:
    """Find or create the folder chain for *dir_path* below *base_folder_id*.

    Args:
        db: Session used for lookups and for flushing new folders.
        dir_path: ``/``-separated directory part of a ZIP member name.
        base_folder_id: Folder the chain hangs off (``None`` = root).
        folder_cache: Maps ZIP directory paths to folder ids; read and updated.
        created_paths: Receives every cache key whose folder was newly created
            (flushed but not committed) by this call, so the caller can
            forget them if the surrounding transaction is rolled back.

    Returns:
        The id of the innermost folder, or *base_folder_id* when *dir_path*
        contains no non-empty component.
    """
    current_parent = base_folder_id
    current_path = ""
    for part in dir_path.split("/"):
        if not part:
            continue
        current_path = f"{current_path}/{part}" if current_path else part
        if current_path in folder_cache:
            current_parent = folder_cache[current_path]
            continue

        # Check if folder exists
        parent_filter = (
            LibraryFolder.parent_id == current_parent
            if current_parent is not None
            else LibraryFolder.parent_id.is_(None)
        )
        existing = await db.execute(select(LibraryFolder).where(LibraryFolder.name == part, parent_filter))
        existing_folder = existing.scalar_one_or_none()
        if existing_folder:
            current_parent = existing_folder.id
        else:
            new_folder = LibraryFolder(name=part, parent_id=current_parent)
            db.add(new_folder)
            await db.flush()
            current_parent = new_folder.id
            created_paths.append(current_path)
        folder_cache[current_path] = current_parent
    return current_parent


@router.post("/files/extract-zip", response_model=ZipExtractResponse)
async def extract_zip_file(
    file: UploadFile = File(...),
    folder_id: int | None = Query(default=None),
    preserve_structure: bool = Query(default=True),
    create_folder_from_zip: bool = Query(default=False),
    generate_stl_thumbnails: bool = Query(default=True),
    db: AsyncSession = Depends(get_db),
    current_user: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_UPLOAD)),
):
    """Upload and extract a ZIP file to the library.

    Each member is stored under a fresh UUID-based filename and committed in
    its own transaction; a member that fails is reported in ``errors`` and
    does not stop the remaining members.

    Args:
        file: The ZIP file to extract
        folder_id: Target folder ID (None = root)
        preserve_structure: If True, recreate folder structure from ZIP; if False, extract all files flat
        create_folder_from_zip: If True, create a folder named after the ZIP file and extract into it
        generate_stl_thumbnails: If True, generate thumbnails for STL files

    Returns:
        ZipExtractResponse with the committed files, the number of folders
        that were actually persisted, and per-file errors.

    Raises:
        HTTPException: 400 for a non-ZIP upload, a corrupt archive, or an
            external target folder; 403 for a read-only external folder;
            404 if the target folder does not exist; 500 if the upload cannot
            be saved or extraction fails as a whole.
    """
    import tempfile

    if not file.filename or not file.filename.lower().endswith(".zip"):
        raise HTTPException(status_code=400, detail="Only ZIP files are supported")

    # Verify target folder exists if specified
    if folder_id is not None:
        folder_result = await db.execute(select(LibraryFolder).where(LibraryFolder.id == folder_id))
        target_folder = folder_result.scalar_one_or_none()
        if not target_folder:
            raise HTTPException(status_code=404, detail="Target folder not found")
        if target_folder.is_external and target_folder.external_readonly:
            raise HTTPException(status_code=403, detail="Cannot extract ZIP to a read-only external folder")
        if target_folder.is_external:
            # Writable external folders aren't supported by extract-zip because the
            # nested-subfolder creation path would need to mkdir on the mount and
            # create matching is_external=True LibraryFolder rows — a separate
            # design. Direct the user at Scan, which already handles that shape
            # (#1112).
            raise HTTPException(
                status_code=400,
                detail=(
                    "Cannot extract ZIP directly into an external folder. "
                    "Extract the ZIP on the external mount and run 'Scan External Folder' instead."
                ),
            )

    # Save ZIP to temp file. The name is captured before writing so a failed
    # read/write does not leave the delete=False temp file behind.
    tmp_path: str | None = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as tmp:
            tmp_path = tmp.name
            tmp.write(await file.read())
    except Exception as e:
        if tmp_path is not None:
            _zip_extract_remove_quietly(tmp_path)
        raise HTTPException(status_code=500, detail=f"Failed to save ZIP file: {str(e)}") from e

    extracted_files: list[ZipExtractResult] = []
    errors: list[ZipExtractError] = []
    folders_created = 0
    folder_cache: dict[str, int] = {}  # path -> folder_id

    # Everything from here on runs under the try/finally so the temp file is
    # removed even when the folder lookup/creation below raises.
    try:
        # If create_folder_from_zip is True, create a folder named after the ZIP file
        zip_folder_id = folder_id
        logger.info(
            "ZIP extraction: create_folder_from_zip=%s, folder_id=%s, filename=%s",
            create_folder_from_zip,
            folder_id,
            file.filename,
        )
        if create_folder_from_zip and file.filename:
            # Remove .zip extension to get folder name
            zip_folder_name = file.filename[:-4] if file.filename.lower().endswith(".zip") else file.filename

            # Check if folder already exists
            parent_filter = (
                LibraryFolder.parent_id == folder_id if folder_id is not None else LibraryFolder.parent_id.is_(None)
            )
            existing = await db.execute(
                select(LibraryFolder).where(LibraryFolder.name == zip_folder_name, parent_filter)
            )
            existing_folder = existing.scalar_one_or_none()
            if existing_folder:
                zip_folder_id = existing_folder.id
                logger.info("Reusing existing folder '%s' with id=%s", zip_folder_name, zip_folder_id)
            else:
                new_folder = LibraryFolder(name=zip_folder_name, parent_id=folder_id)
                db.add(new_folder)
                await db.flush()
                await db.commit()  # Commit folder creation immediately
                zip_folder_id = new_folder.id
                folders_created += 1
                logger.info("Created new folder '%s' with id=%s", zip_folder_name, zip_folder_id)

        with zipfile.ZipFile(tmp_path, "r") as zf:
            # Filter out directories and hidden/system files
            file_list = [
                name
                for name in zf.namelist()
                if not name.endswith("/")
                and not name.startswith("__MACOSX")
                and not os.path.basename(name).startswith(".")
            ]

            for zip_path in file_list:
                # Folders flushed for this entry; they vanish if the entry is rolled back
                pending_folder_paths: list[str] = []
                # Stored library file written for this entry (removed again on failure)
                stored_path = None
                try:
                    # Determine target folder (use zip_folder_id as base if create_folder_from_zip was used)
                    target_folder_id = zip_folder_id
                    if preserve_structure:
                        dir_path = os.path.dirname(zip_path)
                        if dir_path:
                            target_folder_id = await _zip_extract_resolve_folder(
                                db, dir_path, zip_folder_id, folder_cache, pending_folder_paths
                            )

                    filename = os.path.basename(zip_path)
                    ext = os.path.splitext(filename)[1].lower()
                    file_type = ext[1:] if ext else "unknown"

                    # Generate unique filename for storage
                    file_path = get_library_files_dir() / f"{uuid.uuid4().hex}{ext}"

                    # Extract and save file
                    file_content = zf.read(zip_path)
                    stored_path = file_path
                    with open(file_path, "wb") as f:
                        f.write(file_content)

                    file_hash = calculate_file_hash(file_path)
                    thumbnail_path, metadata = _zip_extract_preview(file_path, ext, generate_stl_thumbnails)

                    # Create database entry (store relative paths for portability)
                    library_file = LibraryFile(
                        folder_id=target_folder_id,
                        filename=filename,
                        file_path=to_relative_path(file_path),
                        file_type=file_type,
                        file_size=len(file_content),
                        file_hash=file_hash,
                        thumbnail_path=to_relative_path(thumbnail_path) if thumbnail_path else None,
                        file_metadata=metadata if metadata else None,
                        created_by_id=current_user.id if current_user else None,
                    )
                    db.add(library_file)
                    await db.flush()
                    await db.refresh(library_file)
                    extracted_entry = ZipExtractResult(
                        filename=filename,
                        file_id=library_file.id,
                        folder_id=target_folder_id,
                    )

                    # Commit after each file to release database lock
                    # This prevents long-running transactions from blocking other requests
                    await db.commit()
                except Exception as e:
                    logger.error("Failed to extract %s: %s", zip_path, e)
                    errors.append(ZipExtractError(filename=os.path.basename(zip_path), error=str(e)))
                    # Rollback the failed file but continue with others
                    await db.rollback()
                    # The rollback also discarded folders flushed for this entry;
                    # forget their ids so later entries recreate them instead of
                    # pointing at rows that no longer exist.
                    for stale_path in pending_folder_paths:
                        folder_cache.pop(stale_path, None)
                    # No row references the uuid-named file, so don't leave it orphaned.
                    if stored_path is not None:
                        _zip_extract_remove_quietly(stored_path)
                else:
                    # Only report what was actually committed
                    extracted_files.append(extracted_entry)
                    folders_created += len(pending_folder_paths)

        return ZipExtractResponse(
            extracted=len(extracted_files),
            folders_created=folders_created,
            files=extracted_files,
            errors=errors,
        )
    except zipfile.BadZipFile:
        raise HTTPException(status_code=400, detail="Invalid or corrupted ZIP file")
    except Exception as e:
        logger.error("ZIP extraction failed: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=f"ZIP extraction failed: {str(e)}")
    finally:
        # Clean up temp file
        _zip_extract_remove_quietly(tmp_path)
- # ============ STL Thumbnail Batch Generation ============
@router.post("/generate-stl-thumbnails", response_model=BatchThumbnailResponse)
async def batch_generate_stl_thumbnails(
    request: BatchThumbnailRequest,
    db: AsyncSession = Depends(get_db),
    _: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_UPDATE_ALL)),
):
    """Generate thumbnails for STL files in batch.

    Note: Requires library:update_all permission since this is a batch operation
    that may affect files owned by different users.

    The selection is taken from the request, in this order of precedence:
    - ``file_ids``: exactly those files
    - ``folder_id``: STL files in that folder (only those without a thumbnail
      when ``all_missing`` is also set)
    - ``all_missing``: every STL file without a thumbnail

    With none of these set, an empty response is returned.
    """
    thumbnails_dir = get_library_thumbnails_dir()
    outcomes: list[BatchThumbnailResult] = []

    # Start from all active STL files and narrow down according to the request
    query = LibraryFile.active().where(LibraryFile.file_type == "stl")
    if request.file_ids:
        query = query.where(LibraryFile.id.in_(request.file_ids))
    elif request.folder_id is not None:
        query = query.where(LibraryFile.folder_id == request.folder_id)
        if request.all_missing:
            query = query.where(LibraryFile.thumbnail_path.is_(None))
    elif request.all_missing:
        query = query.where(LibraryFile.thumbnail_path.is_(None))
    else:
        # No criteria specified - return empty
        return BatchThumbnailResponse(processed=0, succeeded=0, failed=0, results=[])

    rows = await db.execute(query)
    stl_files = rows.scalars().all()

    ok_count = 0
    fail_count = 0

    def _failure(entry, reason: str) -> BatchThumbnailResult:
        """Build the failed-result record for *entry*."""
        return BatchThumbnailResult(
            file_id=entry.id,
            filename=entry.filename,
            success=False,
            error=reason,
        )

    for stl_file in stl_files:
        disk_path = to_absolute_path(stl_file.file_path)
        if not disk_path or not disk_path.exists():
            outcomes.append(_failure(stl_file, "File not found on disk"))
            fail_count += 1
            continue

        try:
            generated = generate_stl_thumbnail(disk_path, thumbnails_dir)
            if not generated:
                outcomes.append(_failure(stl_file, "Thumbnail generation failed"))
                fail_count += 1
                continue

            # Update database with relative path
            stl_file.thumbnail_path = to_relative_path(generated)
            await db.flush()
            outcomes.append(
                BatchThumbnailResult(
                    file_id=stl_file.id,
                    filename=stl_file.filename,
                    success=True,
                )
            )
            ok_count += 1
        except Exception as exc:
            logger.error("Failed to generate thumbnail for %s: %s", stl_file.filename, exc)
            outcomes.append(_failure(stl_file, str(exc)))
            fail_count += 1

    await db.commit()

    return BatchThumbnailResponse(
        processed=len(stl_files),
        succeeded=ok_count,
        failed=fail_count,
        results=outcomes,
    )
- # ============ Queue Operations ============
- # NOTE: These routes must be defined BEFORE /files/{file_id} to avoid path parameter conflicts
def is_sliced_file(filename: str) -> bool:
    """Check if a file is a sliced (printable) file.

    Sliced files are:
    - .gcode files
    - .3mf files that contain '.gcode.' in the name (e.g., filename.gcode.3mf)

    The comparison is case-insensitive. Names that merely contain '.gcode.'
    but are not 3MF archives (e.g. ``part.gcode.txt``) are not sliced files.

    Args:
        filename: File name to classify (only the name is inspected).

    Returns:
        True if the name denotes a sliced file, False otherwise.
    """
    lower = filename.lower()
    if lower.endswith(".gcode"):
        return True
    return lower.endswith(".3mf") and ".gcode." in lower
@router.post("/files/add-to-queue", response_model=AddToQueueResponse)
async def add_files_to_queue(
    request: AddToQueueRequest,
    db: AsyncSession = Depends(get_db),
    _: User | None = Depends(require_permission_if_auth_enabled(Permission.QUEUE_CREATE)),
):
    """Add library files to the print queue.

    Only sliced files (.gcode or .gcode.3mf) can be added to the queue.
    The archive will be created automatically when the print starts.

    Every requested id yields either an entry in ``added`` or one in
    ``errors``; a rejected file does not stop the remaining ones.
    """
    queued: list[AddToQueueResult] = []
    rejected: list[AddToQueueError] = []

    # Load all requested files with one query and index them by id
    rows = await db.execute(LibraryFile.active().where(LibraryFile.id.in_(request.file_ids)))
    files_by_id = {row.id: row for row in rows.scalars().all()}

    # New items are appended behind the current highest queue position
    highest = await db.execute(select(func.coalesce(func.max(PrintQueueItem.position), 0)))
    position = highest.scalar() or 0

    for file_id in request.file_ids:
        lib_file = files_by_id.get(file_id)
        if not lib_file:
            rejected.append(AddToQueueError(file_id=file_id, filename="(not found)", error="File not found"))
            continue

        # Validate file is sliced
        if not is_sliced_file(lib_file.filename):
            rejected.append(
                AddToQueueError(
                    file_id=file_id,
                    filename=lib_file.filename,
                    error="Not a sliced file. Only .gcode or .gcode.3mf files can be printed.",
                )
            )
            continue

        try:
            # Verify file exists on disk
            on_disk = Path(app_settings.base_dir) / lib_file.file_path
            if not on_disk.exists():
                rejected.append(
                    AddToQueueError(file_id=file_id, filename=lib_file.filename, error="File not found on disk")
                )
                continue

            # Create queue item referencing library file (archive created at print start)
            position += 1
            queue_item = PrintQueueItem(
                printer_id=None,  # Unassigned
                library_file_id=file_id,
                position=position,
                status="pending",
            )
            db.add(queue_item)
            await db.flush()  # Get queue_item.id

            queued.append(
                AddToQueueResult(
                    file_id=file_id,
                    filename=lib_file.filename,
                    queue_item_id=queue_item.id,
                )
            )
        except Exception as exc:
            logger.exception("Error adding file %s to queue", file_id)
            rejected.append(AddToQueueError(file_id=file_id, filename=lib_file.filename, error=str(exc)))

    await db.commit()

    return AddToQueueResponse(added=queued, errors=rejected)
def _library_plate_indices(namelist: list[str]) -> list[int]:
    """Return the sorted, de-duplicated plate indices found in a 3MF's member names.

    ``Metadata/plate_<n>.gcode`` members are authoritative. If there are none
    (source-only or unsliced file) the indices are taken from
    ``Metadata/plate_<n>.json`` / ``Metadata/plate_<n>.png`` instead.
    """
    prefix, suffix = "Metadata/plate_", ".gcode"
    gcode_files = [n for n in namelist if n.startswith(prefix) and n.endswith(suffix)]

    found: set[int] = set()
    if gcode_files:
        for gf in gcode_files:
            try:
                found.add(int(gf[len(prefix) : -len(suffix)]))
            except ValueError:
                pass  # Skip gcode file with non-numeric plate index
    else:
        # Digits-only pattern: variants such as plate_1_small.png never match
        plate_re = re.compile(r"Metadata/plate_(\d+)\.(json|png)")
        for name in namelist:
            match = plate_re.fullmatch(name)
            if match:
                found.add(int(match.group(1)))
    return sorted(found)


def _library_plate_model_settings(zf) -> tuple[dict[int, str], dict[int, list[str]], dict[str, str]]:
    """Read plate names and object assignments from ``Metadata/model_settings.config``.

    Returns:
        ``(plate_names, plate_object_ids, object_names_by_id)``. Whatever was
        collected before a parse error is kept; the file is optional.
    """
    import defusedxml.ElementTree as ET

    plate_names: dict[int, str] = {}
    plate_object_ids: dict[int, list[str]] = {}
    object_names_by_id: dict[str, str] = {}
    try:
        model_root = ET.fromstring(zf.read("Metadata/model_settings.config").decode())

        for obj_elem in model_root.findall(".//object"):
            obj_id = obj_elem.get("id")
            if not obj_id:
                continue
            name_meta = obj_elem.find("metadata[@key='name']")
            obj_name = name_meta.get("value") if name_meta is not None else None
            if obj_name:
                object_names_by_id[obj_id] = obj_name

        for plate_elem in model_root.findall(".//plate"):
            plater_id = None
            plater_name = None
            for meta in plate_elem.findall("metadata"):
                key = meta.get("key")
                value = meta.get("value")
                if key == "plater_id" and value:
                    try:
                        plater_id = int(value)
                    except ValueError:
                        pass  # Ignore plate with non-numeric plater_id
                elif key == "plater_name" and value:
                    plater_name = value.strip()
            if plater_id is None:
                continue
            if plater_name:
                plate_names[plater_id] = plater_name
            for instance_elem in plate_elem.findall("model_instance"):
                for inst_meta in instance_elem.findall("metadata"):
                    if inst_meta.get("key") != "object_id":
                        continue
                    obj_id = inst_meta.get("value")
                    if not obj_id:
                        continue
                    assigned = plate_object_ids.setdefault(plater_id, [])
                    if obj_id not in assigned:
                        assigned.append(obj_id)
    except Exception:
        pass  # model_settings.config is optional; skip if missing or malformed
    return plate_names, plate_object_ids, object_names_by_id


def _library_plate_filament(filament_elem) -> dict | None:
    """Convert one ``<filament>`` element into a filament dict.

    Returns ``None`` for filaments that are unused (``used_g`` not > 0), have
    no id, or have a non-numeric id. An unparsable ``used_m`` counts as 0.
    """
    filament_id = filament_elem.get("id")
    try:
        used_grams = float(filament_elem.get("used_g", "0"))
    except (ValueError, TypeError):
        used_grams = 0
    if not (used_grams > 0 and filament_id):
        return None
    try:
        slot_id = int(filament_id)
    except ValueError:
        return None  # Skip this filament only; keep the rest of the plate

    used_m = filament_elem.get("used_m", "0")
    try:
        used_meters = float(used_m) if used_m else 0
    except ValueError:
        used_meters = 0
    return {
        "slot_id": slot_id,
        "type": filament_elem.get("type", ""),
        "color": filament_elem.get("color", ""),
        "used_grams": round(used_grams, 1),
        "used_meters": used_meters,
    }


def _library_plate_slice_info(zf, plate_names: dict[int, str]) -> dict[int, dict]:
    """Read per-plate print metadata from ``Metadata/slice_info.config``.

    Args:
        zf: Open 3MF archive.
        plate_names: Custom plate names by plate index; these win over the
            first object name when naming a plate.

    Returns:
        Plate index -> dict with ``filaments``, ``prediction``, ``weight``,
        ``name`` and ``objects``. Empty if the file cannot be read or parsed,
        so plates can still be listed from the other sources.
    """
    import defusedxml.ElementTree as ET

    plate_metadata: dict[int, dict] = {}
    try:
        root = ET.fromstring(zf.read("Metadata/slice_info.config").decode())
    except Exception as exc:
        logger.warning("Ignoring unreadable Metadata/slice_info.config: %s", exc)
        return plate_metadata

    for plate_elem in root.findall(".//plate"):
        plate_info = {"filaments": [], "prediction": None, "weight": None, "name": None, "objects": []}
        plate_index = None

        for meta in plate_elem.findall("metadata"):
            key = meta.get("key")
            value = meta.get("value")
            if key == "index" and value:
                try:
                    plate_index = int(value)
                except ValueError:
                    pass  # Ignore plate with non-numeric index
            elif key == "prediction" and value:
                try:
                    plate_info["prediction"] = int(value)
                except ValueError:
                    pass  # Leave prediction as None if not a valid integer
            elif key == "weight" and value:
                try:
                    plate_info["weight"] = float(value)
                except ValueError:
                    pass  # Leave weight as None if not a valid number

        # Get filaments used in this plate
        for filament_elem in plate_elem.findall("filament"):
            filament = _library_plate_filament(filament_elem)
            if filament is not None:
                plate_info["filaments"].append(filament)
        plate_info["filaments"].sort(key=lambda x: x["slot_id"])

        # Collect object names
        for obj_elem in plate_elem.findall("object"):
            obj_name = obj_elem.get("name")
            if obj_name and obj_name not in plate_info["objects"]:
                plate_info["objects"].append(obj_name)

        # Plates without a usable index cannot be matched to anything
        if plate_index is None:
            continue
        custom_name = plate_names.get(plate_index)
        if custom_name:
            plate_info["name"] = custom_name
        elif plate_info["objects"]:
            plate_info["name"] = plate_info["objects"][0]
        plate_metadata[plate_index] = plate_info
    return plate_metadata


def _library_plate_json_objects(zf, namelist: list[str]) -> dict[int, list[str]]:
    """Collect object names per plate from ``Metadata/plate_<n>.json`` (``bbox_objects``).

    Unreadable or malformed JSON members are skipped.
    """
    import json

    plate_json_re = re.compile(r"^Metadata/plate_(\d+)\.json$")
    plate_json_objects: dict[int, list[str]] = {}
    for name in namelist:
        match = plate_json_re.match(name)
        if not match:
            continue
        try:
            plate_index = int(match.group(1))
            payload = json.loads(zf.read(name).decode())
            names: list[str] = []
            for obj in payload.get("bbox_objects", []):
                obj_name = obj.get("name") if isinstance(obj, dict) else None
                if obj_name and obj_name not in names:
                    names.append(obj_name)
            if names:
                plate_json_objects[plate_index] = names
        except Exception:
            continue
    return plate_json_objects


@router.get("/files/{file_id}/plates")
async def get_library_file_plates(
    file_id: int,
    db: AsyncSession = Depends(get_db),
    _: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_READ)),
):
    """Get available plates from a multi-plate 3MF library file.

    Returns a list of plates with their index, name, thumbnail availability,
    and filament requirements. For single-plate exports, returns a single plate.

    Plate indices come from the archive's member names; names, objects and
    print metadata are enrichment read from ``model_settings.config``,
    ``slice_info.config`` and ``plate_<n>.json``. A missing or malformed
    enrichment file only removes that detail, it does not hide the plates.

    Args:
        file_id: The library file ID

    Returns:
        Dict with ``file_id``, ``filename``, ``plates`` and ``is_multi_plate``.
        ``plates`` is empty for non-3MF files and when the archive itself
        cannot be read.

    Raises:
        HTTPException: 404 if the file is unknown or missing on disk.
    """
    # Get the library file
    result = await db.execute(LibraryFile.active().where(LibraryFile.id == file_id))
    lib_file = result.scalar_one_or_none()
    if not lib_file:
        raise HTTPException(status_code=404, detail="File not found")

    file_path = Path(app_settings.base_dir) / lib_file.file_path
    if not file_path.exists():
        raise HTTPException(status_code=404, detail="File not found on disk")

    # Only 3MF files have plates
    if not lib_file.filename.lower().endswith(".3mf"):
        return {"file_id": file_id, "filename": lib_file.filename, "plates": [], "is_multi_plate": False}

    plates = []
    try:
        with zipfile.ZipFile(file_path, "r") as zf:
            namelist = zf.namelist()

            plate_indices = _library_plate_indices(namelist)
            if not plate_indices:
                # No plate metadata found
                return {"file_id": file_id, "filename": lib_file.filename, "plates": [], "is_multi_plate": False}

            # Plate names + object assignments
            plate_names: dict[int, str] = {}
            plate_object_ids: dict[int, list[str]] = {}
            object_names_by_id: dict[str, str] = {}
            if "Metadata/model_settings.config" in namelist:
                plate_names, plate_object_ids, object_names_by_id = _library_plate_model_settings(zf)

            # Print time, weight, filaments and object names per plate
            plate_metadata: dict[int, dict] = {}
            if "Metadata/slice_info.config" in namelist:
                plate_metadata = _library_plate_slice_info(zf, plate_names)

            # Object lists for when slice_info is missing
            plate_json_objects = _library_plate_json_objects(zf, namelist)

            # Build plate list
            for idx in plate_indices:
                meta = plate_metadata.get(idx, {})
                has_thumbnail = f"Metadata/plate_{idx}.png" in namelist

                # Object names: slice_info first, then plate JSON, then model_settings
                objects = meta.get("objects", [])
                if not objects:
                    objects = plate_json_objects.get(idx, [])
                if not objects and plate_object_ids.get(idx):
                    objects = [
                        object_names_by_id.get(obj_id, f"Object {obj_id}") for obj_id in plate_object_ids.get(idx, [])
                    ]

                plate_name = meta.get("name")
                if not plate_name:
                    plate_name = plate_names.get(idx)
                if not plate_name and objects:
                    plate_name = objects[0]

                plates.append(
                    {
                        "index": idx,
                        "name": plate_name,
                        "objects": objects,
                        "object_count": len(objects),
                        "has_thumbnail": has_thumbnail,
                        "thumbnail_url": f"/api/v1/library/files/{file_id}/plate-thumbnail/{idx}"
                        if has_thumbnail
                        else None,
                        "print_time_seconds": meta.get("prediction"),
                        "filament_used_grams": meta.get("weight"),
                        "filaments": meta.get("filaments", []),
                    }
                )
    except Exception as e:
        logger.warning("Failed to parse plates from library file %s: %s", file_id, e)

    return {
        "file_id": file_id,
        "filename": lib_file.filename,
        "plates": plates,
        "is_multi_plate": len(plates) > 1,
    }
@router.get("/files/{file_id}/plate-thumbnail/{plate_index}")
async def get_library_file_plate_thumbnail(
    file_id: int,
    plate_index: int,
    db: AsyncSession = Depends(get_db),
    _: None = RequireCameraStreamTokenIfAuthEnabled,
):
    """Serve ``Metadata/plate_<plate_index>.png`` from a library file as ``image/png``.

    Responds with 404 when the file is unknown, missing on disk, not a
    readable archive, or has no thumbnail for the requested plate.
    """
    from starlette.responses import Response

    lookup = await db.execute(LibraryFile.active().where(LibraryFile.id == file_id))
    lib_file = lookup.scalar_one_or_none()
    if not lib_file:
        raise HTTPException(status_code=404, detail="File not found")

    archive_path = Path(app_settings.base_dir) / lib_file.file_path
    if not archive_path.exists():
        raise HTTPException(status_code=404, detail="File not found on disk")

    member = f"Metadata/plate_{plate_index}.png"
    try:
        with zipfile.ZipFile(archive_path, "r") as archive:
            if member in archive.namelist():
                image_bytes = archive.read(member)
                return Response(content=image_bytes, media_type="image/png")
    except Exception:
        pass  # Archive unreadable or thumbnail missing; fall through to 404

    raise HTTPException(status_code=404, detail=f"Thumbnail for plate {plate_index} not found")
def _filament_requirement_plate_index(plate_elem) -> int | None:
    """Return the plate's index from its first ``index`` metadata entry, or ``None``."""
    for meta in plate_elem.findall("metadata"):
        if meta.get("key") == "index":
            try:
                return int(meta.get("value", ""))
            except ValueError:
                return None  # Non-numeric index value: plate cannot be matched
    return None


def _filament_requirement_entry(filament_elem) -> dict | None:
    """Convert one ``<filament>`` element into a filament requirement dict.

    Returns ``None`` for filaments that are unused (``used_g`` not > 0), have
    no id, or have a non-numeric id. An unparsable ``used_m`` counts as 0.
    """
    filament_id = filament_elem.get("id")
    try:
        used_grams = float(filament_elem.get("used_g", "0"))
    except (ValueError, TypeError):
        used_grams = 0
    if not (used_grams > 0 and filament_id):
        return None
    try:
        slot_id = int(filament_id)
    except ValueError:
        return None  # Skip this filament only; keep the remaining ones

    used_m = filament_elem.get("used_m", "0")
    try:
        used_meters = float(used_m) if used_m else 0
    except ValueError:
        used_meters = 0
    return {
        "slot_id": slot_id,
        "type": filament_elem.get("type", ""),
        "color": filament_elem.get("color", ""),
        "used_grams": round(used_grams, 1),
        "used_meters": used_meters,
        "tray_info_idx": filament_elem.get("tray_info_idx", ""),
    }


@router.get("/files/{file_id}/filament-requirements")
async def get_library_file_filament_requirements(
    file_id: int,
    plate_id: int | None = None,
    db: AsyncSession = Depends(get_db),
    _: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_READ)),
):
    """Get filament requirements from a library file.

    Parses the 3MF file to extract filament slot IDs, types, colors, and usage.
    This enables AMS slot assignment when printing from the file manager.

    Args:
        file_id: The library file ID
        plate_id: Optional plate index to get filaments for a specific plate

    Returns:
        Dict with ``file_id``, ``filename``, ``plate_id`` and ``filaments``
        (sorted by ``slot_id``; only filaments with ``used_g`` > 0). The list
        is empty for non-3MF files and when the archive cannot be parsed.

    Raises:
        HTTPException: 404 if the file is unknown or missing on disk.
    """
    import defusedxml.ElementTree as ET

    # Get the library file
    result = await db.execute(LibraryFile.active().where(LibraryFile.id == file_id))
    lib_file = result.scalar_one_or_none()
    if not lib_file:
        raise HTTPException(status_code=404, detail="File not found")

    # Get the full file path
    file_path = Path(app_settings.base_dir) / lib_file.file_path
    if not file_path.exists():
        raise HTTPException(status_code=404, detail="File not found on disk")

    # Only 3MF files have parseable filament info
    if not lib_file.filename.lower().endswith(".3mf"):
        return {"file_id": file_id, "filename": lib_file.filename, "plate_id": plate_id, "filaments": []}

    filaments = []
    try:
        with zipfile.ZipFile(file_path, "r") as zf:
            # Parse slice_info.config for filament requirements
            if "Metadata/slice_info.config" in zf.namelist():
                content = zf.read("Metadata/slice_info.config").decode()
                root = ET.fromstring(content)

                if plate_id is not None:
                    # Only the first plate whose index matches is used
                    filament_elems = []
                    for plate_elem in root.findall(".//plate"):
                        if _filament_requirement_plate_index(plate_elem) == plate_id:
                            filament_elems = plate_elem.findall("filament")
                            break
                else:
                    # All filaments in the file (for single-plate or overview)
                    filament_elems = root.findall(".//filament")

                for filament_elem in filament_elems:
                    entry = _filament_requirement_entry(filament_elem)
                    if entry is not None:
                        filaments.append(entry)

                # Sort by slot ID
                filaments.sort(key=lambda x: x["slot_id"])

                # Enrich with nozzle mapping for dual-nozzle printers
                nozzle_mapping = extract_nozzle_mapping_from_3mf(zf)
                if nozzle_mapping:
                    for filament in filaments:
                        filament["nozzle_id"] = nozzle_mapping.get(filament["slot_id"])
    except Exception as e:
        logger.warning("Failed to parse filament requirements from library file %s: %s", file_id, e)

    return {
        "file_id": file_id,
        "filename": lib_file.filename,
        "plate_id": plate_id,
        "filaments": filaments,
    }
- def _strip_3mf_embedded_settings(zip_bytes: bytes) -> bytes:
- """Remove ``Metadata/project_settings.config`` from a 3MF.
- Bambuddy supplies the slicer profile triplet via the sidecar's
- ``--load-settings`` path; the 3MF's embedded settings would otherwise be
- validated by the CLI first and can fail with sentinel-value range
- checks (`prime_tower_brim_width: -1 not in range`, etc.) regardless of
- what we pass via ``--load-settings``. Stripping the embedded config
- forces the CLI to use the supplied profiles only. Geometry, color, and
- multi-part data inside the 3MF are preserved.
- """
- from io import BytesIO
- src = BytesIO(zip_bytes)
- dst = BytesIO()
- with zipfile.ZipFile(src, "r") as zin, zipfile.ZipFile(dst, "w", zipfile.ZIP_DEFLATED) as zout:
- for item in zin.infolist():
- if item.filename == "Metadata/project_settings.config":
- continue
- zout.writestr(item, zin.read(item.filename))
- return dst.getvalue()
async def _run_slicer_with_fallback(
    db: AsyncSession,
    *,
    model_bytes: bytes,
    model_filename: str,
    request: SliceRequest,
):
    """Check the presets, choose the sidecar, and slice the model.

    3MF inputs are first sliced with their embedded project settings removed
    and the requested profiles applied. If the sidecar answers that attempt
    with a server error, the untouched 3MF is sliced again without profiles.

    Returns:
        ``(SliceResult, used_embedded_settings: bool)`` — the flag is True
        only when the profile-less retry produced the result.

    Raises:
        HTTPException: 400 for a wrong preset id, an unknown
            ``preferred_slicer``, a corrupt 3MF, or a slicer input error;
            502 for slicer server or availability errors.
    """
    from backend.app.api.routes.settings import get_setting
    from backend.app.models.local_preset import LocalPreset
    from backend.app.services.slicer_api import (
        SlicerApiServerError,
        SlicerApiService,
        SlicerApiUnavailableError,
        SlicerInputError,
    )

    # Profile triplet — each id must point at a preset of the matching type
    requested_presets = {
        "printer": request.printer_preset_id,
        "process": request.process_preset_id,
        "filament": request.filament_preset_id,
    }
    profile_json: dict[str, str] = {}
    for kind, preset_id in requested_presets.items():
        preset = await db.get(LocalPreset, preset_id)
        if preset is None or preset.preset_type != kind:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid {kind} preset id (expected preset_type='{kind}')",
            )
        profile_json[kind] = preset.setting

    # Slicer routing — the per-install URL setting (Settings UI → Slicer card)
    # takes precedence; when it is empty the env-backed default from
    # core/config.py (SLICER_API_URL / BAMBU_STUDIO_API_URL) is used.
    preferred = (await get_setting(db, "preferred_slicer")) or "bambu_studio"
    if preferred == "orcaslicer":
        url_setting, default_attr = "orcaslicer_api_url", "slicer_api_url"
    elif preferred == "bambu_studio":
        url_setting, default_attr = "bambu_studio_api_url", "bambu_studio_api_url"
    else:
        raise HTTPException(
            status_code=400,
            detail=f"Unknown preferred_slicer setting: '{preferred}'. Expected 'orcaslicer' or 'bambu_studio'.",
        )
    configured = await get_setting(db, url_setting)
    api_url = (configured or getattr(app_settings, default_attr)).strip()

    is_3mf = model_filename.lower().endswith(".3mf")
    if is_3mf:
        try:
            primary_bytes = _strip_3mf_embedded_settings(model_bytes)
        except (zipfile.BadZipFile, KeyError) as exc:
            raise HTTPException(status_code=400, detail=f"Source 3MF is corrupt: {exc}") from exc
    else:
        primary_bytes = model_bytes

    used_embedded_settings = False
    service = SlicerApiService(api_url)
    try:
        try:
            result = await service.slice_with_profiles(
                model_bytes=primary_bytes,
                model_filename=model_filename,
                printer_profile_json=profile_json["printer"],
                process_profile_json=profile_json["process"],
                filament_profile_json=profile_json["filament"],
                plate=request.plate,
                export_3mf=request.export_3mf,
            )
        except SlicerApiServerError as exc:
            # Only a 3MF carries embedded settings to fall back on
            if not is_3mf:
                raise
            logger.warning(
                "Slicer CLI rejected --load-settings for %s (%s); retrying with embedded settings",
                model_filename,
                exc,
            )
            result = await service.slice_without_profiles(
                model_bytes=model_bytes,
                model_filename=model_filename,
                plate=request.plate,
                export_3mf=request.export_3mf,
            )
            used_embedded_settings = True
    except SlicerInputError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    except (SlicerApiServerError, SlicerApiUnavailableError) as exc:
        raise HTTPException(status_code=502, detail=str(exc)) from exc
    finally:
        await service.close()

    return result, used_embedded_settings
async def slice_and_persist(
    db: AsyncSession,
    *,
    model_bytes: bytes,
    model_filename: str,
    folder_id: int | None,
    extra_metadata: dict | None,
    request: SliceRequest,
    current_user_id: int | None,
) -> SliceResponse:
    """Slice a model and store the output as a new ``LibraryFile`` row.

    The slicer is always asked for a ``.gcode.3mf`` (``export_3mf`` is forced
    to ``True`` on a copy of ``request``) so the library thumbnail pipeline
    works on the new file. Plain ``.gcode`` would have no embedded thumbnail
    to extract.

    Args:
        db: Session used for the slicer settings lookup and for persisting
            the new row.
        model_bytes: Raw bytes of the source model.
        model_filename: Name of the source model; its stem becomes the stem
            of the sliced file's display name.
        folder_id: Folder the new row is placed in (``None`` is stored as-is).
        extra_metadata: Optional keys merged last into the stored metadata,
            so they win over parsed and slicer-reported values.
        request: Caller's slice request (presets, plate, ...).
        current_user_id: Recorded as ``created_by_id`` on the new row.

    Returns:
        A ``SliceResponse`` describing the newly created library file.

    Raises:
        HTTPException: Propagated unchanged from ``_run_slicer_with_fallback``.
    """
    from backend.app.services.archive import ThreeMFParser

    forced_3mf_request = request.model_copy(update={"export_3mf": True})
    result, used_embedded_settings = await _run_slicer_with_fallback(
        db,
        model_bytes=model_bytes,
        model_filename=model_filename,
        request=forced_3mf_request,
    )

    # The display name keeps the source's stem; the bytes themselves are
    # stored under a random hex name inside the library files directory.
    stem = model_filename.rsplit(".", 1)[0]
    display_name = f"{stem}.gcode.3mf"
    stored_path = get_library_files_dir() / f"{uuid.uuid4().hex}.gcode.3mf"
    stored_path.write_bytes(result.content)

    # Thumbnail + metadata extraction is best-effort: the sliced file is
    # still useful without a preview, so any failure is only logged.
    thumb_rel: str | None = None
    slicer_metadata: dict = {}
    try:
        parsed = ThreeMFParser(str(stored_path)).parse()
        thumb_bytes = parsed.get("_thumbnail_data")
        thumb_suffix = parsed.get("_thumbnail_ext", ".png")
        if thumb_bytes:
            thumb_file = get_library_thumbnails_dir() / f"{uuid.uuid4().hex}{thumb_suffix}"
            thumb_file.write_bytes(thumb_bytes)
            thumb_rel = to_relative_path(thumb_file)
        cleaned = _clean_3mf_metadata(parsed)
        if isinstance(cleaned, dict):
            slicer_metadata = cleaned
    except Exception as exc:
        logger.warning("Failed to parse sliced 3MF metadata for %s: %s", display_name, exc)

    # The parsed metadata carries a `print_name` lifted from the source
    # file's embedded settings. The FileManager listing prefers print_name
    # over filename for display, which would make the sliced row
    # indistinguishable from its source. Dropping it makes the listing fall
    # back to the filename, which already ends in ".gcode.3mf".
    metadata: dict = dict(slicer_metadata)
    metadata.pop("print_name", None)
    metadata["print_time_seconds"] = result.print_time_seconds
    metadata["filament_used_g"] = result.filament_used_g
    metadata["filament_used_mm"] = result.filament_used_mm
    if used_embedded_settings:
        metadata["used_embedded_settings"] = True
    if extra_metadata:
        metadata.update(extra_metadata)

    content = result.content
    new_file = LibraryFile(
        folder_id=folder_id,
        filename=display_name,
        file_path=to_relative_path(stored_path),
        # The output is a `.gcode.3mf` zip with embedded G-code, but its
        # user-facing meaning is "ready-to-print G-code": "gcode" gives it
        # the same badge as plain .gcode files and sets it apart from
        # un-sliced `.3mf` source models.
        file_type="gcode",
        file_size=len(content),
        file_hash=hashlib.sha256(content).hexdigest(),
        thumbnail_path=thumb_rel,
        file_metadata=metadata,
        source_type="sliced",
        created_by_id=current_user_id,
    )
    db.add(new_file)
    await db.commit()
    await db.refresh(new_file)

    return SliceResponse(
        library_file_id=new_file.id,
        name=new_file.filename,
        print_time_seconds=result.print_time_seconds,
        filament_used_g=result.filament_used_g,
        filament_used_mm=result.filament_used_mm,
        used_embedded_settings=used_embedded_settings,
    )
async def slice_and_persist_as_archive(
    db: AsyncSession,
    *,
    model_bytes: bytes,
    model_filename: str,
    request: SliceRequest,
    source_archive,  # PrintArchive — hint kept loose to avoid cyclic import
    current_user_id: int | None,
):
    """Slice a model and save the result as a new ``PrintArchive`` row.

    The new row inherits printer / project / makerworld metadata from
    ``source_archive``. The slicer is always asked for a ``.gcode.3mf`` so
    the existing thumbnail and plates infrastructure (which expects a
    zip-shaped 3MF) works on the new archive.

    Args:
        db: Session used for the slicer settings lookup and for persisting
            the new archive row.
        model_bytes: Raw bytes of the source model.
        model_filename: Name of the source model; its stem names the output.
        request: Caller's slice request; ``export_3mf`` is overridden to
            ``True`` on a copy.
        source_archive: Archive the model came from; identity fields and
            ``extra_data`` are copied from it.
        current_user_id: Recorded as ``created_by_id`` on the new row.

    Returns:
        A ``SliceArchiveResponse`` describing the new archive.

    Raises:
        HTTPException: Propagated unchanged from ``_run_slicer_with_fallback``.
    """
    from backend.app.models.archive import PrintArchive
    from backend.app.schemas.slicer import SliceArchiveResponse
    from backend.app.services.archive import ThreeMFParser

    # Archive sinks always want a 3MF, so the caller's `export_3mf` flag is
    # overridden on a copy of the request.
    archive_request = request.model_copy(update={"export_3mf": True})
    result, used_embedded_settings = await _run_slicer_with_fallback(
        db,
        model_bytes=model_bytes,
        model_filename=model_filename,
        request=archive_request,
    )

    base_name = model_filename.rsplit(".", 1)[0]
    out_filename = f"{base_name}.gcode.3mf"

    # On-disk names must be single path components. A stem containing a path
    # separator would otherwise make the write below target a sub-directory
    # that was never created (or escape the archive directory), so the
    # separators are neutralised for the filesystem only — the names stored
    # in the database keep the caller's original spelling.
    disk_base = base_name.replace("/", "_").replace("\\", "_")
    disk_filename = f"{disk_base}.gcode.3mf"

    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    printer_folder = str(source_archive.printer_id) if source_archive.printer_id is not None else "unassigned"
    archive_subdir = f"{timestamp}_{disk_base}_sliced"
    archive_dir = app_settings.archive_dir / printer_folder / archive_subdir
    archive_dir.mkdir(parents=True, exist_ok=True)
    out_path = archive_dir / disk_filename
    out_path.write_bytes(result.content)

    # Extract a thumbnail from the produced 3MF so the new archive card has
    # a preview. Failures here shouldn't fail the whole slice — the archive
    # row is still useful without a thumbnail — so they are only logged.
    thumbnail_path: str | None = None
    parsed_metadata: dict = {}
    try:
        parser = ThreeMFParser(str(out_path))
        parsed = parser.parse()
        thumb_data = parsed.get("_thumbnail_data")
        thumb_ext = parsed.get("_thumbnail_ext", ".png")
        if thumb_data:
            thumb_dest = archive_dir / f"thumbnail{thumb_ext}"
            thumb_dest.write_bytes(thumb_data)
            thumbnail_path = str(thumb_dest.relative_to(app_settings.base_dir))
        # Keys starting with "_" (such as the raw thumbnail bytes) are not
        # copied into the stored metadata.
        parsed_metadata = {k: v for k, v in parsed.items() if not k.startswith("_")}
    except Exception as exc:
        logger.warning("Failed to parse sliced 3MF metadata for %s: %s", out_filename, exc)

    # Layering order: source archive's extra_data, then freshly parsed
    # metadata, then the slicer-reported figures (which win on conflict).
    metadata = dict(source_archive.extra_data) if source_archive.extra_data else {}
    metadata.update(parsed_metadata)
    metadata.update(
        {
            "sliced_from_archive_id": source_archive.id,
            "print_time_seconds": result.print_time_seconds,
            "filament_used_g": result.filament_used_g,
            "filament_used_mm": result.filament_used_mm,
        }
    )
    if used_embedded_settings:
        metadata["used_embedded_settings"] = True

    new_archive = PrintArchive(
        printer_id=source_archive.printer_id,
        project_id=source_archive.project_id,
        filename=out_filename,
        file_path=str(out_path.relative_to(app_settings.base_dir)),
        file_size=len(result.content),
        content_hash=hashlib.sha256(result.content).hexdigest(),
        thumbnail_path=thumbnail_path,
        # Inherit identity from the source archive so the new entry shows
        # up alongside its sibling in the archives list.
        print_name=(source_archive.print_name or base_name) + " (re-sliced)",
        print_time_seconds=result.print_time_seconds,
        filament_used_grams=result.filament_used_g or None,
        filament_type=source_archive.filament_type,
        filament_color=source_archive.filament_color,
        layer_height=source_archive.layer_height,
        nozzle_diameter=source_archive.nozzle_diameter,
        sliced_for_model=source_archive.sliced_for_model,
        makerworld_url=source_archive.makerworld_url,
        designer=source_archive.designer,
        # Sliced-but-not-printed: keep status default ("completed") so it
        # surfaces in the normal archives list, but do not stamp
        # started/completed_at — the user hasn't actually printed it yet.
        extra_data=metadata,
        created_by_id=current_user_id,
    )
    db.add(new_archive)
    await db.commit()
    await db.refresh(new_archive)

    return SliceArchiveResponse(
        archive_id=new_archive.id,
        name=new_archive.print_name or out_filename,
        print_time_seconds=result.print_time_seconds,
        filament_used_g=result.filament_used_g,
        filament_used_mm=result.filament_used_mm,
        used_embedded_settings=used_embedded_settings,
    )
@router.post("/files/{file_id}/slice", status_code=202)
async def slice_library_file(
    file_id: int,
    request: SliceRequest,
    db: AsyncSession = Depends(get_db),
    current_user: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_UPLOAD)),
):
    """Enqueue a slice job for a library file.

    Returns 202 + job_id; the slice runs in the background and the caller
    polls ``GET /slice-jobs/{id}`` (see ``status_url`` in the response).

    Raises:
        HTTPException: 404 when the row or its bytes are missing, 400 when
            the source is not an STL, 3MF or STEP file.
    """
    from backend.app.core.database import async_session
    from backend.app.services.slice_dispatch import (
        http_exception_to_job_error,
        slice_dispatch,
    )

    src_result = await db.execute(LibraryFile.active().where(LibraryFile.id == file_id))
    lib_file = src_result.scalar_one_or_none()
    if not lib_file:
        raise HTTPException(status_code=404, detail="File not found")

    src_lower = (lib_file.filename or "").lower()
    if not src_lower.endswith((".stl", ".3mf", ".step", ".stp")):
        raise HTTPException(status_code=400, detail="Source file must be STL, 3MF, or STEP")

    src_path = Path(app_settings.base_dir) / lib_file.file_path
    if not src_path.exists():
        raise HTTPException(status_code=404, detail="Source file missing on disk")

    # Capture inputs the bg task needs — the request DB session is closed
    # before the background task runs.
    model_bytes = src_path.read_bytes()
    folder_id = lib_file.folder_id
    source_lib_file_id = lib_file.id
    user_id = current_user.id if current_user else None

    # If the source has a `print_name` in its metadata (BambuStudio always
    # sets this; OrcaSlicer often leaves it blank), derive the sliced
    # output's filename from it instead of the raw filename. The source
    # row's display already prefers print_name, so the sliced row's
    # filename ("Piggo the piggy bank.gcode.3mf") will match the source's
    # display name ("Piggo the piggy bank") with the gcode extension added.
    src_print_name = None
    if lib_file.file_metadata:
        candidate = lib_file.file_metadata.get("print_name")
        if isinstance(candidate, str) and candidate.strip():
            src_print_name = candidate.strip()

    if src_print_name:
        # print_name is free text from the model's embedded settings, but
        # library filenames must not contain path separators (the rename
        # endpoint rejects them, and the filename is used as a URL path
        # segment by the slicer download route) — so neutralise them here.
        safe_print_name = src_print_name.replace("/", "_").replace("\\", "_")
        src_ext = Path(lib_file.filename).suffix.lower() or ".3mf"
        model_filename = f"{safe_print_name}{src_ext}"
    else:
        model_filename = lib_file.filename

    async def _run():
        # Runs on its own session; HTTP errors are translated into job
        # errors for the job-status endpoint instead of an HTTP response.
        async with async_session() as task_db:
            try:
                response = await slice_and_persist(
                    task_db,
                    model_bytes=model_bytes,
                    model_filename=model_filename,
                    folder_id=folder_id,
                    extra_metadata={"sliced_from_library_file_id": source_lib_file_id},
                    request=request,
                    current_user_id=user_id,
                )
            except HTTPException as exc:
                raise http_exception_to_job_error(exc) from exc
            return response.model_dump()

    job = await slice_dispatch.enqueue(
        kind="library_file",
        source_id=lib_file.id,
        source_name=lib_file.filename,
        run=_run,
    )
    return {
        "job_id": job.id,
        "status": job.status,
        "status_url": f"/api/v1/slice-jobs/{job.id}",
    }
@router.post("/files/{file_id}/print")
async def print_library_file(
    file_id: int,
    printer_id: int,
    body: FilePrintRequest | None = None,
    db: AsyncSession = Depends(get_db),
    current_user: User | None = Depends(require_permission_if_auth_enabled(Permission.PRINTERS_CONTROL)),
):
    """Queue a library file to be sent to / started on a printer.

    The send/start work itself is handed to the background dispatcher so the
    UI can continue immediately. Only sliced files (.gcode or .gcode.3mf)
    can be printed.

    Raises:
        HTTPException: 404 for a missing file, missing bytes, unknown
            printer or unknown project; 400 for an un-sliced file or a
            disconnected printer; 409 when the dispatcher rejects the job.
    """
    from backend.app.models.printer import Printer
    from backend.app.services.background_dispatch import DispatchEnqueueRejected, background_dispatch
    from backend.app.services.printer_manager import printer_manager

    # A missing body means "use every default".
    if body is None:
        body = FilePrintRequest()

    file_row = await db.execute(LibraryFile.active().where(LibraryFile.id == file_id))
    lib_file = file_row.scalar_one_or_none()
    if not lib_file:
        raise HTTPException(status_code=404, detail="File not found")

    if not is_sliced_file(lib_file.filename):
        raise HTTPException(
            status_code=400,
            detail="Not a sliced file. Only .gcode or .gcode.3mf files can be printed.",
        )

    on_disk = Path(app_settings.base_dir) / lib_file.file_path
    if not on_disk.exists():
        raise HTTPException(status_code=404, detail="File not found on disk")

    printer_row = await db.execute(select(Printer).where(Printer.id == printer_id))
    printer = printer_row.scalar_one_or_none()
    if not printer:
        raise HTTPException(status_code=404, detail="Printer not found")

    if not printer_manager.is_connected(printer_id):
        raise HTTPException(status_code=400, detail="Printer is not connected")

    # Validate the project up front so a bogus ID yields 404, not a
    # FK-constraint 500 later in the dispatch.
    if body.project_id is not None:
        project_row = await db.execute(select(Project).where(Project.id == body.project_id))
        if not project_row.scalar_one_or_none():
            raise HTTPException(status_code=404, detail="Project not found")

    # Label shown for the job: filename, plus the plate when one is known
    # (an explicit plate name wins over one synthesised from the plate id).
    plate_label = body.plate_name
    if not plate_label and body.plate_id is not None:
        plate_label = f"Plate {body.plate_id}"
    source_label = f"{lib_file.filename} • {plate_label}" if plate_label else lib_file.filename

    requester_id = current_user.id if current_user else None
    requester_name = current_user.username if current_user else None
    try:
        dispatch_result = await background_dispatch.dispatch_print_library_file(
            file_id=file_id,
            filename=source_label,
            printer_id=printer_id,
            printer_name=printer.name,
            options=body.model_dump(exclude_none=True, exclude={"cleanup_library_after_dispatch"}),
            project_id=body.project_id,
            requested_by_user_id=requester_id,
            requested_by_username=requester_name,
            cleanup_library_after_dispatch=body.cleanup_library_after_dispatch,
        )
    except DispatchEnqueueRejected as e:
        raise HTTPException(status_code=409, detail=str(e)) from e

    return {
        "status": "dispatched",
        "printer_id": printer_id,
        "archive_id": None,
        "filename": lib_file.filename,
        "dispatch_job_id": dispatch_result["dispatch_job_id"],
        "dispatch_position": dispatch_result["dispatch_position"],
    }
- # ============ File Detail Endpoints ============
@router.get("/files/{file_id}", response_model=FileResponseSchema)
async def get_file(
    file_id: int,
    db: AsyncSession = Depends(get_db),
    _: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_READ)),
):
    """Get a file by ID with full details.

    Besides the row itself the response carries the folder and project
    names, any other active files sharing the same content hash, and a few
    metadata fields lifted to the top level for convenience.

    Raises:
        HTTPException: 404 when no active file has this ID.
    """
    result = await db.execute(
        LibraryFile.active().options(selectinload(LibraryFile.created_by)).where(LibraryFile.id == file_id)
    )
    file = result.scalar_one_or_none()
    if not file:
        raise HTTPException(status_code=404, detail="File not found")

    # Folder name
    folder_name = None
    if file.folder_id:
        folder_result = await db.execute(select(LibraryFolder.name).where(LibraryFolder.id == file.folder_id))
        folder_name = folder_result.scalar()

    # Project name
    project_name = None
    if file.project_id:
        project_result = await db.execute(select(Project.name).where(Project.id == file.project_id))
        project_name = project_result.scalar()

    # Duplicates: other non-trashed rows with the same content hash. Rows
    # without a hash are never matched.
    duplicates = []
    if file.file_hash:
        dup_result = await db.execute(
            select(LibraryFile, LibraryFolder.name)
            .outerjoin(LibraryFolder, LibraryFile.folder_id == LibraryFolder.id)
            .where(
                LibraryFile.file_hash == file.file_hash,
                LibraryFile.id != file.id,
                LibraryFile.deleted_at.is_(None),
            )
        )
        duplicates = [
            FileDuplicate(
                id=dup_file.id,
                filename=dup_file.filename,
                folder_id=dup_file.folder_id,
                folder_name=dup_folder_name,
                created_at=dup_file.created_at,
            )
            for dup_file, dup_folder_name in dup_result.all()
        ]
    duplicate_count = len(duplicates)

    # Key metadata fields surfaced at the top level of the response.
    metadata = file.file_metadata or {}
    print_name = metadata.get("print_name")
    print_time = metadata.get("print_time_seconds")
    filament_grams = metadata.get("filament_used_grams")
    if filament_grams is None:
        # Rows created by slice_and_persist record the slicer-reported
        # weight under "filament_used_g"; fall back to it so sliced files
        # still report their filament use when the longer key is absent.
        filament_grams = metadata.get("filament_used_g")
    sliced_for_model = metadata.get("sliced_for_model")

    return FileResponseSchema(
        id=file.id,
        folder_id=file.folder_id,
        folder_name=folder_name,
        project_id=file.project_id,
        project_name=project_name,
        filename=file.filename,
        file_path=file.file_path,
        file_type=file.file_type,
        file_size=file.file_size,
        file_hash=file.file_hash,
        thumbnail_path=file.thumbnail_path,
        metadata=file.file_metadata,
        print_count=file.print_count,
        last_printed_at=file.last_printed_at,
        notes=file.notes,
        duplicates=duplicates if duplicates else None,
        duplicate_count=duplicate_count,
        created_by_id=file.created_by_id,
        created_by_username=file.created_by.username if file.created_by else None,
        created_at=file.created_at,
        updated_at=file.updated_at,
        print_name=print_name,
        print_time_seconds=print_time,
        filament_used_grams=filament_grams,
        sliced_for_model=sliced_for_model,
    )
@router.put("/files/{file_id}", response_model=FileResponseSchema)
async def update_file(
    file_id: int,
    data: FileUpdate,
    db: AsyncSession = Depends(get_db),
    auth_result: tuple[User | None, bool] = Depends(
        require_ownership_permission(
            Permission.LIBRARY_UPDATE_ALL,
            Permission.LIBRARY_UPDATE_OWN,
        )
    ),
):
    """Update a file's filename, folder, project and/or notes.

    Only fields that are not ``None`` in ``data`` are applied. For
    ``folder_id`` and ``project_id`` the value ``0`` clears the association.

    Raises:
        HTTPException: 404 for an unknown file, folder or project; 403 when
            the caller may only edit their own files and this isn't one;
            400 when the new filename contains a path separator.
    """
    user, can_modify_all = auth_result

    lookup = await db.execute(LibraryFile.active().where(LibraryFile.id == file_id))
    target = lookup.scalar_one_or_none()
    if not target:
        raise HTTPException(status_code=404, detail="File not found")

    # Ownership check — only consulted when the caller lacks the *_all permission.
    if not can_modify_all and target.created_by_id != user.id:
        raise HTTPException(status_code=403, detail="You can only update your own files")

    new_name = data.filename
    if new_name is not None:
        if any(separator in new_name for separator in ("/", "\\")):
            raise HTTPException(status_code=400, detail="Filename cannot contain path separators")
        target.filename = new_name
        # Keep an existing print_name in sync so the display name matches.
        # A new dict is assigned rather than mutating the stored one.
        if target.file_metadata and "print_name" in target.file_metadata:
            target.file_metadata = {**target.file_metadata, "print_name": new_name}

    if data.folder_id is not None:
        if data.folder_id == 0:
            target.folder_id = None
        else:
            folder_lookup = await db.execute(select(LibraryFolder).where(LibraryFolder.id == data.folder_id))
            if not folder_lookup.scalar_one_or_none():
                raise HTTPException(status_code=404, detail="Folder not found")
            target.folder_id = data.folder_id

    if data.project_id is not None:
        if data.project_id == 0:
            target.project_id = None
        else:
            project_lookup = await db.execute(select(Project).where(Project.id == data.project_id))
            if not project_lookup.scalar_one_or_none():
                raise HTTPException(status_code=404, detail="Project not found")
            target.project_id = data.project_id

    if data.notes is not None:
        # An empty string is stored as NULL.
        target.notes = data.notes or None

    await db.commit()
    await db.refresh(target)

    # Build the full response by reusing the detail endpoint's logic.
    return await get_file(file_id, db)
@router.delete("/files/{file_id}")
async def delete_file(
    file_id: int,
    db: AsyncSession = Depends(get_db),
    auth_result: tuple[User | None, bool] = Depends(
        require_ownership_permission(
            Permission.LIBRARY_DELETE_ALL,
            Permission.LIBRARY_DELETE_OWN,
        )
    ),
):
    """Move a file to the trash (soft-delete).

    The file's bytes and thumbnail stay on disk until the trash sweeper
    hard-deletes the row after the retention window (see #1008). External
    files skip the trash entirely — they can't be restored from disk and the
    underlying file is outside Bambuddy's control, so only the DB record and
    the thumbnail are dropped.

    Raises:
        HTTPException: 404 when no active file has this ID, 403 when the
            caller may only delete their own files and this isn't one.
    """
    user, can_modify_all = auth_result

    lookup = await db.execute(LibraryFile.active().where(LibraryFile.id == file_id))
    target = lookup.scalar_one_or_none()
    if not target:
        raise HTTPException(status_code=404, detail="File not found")

    # Ownership check — only consulted when the caller lacks the *_all permission.
    if not can_modify_all and target.created_by_id != user.id:
        raise HTTPException(status_code=403, detail="You can only delete your own files")

    if not target.is_external:
        # Managed file: soft-delete. Sweeper removes bytes + thumbnail after retention.
        target.deleted_at = datetime.now(timezone.utc)
        await db.commit()
        return {"status": "success", "message": "File moved to trash", "trashed": True}

    # External file: bypass the trash — drop the DB row and our thumbnail.
    # A thumbnail that can't be removed is logged but does not block the delete.
    thumb_on_disk = to_absolute_path(target.thumbnail_path)
    if thumb_on_disk and thumb_on_disk.exists():
        try:
            thumb_on_disk.unlink()
        except OSError as e:
            logger.warning("Failed to delete thumbnail from disk: %s", e)
    await db.delete(target)
    await db.commit()
    return {"status": "success", "message": "File deleted", "trashed": False}
- # ============ File Content Endpoints ============
@router.get("/files/{file_id}/download")
async def download_file(
    file_id: int,
    db: AsyncSession = Depends(get_db),
    _: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_READ)),
):
    """Download a file's bytes under its library filename.

    Raises:
        HTTPException: 404 when no active file has this ID or its bytes are
            missing from disk.
    """
    lookup = await db.execute(LibraryFile.active().where(LibraryFile.id == file_id))
    entry = lookup.scalar_one_or_none()
    if not entry:
        raise HTTPException(status_code=404, detail="File not found")

    disk_path = to_absolute_path(entry.file_path)
    if not (disk_path and disk_path.exists()):
        raise HTTPException(status_code=404, detail="File not found on disk")

    # Served as an opaque binary under the row's filename, not the on-disk name.
    return FastAPIFileResponse(
        str(disk_path),
        filename=entry.filename,
        media_type="application/octet-stream",
    )
@router.post("/files/{file_id}/slicer-token")
async def create_library_slicer_token(
    file_id: int,
    db: AsyncSession = Depends(get_db),
    _: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_READ)),
):
    """Create a short-lived download token for opening a file in a slicer.

    Slicer protocol handlers (bambustudioopen://, orcaslicer://) cannot send
    auth headers, so they carry this token in the URL path instead.

    Raises:
        HTTPException: 404 when no active file has this ID.
    """
    from backend.app.core.auth import create_slicer_download_token

    # Only hand out tokens for files that actually exist and aren't trashed.
    lookup = await db.execute(LibraryFile.active().where(LibraryFile.id == file_id))
    if not lookup.scalar_one_or_none():
        raise HTTPException(status_code=404, detail="File not found")

    return {"token": await create_slicer_download_token("library", file_id)}
@router.get("/files/{file_id}/dl/{token}/{filename}")
async def download_library_file_for_slicer(
    file_id: int,
    token: str,
    filename: str,
    db: AsyncSession = Depends(get_db),
):
    """Download a library file using a slicer download token.

    Token-authenticated (no auth headers needed). The token is short-lived
    and single-use, created by POST /files/{file_id}/slicer-token. The
    filename sits at the end of the URL so slicers can detect the file
    format; it is not otherwise used here.

    Raises:
        HTTPException: 403 for an invalid or expired token, 404 when the
            file row or its bytes are missing.
    """
    from backend.app.core.auth import verify_slicer_download_token

    # The token is checked before any database work.
    token_ok = await verify_slicer_download_token(token, "library", file_id)
    if not token_ok:
        raise HTTPException(status_code=403, detail="Invalid or expired download token")

    lookup = await db.execute(LibraryFile.active().where(LibraryFile.id == file_id))
    entry = lookup.scalar_one_or_none()
    if not entry:
        raise HTTPException(status_code=404, detail="File not found")

    disk_path = to_absolute_path(entry.file_path)
    if not (disk_path and disk_path.exists()):
        raise HTTPException(status_code=404, detail="File not found on disk")

    return FastAPIFileResponse(
        str(disk_path),
        filename=entry.filename,
        media_type="application/octet-stream",
    )
@router.get("/files/{file_id}/thumbnail")
async def get_thumbnail(
    file_id: int,
    db: AsyncSession = Depends(get_db),
    _: None = RequireCameraStreamTokenIfAuthEnabled,
):
    """Serve a file's thumbnail image.

    Raises:
        HTTPException: 404 when no active file has this ID or it has no
            thumbnail on disk.
    """
    lookup = await db.execute(LibraryFile.active().where(LibraryFile.id == file_id))
    entry = lookup.scalar_one_or_none()
    if not entry:
        raise HTTPException(status_code=404, detail="File not found")

    thumb_on_disk = to_absolute_path(entry.thumbnail_path)
    if not (thumb_on_disk and thumb_on_disk.exists()):
        raise HTTPException(status_code=404, detail="Thumbnail not found")

    # Content type follows the file extension; anything unrecognised is
    # served as PNG.
    known_types = {
        ".png": "image/png",
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".gif": "image/gif",
        ".webp": "image/webp",
    }
    suffix = thumb_on_disk.suffix.lower()
    return FastAPIFileResponse(str(thumb_on_disk), media_type=known_types.get(suffix, "image/png"))
@router.get("/files/{file_id}/gcode")
async def get_gcode(
    file_id: int,
    db: AsyncSession = Depends(get_db),
    _: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_READ)),
):
    """Get the G-code text for a file (for preview).

    Plain G-code files are streamed as-is. For zip containers — ``3mf``
    rows, and ``gcode`` rows whose bytes are actually a ``.gcode.3mf``
    archive — the first ``*.gcode`` member is extracted and returned.

    Raises:
        HTTPException: 404 when the file row, its bytes, or the embedded
            G-code are missing; 400 for an unreadable archive or an
            unsupported file type.
    """
    from fastapi.responses import Response

    result = await db.execute(LibraryFile.active().where(LibraryFile.id == file_id))
    file = result.scalar_one_or_none()
    if not file:
        raise HTTPException(status_code=404, detail="File not found")

    abs_path = to_absolute_path(file.file_path)
    if not abs_path or not abs_path.exists():
        raise HTTPException(status_code=404, detail="File not found on disk")

    if file.file_type not in ("gcode", "3mf"):
        raise HTTPException(status_code=400, detail="Unsupported file type")

    # Sliced outputs are stored with file_type="gcode" even though their
    # bytes are a `.gcode.3mf` zip, so the type alone can't tell plain text
    # from an archive — sniff the container and only stream real text files.
    if file.file_type == "gcode" and not zipfile.is_zipfile(str(abs_path)):
        return FastAPIFileResponse(str(abs_path), media_type="text/plain")

    # Zip container: pull the first embedded G-code member.
    try:
        with zipfile.ZipFile(str(abs_path), "r") as zf:
            gcode_files = [n for n in zf.namelist() if n.endswith(".gcode")]
            if not gcode_files:
                raise HTTPException(status_code=404, detail="No gcode found in 3MF file")
            gcode_content = zf.read(gcode_files[0])
    except zipfile.BadZipFile as exc:
        raise HTTPException(status_code=400, detail="Invalid 3MF file") from exc

    return Response(content=gcode_content, media_type="text/plain")
- # ============ Bulk Operations ============
@router.post("/files/move")
async def move_files(
    data: FileMoveRequest,
    db: AsyncSession = Depends(get_db),
    auth_result: tuple[User | None, bool] = Depends(
        require_ownership_permission(
            Permission.LIBRARY_UPDATE_ALL,
            Permission.LIBRARY_UPDATE_OWN,
        )
    ),
):
    """Move multiple files to a folder.

    Cross-boundary moves (managed ↔ external, or external ↔ external)
    physically relocate the bytes — see ``_move_file_bytes``. Moves between
    managed locations stay DB-only because the file's on-disk location
    doesn't depend on which managed folder owns it.

    Files not owned by the user are skipped (unless the user has the
    ``*_all`` permission). Each skip carries a structured reason so the UI
    can surface "5 of 10 files were skipped: 3 had filename collisions on
    the NAS, 2 are no longer on disk" rather than a blank "skipped: 5".
    IDs that match no active file are ignored without being counted.

    Raises:
        HTTPException: 404 when the target folder doesn't exist, 403 when it
            is a read-only external folder.
    """
    user, can_modify_all = auth_result

    # Resolve the destination (folder_id None is passed through as "no folder").
    destination: LibraryFolder | None = None
    if data.folder_id is not None:
        folder_lookup = await db.execute(select(LibraryFolder).where(LibraryFolder.id == data.folder_id))
        destination = folder_lookup.scalar_one_or_none()
        if not destination:
            raise HTTPException(status_code=404, detail="Folder not found")
        if destination.is_external and destination.external_readonly:
            raise HTTPException(status_code=403, detail="Cannot move files to a read-only external folder")
    target_is_external = destination is not None and destination.is_external

    moved = 0
    # One entry per skipped file; the "skipped" count is derived from it.
    skipped_reasons: list[dict] = []

    for file_id in data.file_ids:
        row = await db.execute(
            LibraryFile.active().options(selectinload(LibraryFile.folder)).where(LibraryFile.id == file_id)
        )
        entry = row.scalar_one_or_none()
        if not entry:
            continue

        # Ownership check
        if not can_modify_all and entry.created_by_id != user.id:
            skipped_reasons.append({"file_id": file_id, "code": "not_owner", "reason": "not the file owner"})
            continue

        # Managed → managed: nothing on disk has to move.
        if not (entry.is_external or target_is_external):
            entry.folder_id = data.folder_id
            moved += 1
            continue

        # Block moves out of a read-only external mount. A move is
        # semantically a delete on the source, which a read-only mount can't
        # fulfil: the copy to the target would succeed, the unlink of the
        # source would fail, and the same file would exist in two places
        # with the DB pointing at only one.
        if entry.is_external and entry.folder is not None and entry.folder.external_readonly:
            skipped_reasons.append(
                {"file_id": file_id, "code": "source_readonly", "reason": "source is on a read-only external folder"}
            )
            continue

        # Relocate the bytes first, then bring the DB row in line.
        try:
            relocated_path = _move_file_bytes(entry, destination)
        except _MoveSkip as skip:
            skipped_reasons.append({"file_id": file_id, "code": skip.code, "reason": skip.reason})
            continue

        entry.is_external = target_is_external
        entry.folder_id = data.folder_id
        entry.file_path = relocated_path

        # External rows historically carry `file_hash=None` (scan skips
        # hashing). When an external file is pulled into managed storage,
        # compute the hash so dedup detection works for future uploads of
        # the same content.
        if not target_is_external and entry.file_hash is None:
            try:
                hashed_path = to_absolute_path(relocated_path)
                if hashed_path:
                    entry.file_hash = calculate_file_hash(hashed_path)
            except OSError:
                pass  # leave hash null; dedup just won't match this row

        moved += 1

    await db.commit()
    return {
        "status": "success",
        "moved": moved,
        "skipped": len(skipped_reasons),
        "skipped_reasons": skipped_reasons,
    }
@router.post("/bulk-delete", response_model=BulkDeleteResponse)
async def bulk_delete(
    data: BulkDeleteRequest,
    db: AsyncSession = Depends(get_db),
    auth_result: tuple[User | None, bool] = Depends(
        require_ownership_permission(
            Permission.LIBRARY_DELETE_ALL,
            Permission.LIBRARY_DELETE_OWN,
        )
    ),
):
    """Delete multiple files and/or folders in one request.

    Files not owned by the user are skipped (unless the user has the
    ``*_all`` permission). Folders carry no ownership information, so folder
    deletion is only performed for callers with the ``*_all`` permission.
    """
    user, can_modify_all = auth_result
    deleted_files = 0
    deleted_folders = 0
    # Tallied for parity with the per-file checks; the response built below
    # reports only the deleted counts.
    skipped_files = 0

    # Files first. Managed files go to the trash (the sweeper hard-deletes
    # their bytes later); external files bypass the trash since their disk
    # state is outside our control and can't be restored anyway. All rows
    # trashed by this request share one timestamp.
    trashed_at = datetime.now(timezone.utc)
    for file_id in data.file_ids:
        row = await db.execute(LibraryFile.active().where(LibraryFile.id == file_id))
        entry = row.scalar_one_or_none()
        if not entry:
            continue
        if not can_modify_all and entry.created_by_id != user.id:
            skipped_files += 1
            continue

        if entry.is_external:
            thumb_on_disk = to_absolute_path(entry.thumbnail_path)
            if thumb_on_disk and thumb_on_disk.exists():
                try:
                    thumb_on_disk.unlink()
                except OSError as e:
                    logger.warning("Failed to delete thumbnail from disk: %s", e)
            await db.delete(entry)
        else:
            entry.deleted_at = trashed_at
        deleted_files += 1

    # Folders (cascade will handle contents).
    for folder_id in data.folder_ids:
        if not can_modify_all:
            # Users without the *_all permission cannot delete folders.
            continue
        folder_row = await db.execute(select(LibraryFolder).where(LibraryFolder.id == folder_id))
        folder = folder_row.scalar_one_or_none()
        if not folder:
            continue

        # Add the folder's non-trashed files to the deleted-files total
        # before the folder itself is removed.
        contained = await db.execute(
            select(func.count(LibraryFile.id)).where(
                LibraryFile.folder_id == folder_id,
                LibraryFile.deleted_at.is_(None),
            )
        )
        deleted_files += contained.scalar() or 0
        await db.delete(folder)
        deleted_folders += 1

    await db.commit()
    return BulkDeleteResponse(deleted_files=deleted_files, deleted_folders=deleted_folders)
- # ============ Stats Endpoint ============
@router.get("/stats")
async def get_library_stats(
    db: AsyncSession = Depends(get_db),
    _: User | None = Depends(require_permission_if_auth_enabled(Permission.LIBRARY_READ)),
):
    """Get library statistics.

    File-based figures (count, total size, per-type counts, print total) only
    consider files that are not trashed, so users see counts/sizes for what's
    actually in the library. Folder count covers all folders. Disk figures come
    from ``shutil.disk_usage`` on the library directory and fall back to 0 when
    that call raises ``OSError``.

    Returns:
        dict with keys ``total_files``, ``total_folders``, ``total_size_bytes``,
        ``files_by_type``, ``total_prints``, ``disk_free_bytes``,
        ``disk_total_bytes`` and ``disk_used_bytes``.
    """
    not_trashed = LibraryFile.deleted_at.is_(None)

    async def scalar_or_zero(statement):
        """Execute *statement* and return its scalar result, or 0 if it is falsy."""
        outcome = await db.execute(statement)
        return outcome.scalar() or 0

    # Aggregate figures, queried one after another.
    total_files = await scalar_or_zero(select(func.count(LibraryFile.id)).where(not_trashed))
    total_folders = await scalar_or_zero(select(func.count(LibraryFolder.id)))
    total_size = await scalar_or_zero(select(func.sum(LibraryFile.file_size)).where(not_trashed))

    # Per-type breakdown: one (file_type, count) row per distinct type.
    per_type_rows = await db.execute(
        select(LibraryFile.file_type, func.count(LibraryFile.id)).where(not_trashed).group_by(LibraryFile.file_type)
    )
    files_by_type = {file_type: count for file_type, count in per_type_rows.all()}

    total_prints = await scalar_or_zero(select(func.sum(LibraryFile.print_count)).where(not_trashed))

    # Disk space info. The directory lookup stays outside the try so only a
    # failing disk_usage call is turned into zeroed figures.
    library_dir = get_library_dir()
    try:
        usage = shutil.disk_usage(library_dir)
    except OSError:
        disk_free_bytes = disk_total_bytes = disk_used_bytes = 0
    else:
        disk_free_bytes = usage.free
        disk_total_bytes = usage.total
        disk_used_bytes = usage.used

    stats = {
        "total_files": total_files,
        "total_folders": total_folders,
        "total_size_bytes": total_size,
        "files_by_type": files_by_type,
        "total_prints": total_prints,
        "disk_free_bytes": disk_free_bytes,
        "disk_total_bytes": disk_total_bytes,
        "disk_used_bytes": disk_used_bytes,
    }
    return stats
|