| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570 |
- import hashlib
- import json
- import logging
- import os
- import re
- import shutil
- import zipfile
- from datetime import date, datetime, time, timezone
- from pathlib import Path
- from defusedxml import ElementTree as ET
- from sqlalchemy import and_, or_, select, text
- from sqlalchemy.ext.asyncio import AsyncSession
- from backend.app.core.config import settings
- from backend.app.models.archive import PrintArchive
- from backend.app.models.filament import Filament
- from backend.app.models.printer import Printer
- logger = logging.getLogger(__name__)
- def _copy_and_fsync(src: Path, dst: Path, chunk_size: int = 1024 * 1024) -> None:
- """Copy src to dst with an explicit chunked read/write and fsync the dst.
- Replacement for shutil.copy2 in the archive pipeline. shutil.copy2 uses
- Linux sendfile(), which on some kernels/filesystems has returned a short
- count on the first call and truncated the destination for larger 3MF
- uploads (#1032, observed on Raspberry Pi OS bookworm / armv7l). An
- explicit loop with fsync avoids that path and guarantees the dest bytes
- are on disk before the caller inspects them as a ZIP.
- """
- with src.open("rb") as rf, dst.open("wb") as wf:
- while True:
- buf = rf.read(chunk_size)
- if not buf:
- break
- wf.write(buf)
- wf.flush()
- os.fsync(wf.fileno())
- shutil.copystat(src, dst)
- def resolve_display_stem(filename: str) -> str:
- """Return a clean human-readable stem from a 3MF/gcode filename.
- Bambu Studio's "Send to printer" dialog typically writes files like
- ``Plate_1.gcode.3mf`` (a sliced gcode payload wrapped in a 3MF container).
- The naive ``Path(filename).stem`` only drops the last suffix, leaving
- ``Plate_1.gcode`` — which then surfaces in the archive UI as a confusing
- ``Plate_1.gcode`` rather than ``Plate_1`` (#1152 follow-up).
- Strip the recognised print-format suffixes in order:
- - ``.gcode.3mf`` → bare stem (Bambu Studio FTP send)
- - ``.3mf`` → bare stem
- - ``.gcode`` → bare stem (rare standalone gcode upload)
- Anything else passes through unchanged.
- """
- name = Path(filename).name # drop any path components
- lower = name.lower()
- for suffix in (".gcode.3mf", ".3mf", ".gcode"):
- if lower.endswith(suffix):
- return name[: -len(suffix)]
- return Path(name).stem
- def peek_plate_index_in_3mf(file_path: Path) -> int | None:
- """Return the plate index recorded inside a Bambu 3MF, or None.
- Reads only ``Metadata/slice_info.config`` to keep this cheap — used by
- the print-start callback to verify that the 3MF we just downloaded over
- FTP actually matches the plate the printer is running (#1204). The full
- ThreeMFParser does much more work and runs later inside ArchiveService.
- """
- try:
- with zipfile.ZipFile(file_path, "r") as zf:
- if "Metadata/slice_info.config" not in zf.namelist():
- return None
- content = zf.read("Metadata/slice_info.config").decode()
- root = ET.fromstring(content)
- plate = root.find(".//plate")
- if plate is None:
- return None
- for meta in plate.findall("metadata"):
- if meta.get("key") == "index":
- value = meta.get("value")
- if value:
- try:
- return int(value)
- except ValueError:
- return None
- except Exception:
- return None
- return None
- _PLATE_SUFFIX_RE = re.compile(r"^(.*?)(\s*-\s*Plate\s+|_plate_)(\d+)$", re.IGNORECASE)
- def swap_plate_suffix(name: str | None, target_plate: int) -> str | None:
- """Return ``name`` with its trailing plate number replaced, or None.
- Bambu Studio names multi-plate uploads ``"<Project> - Plate <N>"`` (and
- a lowercase ``"_plate_<N>"`` variant exists too — see
- test_print_start_expected_promotion). When MQTT subtask_name lags
- across consecutive plates of the same model (#1204) the suffix points
- at the previous plate; swapping it gives us the correct upload to
- re-fetch from FTP. Returns None if no recognised suffix is present.
- """
- if not name:
- return None
- m = _PLATE_SUFFIX_RE.match(name)
- if not m:
- return None
- base, separator, _ = m.groups()
- return f"{base}{separator}{target_plate}"
- class ThreeMFParser:
- """Parser for Bambu Lab 3MF files."""
- def __init__(self, file_path: Path, plate_number: int | None = None):
- self.file_path = file_path
- self.plate_number = plate_number # Which plate was printed (1, 2, 3, etc.)
- self.metadata: dict = {}
- def parse(self) -> dict:
- """Extract metadata from 3MF file."""
- try:
- with zipfile.ZipFile(self.file_path, "r") as zf:
- self._parse_slice_info(zf) # Now sets self.plate_number from slice_info
- self._parse_project_settings(zf)
- self._parse_gcode_header(zf)
- self._parse_3dmodel(zf)
- self._extract_thumbnail(zf) # Uses correct plate_number for thumbnail
- # Enhance print_name with plate info if this is a multi-plate export
- plate_index = self.metadata.get("_plate_index")
- if plate_index and plate_index > 1:
- # Append plate number to distinguish from other plates
- existing_name = self.metadata.get("print_name", "")
- if existing_name and f"Plate {plate_index}" not in existing_name:
- self.metadata["print_name"] = f"{existing_name} - Plate {plate_index}"
- # ALWAYS prefer slice_info values - they contain ONLY filaments actually used in print
- # project_settings contains ALL configured filaments (AMS slots), not just used ones
- if self.metadata.get("_slice_filament_type"):
- self.metadata["filament_type"] = self.metadata["_slice_filament_type"]
- if self.metadata.get("_slice_filament_color"):
- self.metadata["filament_color"] = self.metadata["_slice_filament_color"]
- # Clean up internal keys
- self.metadata.pop("_slice_filament_type", None)
- self.metadata.pop("_slice_filament_color", None)
- self.metadata.pop("_plate_index", None)
- except Exception as e:
- # Return whatever metadata was extracted before the error, but
- # surface the failure so corrupted / truncated 3MF archives are
- # visible in support bundles (#1032).
- logger.warning(
- "ThreeMFParser: failed to parse %s: %s(%s) — returning partial metadata",
- self.file_path,
- type(e).__name__,
- e,
- )
- return self.metadata
- def _parse_slice_info(self, zf: zipfile.ZipFile):
- """Parse slice_info.config for print settings and printable objects."""
- try:
- if "Metadata/slice_info.config" in zf.namelist():
- content = zf.read("Metadata/slice_info.config").decode()
- root = ET.fromstring(content)
- # Extract printer_model_id from plate metadata
- # Format: <plate><metadata key="printer_model_id" value="C11" /></plate>
- for meta in root.findall(".//metadata"):
- key = meta.get("key")
- value = meta.get("value")
- if key == "printer_model_id" and value:
- from backend.app.utils.printer_models import normalize_printer_model_id
- normalized = normalize_printer_model_id(value)
- if normalized:
- self.metadata["sliced_for_model"] = normalized
- break
- # Find the plate element (single-plate exports only have one plate)
- plate = root.find(".//plate")
- if plate is not None:
- # Extract metadata from plate element
- for meta in plate.findall("metadata"):
- key = meta.get("key")
- value = meta.get("value")
- if key == "index" and value:
- # Extract plate index - this tells us which plate was exported
- try:
- extracted_index = int(value)
- # Set plate_number if not already set from filename
- if not self.plate_number:
- self.plate_number = extracted_index
- # Store in metadata for print_name generation
- self.metadata["_plate_index"] = extracted_index
- except ValueError:
- pass # Skip non-numeric plate index
- elif key == "prediction" and value:
- self.metadata["print_time_seconds"] = int(value)
- elif key == "weight" and value:
- self.metadata["filament_used_grams"] = float(value)
- elif key == "curr_bed_type" and value:
- self.metadata["bed_type"] = value
- # Extract printable objects for skip object functionality
- # Objects are stored as <object identify_id="123" name="Part1" skipped="false" />
- printable_objects = {}
- for obj in plate.findall("object"):
- identify_id = obj.get("identify_id")
- name = obj.get("name")
- skipped = obj.get("skipped", "false")
- # Only include objects that are not pre-skipped
- if identify_id and name and skipped.lower() != "true":
- try:
- printable_objects[int(identify_id)] = name
- except ValueError:
- pass # Skip objects with non-numeric identify_id
- if printable_objects:
- self.metadata["printable_objects"] = printable_objects
- # Get filament info from filaments ACTUALLY USED in the print
- # slice_info has <filament id="1" type="PLA" color="#FFFFFF" used_g="100" />
- # Only include filaments where used_g > 0
- filaments = root.findall(".//filament")
- if filaments:
- # Collect unique filament types and colors for filaments that are actually used
- types = []
- colors = []
- for f in filaments:
- # Check if this filament is actually used in the print
- used_g = f.get("used_g", "0")
- try:
- used_amount = float(used_g)
- except (ValueError, TypeError):
- used_amount = 0
- # Only include if used_g > 0 (filament is actually consumed)
- if used_amount > 0:
- ftype = f.get("type")
- fcolor = f.get("color")
- if ftype and ftype not in types:
- types.append(ftype)
- if fcolor and fcolor not in colors:
- colors.append(fcolor)
- if types:
- self.metadata["_slice_filament_type"] = ", ".join(types)
- if colors:
- self.metadata["_slice_filament_color"] = ",".join(colors)
- # Collect per-slot filament usage for tracking & notifications
- filament_slots = []
- for f in filaments:
- slot_id = f.get("id")
- used_g_str = f.get("used_g", "0")
- try:
- used_g = float(used_g_str)
- except (ValueError, TypeError):
- used_g = 0
- if used_g > 0 and slot_id:
- filament_slots.append(
- {
- "slot_id": int(slot_id),
- "used_g": round(used_g, 2),
- "type": f.get("type", ""),
- "color": f.get("color", ""),
- }
- )
- if filament_slots:
- self.metadata["filament_slots"] = filament_slots
- except Exception:
- pass # Skip unparseable slice_info metadata
- def _parse_project_settings(self, zf: zipfile.ZipFile):
- """Parse project settings for print configuration."""
- try:
- if "Metadata/project_settings.config" in zf.namelist():
- content = zf.read("Metadata/project_settings.config").decode()
- try:
- data = json.loads(content)
- self._extract_filament_info(data)
- self._extract_print_settings(data)
- except json.JSONDecodeError:
- pass # Skip malformed project_settings JSON
- except Exception:
- pass # Skip unreadable project settings file
- def _parse_gcode_header(self, zf: zipfile.ZipFile):
- """Parse G-code file header for total layer count and printer model."""
- try:
- # Look for plate_1.gcode or similar
- gcode_files = [f for f in zf.namelist() if f.endswith(".gcode")]
- if not gcode_files:
- return
- # Read first 4KB of G-code (header contains metadata)
- gcode_path = gcode_files[0]
- with zf.open(gcode_path) as f:
- header = f.read(4096).decode("utf-8", errors="ignore")
- # Look for "; total layer number: XX" pattern
- match = re.search(r";\s*total\s+layer\s+number[:\s]+(\d+)", header, re.IGNORECASE)
- if match:
- self.metadata["total_layers"] = int(match.group(1))
- # Total filament usage. The slicer writes the print's totals into
- # the G-code header ("; total filament weight [g] : 126.26"). Only
- # a fallback — slice_info.config is more authoritative when present
- # — but it covers sliced outputs whose slice_info lacks per-filament
- # used_g, and it's the slicer's own figure regardless.
- if "filament_used_grams" not in self.metadata:
- match = re.search(r";\s*total\s+filament\s+weight\s*\[g\]\s*:\s*([\d.]+)", header, re.IGNORECASE)
- if match:
- self.metadata["filament_used_grams"] = float(match.group(1))
- if "filament_used_mm" not in self.metadata:
- match = re.search(r";\s*total\s+filament\s+length\s*\[mm\]\s*:\s*([\d.]+)", header, re.IGNORECASE)
- if match:
- self.metadata["filament_used_mm"] = float(match.group(1))
- # Look for printer_model in gcode header (fallback if not found in slice_info)
- # Format: "; printer_model = Bambu Lab X1 Carbon" or "; printer_model = X1C"
- if "sliced_for_model" not in self.metadata:
- match = re.search(r";\s*printer_model\s*=\s*(.+)", header, re.IGNORECASE)
- if match:
- from backend.app.utils.printer_models import normalize_printer_model
- raw_model = match.group(1).strip()
- self.metadata["sliced_for_model"] = normalize_printer_model(raw_model)
- except Exception:
- pass # G-code header parsing is best-effort; metadata may come from other sources
- def _extract_filament_info(self, data: dict):
- """Extract filament info, preferring non-support filaments."""
- try:
- filament_types = data.get("filament_type", [])
- filament_colors = data.get("filament_colour", [])
- filament_is_support = data.get("filament_is_support", [])
- if not filament_types:
- return
- # Collect all non-support filaments
- non_support_types = []
- non_support_colors = []
- for i, ftype in enumerate(filament_types):
- is_support = filament_is_support[i] if i < len(filament_is_support) else "0"
- if is_support == "0":
- if ftype and ftype not in non_support_types:
- non_support_types.append(ftype)
- if i < len(filament_colors) and filament_colors[i]:
- color = filament_colors[i]
- if color not in non_support_colors:
- non_support_colors.append(color)
- # Fallback to first filament if all are support
- if not non_support_types and filament_types:
- non_support_types = [filament_types[0]]
- if not non_support_colors and filament_colors:
- non_support_colors = [filament_colors[0]]
- # Store filament type(s)
- if non_support_types:
- self.metadata["filament_type"] = ", ".join(non_support_types)
- # Store all colors as comma-separated (for multi-color display)
- if non_support_colors:
- self.metadata["filament_color"] = ",".join(non_support_colors)
- except Exception:
- pass # Filament info is optional; fall back to slice_info values
- def _extract_print_settings(self, data: dict):
- """Extract print settings from JSON config."""
- try:
- # Layer height - usually an array, get first value
- if "layer_height" in data:
- val = data["layer_height"]
- if isinstance(val, list) and val:
- self.metadata["layer_height"] = float(val[0])
- elif isinstance(val, (int, float, str)):
- self.metadata["layer_height"] = float(val)
- # Nozzle diameter
- if "nozzle_diameter" in data:
- val = data["nozzle_diameter"]
- if isinstance(val, list) and val:
- self.metadata["nozzle_diameter"] = float(val[0])
- elif isinstance(val, (int, float, str)):
- self.metadata["nozzle_diameter"] = float(val)
- # Bed temperature - first layer or regular
- for key in ["bed_temperature_initial_layer", "bed_temperature"]:
- if key in data:
- val = data[key]
- if isinstance(val, list) and val:
- self.metadata["bed_temperature"] = int(float(val[0]))
- elif isinstance(val, (int, float, str)):
- self.metadata["bed_temperature"] = int(float(val))
- break
- # Nozzle temperature
- for key in ["nozzle_temperature_initial_layer", "nozzle_temperature"]:
- if key in data:
- val = data[key]
- if isinstance(val, list) and val:
- self.metadata["nozzle_temperature"] = int(float(val[0]))
- elif isinstance(val, (int, float, str)):
- self.metadata["nozzle_temperature"] = int(float(val))
- break
- # Printer model (extract and normalize)
- if "printer_model" in data:
- from backend.app.utils.printer_models import normalize_printer_model
- self.metadata["sliced_for_model"] = normalize_printer_model(data["printer_model"])
- # Build plate type — only set from project_settings if slice_info didn't already
- # provide it (slice_info is more authoritative as it reflects the exported plate).
- if "bed_type" not in self.metadata and "curr_bed_type" in data:
- val = data["curr_bed_type"]
- if isinstance(val, str) and val.strip():
- self.metadata["bed_type"] = val.strip()
- except Exception:
- pass # Print settings are optional; missing values are left unset
- def _extract_settings_from_content(self, content: str):
- """Extract print settings from config content."""
- settings_map = {
- "layer_height": ("layer_height", float),
- "nozzle_diameter": ("nozzle_diameter", float),
- "bed_temperature": ("bed_temperature", int),
- "nozzle_temperature": ("nozzle_temperature", int),
- }
- for key, (search_key, converter) in settings_map.items():
- if key not in self.metadata:
- try:
- # Try JSON format
- if f'"{search_key}"' in content:
- start = content.find(f'"{search_key}"')
- value_start = content.find(":", start) + 1
- value_end = content.find(",", value_start)
- if value_end == -1:
- value_end = content.find("}", value_start)
- value = content[value_start:value_end].strip().strip('"')
- self.metadata[key] = converter(value)
- except (ValueError, TypeError):
- pass # Skip settings with unconvertible values
- def _parse_3dmodel(self, zf: zipfile.ZipFile):
- """Parse 3D/3dmodel.model for MakerWorld metadata."""
- try:
- model_path = "3D/3dmodel.model"
- if model_path not in zf.namelist():
- return
- content = zf.read(model_path).decode("utf-8", errors="ignore")
- # Parse XML metadata elements
- # MakerWorld adds metadata like: <metadata name="Designer">username</metadata>
- metadata_pattern = r'<metadata\s+name="([^"]+)"[^>]*>([^<]*)</metadata>'
- matches = re.findall(metadata_pattern, content)
- makerworld_fields = {}
- for name, value in matches:
- makerworld_fields[name] = value.strip()
- # Check for direct MakerWorld URL in content
- url_pattern = r'https?://makerworld\.com/[^\s<>"\']+/models/(\d+)'
- url_match = re.search(url_pattern, content)
- if url_match:
- self.metadata["makerworld_url"] = url_match.group(0)
- self.metadata["makerworld_model_id"] = url_match.group(1)
- # Extract model ID from DSM reference in image URLs
- # Format: https://makerworld.bblmw.com/makerworld/model/DSM00000001275614/...
- # The numeric part (1275614) is the MakerWorld model ID
- if "makerworld_url" not in self.metadata:
- dsm_pattern = r"DSM0+(\d+)"
- dsm_match = re.search(dsm_pattern, content)
- if dsm_match:
- model_id = dsm_match.group(1)
- self.metadata["makerworld_url"] = f"https://makerworld.com/en/models/{model_id}"
- self.metadata["makerworld_model_id"] = model_id
- # Store designer info
- if "Designer" in makerworld_fields:
- self.metadata["designer"] = makerworld_fields["Designer"]
- if "Title" in makerworld_fields:
- self.metadata["print_name"] = makerworld_fields["Title"]
- except Exception:
- pass # MakerWorld/3dmodel metadata is optional
- def _extract_thumbnail(self, zf: zipfile.ZipFile):
- """Extract thumbnail image from 3MF.
- If a plate_number was specified, try to use that plate's thumbnail first.
- """
- thumbnail_paths = []
- # If a specific plate was printed, try that thumbnail first
- if self.plate_number:
- thumbnail_paths.append(f"Metadata/plate_{self.plate_number}.png")
- # Fallback to default paths
- thumbnail_paths.extend(
- [
- "Metadata/plate_1.png",
- "Metadata/thumbnail.png",
- "Metadata/model_thumbnail.png",
- # Project-wide thumbnail BambuStudio embeds at upload time. We
- # only reach this when BS hasn't written a per-plate
- # ``Metadata/plate_N.png`` — most notably the #1493 cross-class
- # re-slice path where ``--arrange`` rearranges objects but the
- # CLI then doesn't emit a fresh per-plate preview. The
- # ``_middle`` size is the editor-quality variant (~500 KB);
- # ``_small`` and ``_3mf`` are smaller alternates if it's not
- # present. Without this fallback the re-sliced archive cards
- # render without a cover image.
- "Auxiliaries/.thumbnails/thumbnail_middle.png",
- "Auxiliaries/.thumbnails/thumbnail_small.png",
- "Auxiliaries/.thumbnails/thumbnail_3mf.png",
- ]
- )
- for thumb_path in thumbnail_paths:
- if thumb_path in zf.namelist():
- self.metadata["_thumbnail_data"] = zf.read(thumb_path)
- self.metadata["_thumbnail_ext"] = ".png"
- break
- def extract_printable_objects_from_3mf(
- data: bytes, plate_number: int | None = None, include_positions: bool = False
- ) -> dict[int, str] | dict[int, dict] | tuple[dict[int, dict], list | None]:
- """Extract printable objects from 3MF file bytes.
- This is a lightweight function used during print start to get the list
- of objects that can be skipped.
- Args:
- data: Raw bytes of the 3MF file
- plate_number: Which plate was printed (1-based), or None for first plate
- include_positions: If True, return tuple of (objects dict, bbox_all)
- Returns:
- If include_positions=False: Dictionary mapping identify_id (int) to object name (str)
- If include_positions=True: Tuple of (dict mapping identify_id to {name, x, y}, bbox_all list or None)
- """
- from io import BytesIO
- printable_objects: dict = {}
- bbox_all: list | None = None
- try:
- with zipfile.ZipFile(BytesIO(data), "r") as zf:
- if "Metadata/slice_info.config" not in zf.namelist():
- return printable_objects
- content = zf.read("Metadata/slice_info.config").decode()
- root = ET.fromstring(content)
- # Find the correct plate
- if plate_number:
- plate = root.find(f".//plate[@plate_idx='{plate_number}']")
- if plate is None:
- plate = root.find(".//plate")
- else:
- plate = root.find(".//plate")
- if plate is None:
- return printable_objects
- # Get actual plate index from metadata (sliced files only have one plate)
- plate_idx = plate_number or 1
- for meta in plate.findall("metadata"):
- if meta.get("key") == "index":
- try:
- plate_idx = int(meta.get("value", "1"))
- except ValueError:
- pass # Use default plate_idx if value is non-numeric
- break
- # Load position data from plate_N.json if we need positions
- # Build a lookup by name - use list to handle duplicate names
- bbox_by_name: dict[str, list[list]] = {}
- if include_positions:
- plate_json_path = f"Metadata/plate_{plate_idx}.json"
- if plate_json_path in zf.namelist():
- try:
- plate_json = json.loads(zf.read(plate_json_path).decode())
- # Get bbox_all - the bounding box of all objects (used for image bounds)
- bbox_all = plate_json.get("bbox_all")
- for bbox_obj in plate_json.get("bbox_objects", []):
- obj_name = bbox_obj.get("name")
- bbox = bbox_obj.get("bbox", [])
- if obj_name and len(bbox) >= 4:
- if obj_name not in bbox_by_name:
- bbox_by_name[obj_name] = []
- bbox_by_name[obj_name].append(bbox)
- except (json.JSONDecodeError, KeyError):
- pass # Position data is optional; objects will lack x/y coordinates
- # Extract objects from slice_info.config
- for obj in plate.findall("object"):
- identify_id = obj.get("identify_id")
- name = obj.get("name")
- skipped = obj.get("skipped", "false")
- if identify_id and name and skipped.lower() != "true":
- try:
- obj_id = int(identify_id)
- if include_positions:
- x, y = None, None
- # Match by name - pop first bbox to handle duplicates
- bboxes = bbox_by_name.get(name)
- if bboxes:
- bbox = bboxes.pop(0)
- # Calculate center from bbox [x_min, y_min, x_max, y_max]
- x = (bbox[0] + bbox[2]) / 2
- y = (bbox[1] + bbox[3]) / 2
- printable_objects[obj_id] = {"name": name, "x": x, "y": y}
- else:
- printable_objects[obj_id] = name
- except ValueError:
- pass # Skip objects with non-numeric identify_id
- except Exception:
- pass # Return empty dict if 3MF is corrupt or unreadable
- if include_positions:
- return printable_objects, bbox_all
- return printable_objects
- class ProjectPageParser:
- """Parser for extracting project page data from Bambu Lab 3MF files."""
- def __init__(self, file_path: Path):
- self.file_path = file_path
- def parse(self, archive_id: int) -> dict:
- """Extract project page metadata and images from 3MF file."""
- import html
- result = {
- "title": None,
- "description": None,
- "designer": None,
- "designer_user_id": None,
- "license": None,
- "copyright": None,
- "creation_date": None,
- "modification_date": None,
- "origin": None,
- "profile_title": None,
- "profile_description": None,
- "profile_cover": None,
- "profile_user_id": None,
- "profile_user_name": None,
- "design_model_id": None,
- "design_profile_id": None,
- "design_region": None,
- "model_pictures": [],
- "profile_pictures": [],
- "thumbnails": [],
- }
- try:
- with zipfile.ZipFile(self.file_path, "r") as zf:
- # Parse 3D/3dmodel.model for metadata
- model_path = "3D/3dmodel.model"
- if model_path in zf.namelist():
- content = zf.read(model_path).decode("utf-8", errors="ignore")
- # Extract metadata elements using regex
- # Format: <metadata name="Key">Value</metadata> or <metadata name="Key" />
- metadata_pattern = r'<metadata\s+name="([^"]+)"[^>]*>([^<]*)</metadata>'
- matches = re.findall(metadata_pattern, content)
- field_mapping = {
- "Title": "title",
- "Description": "description",
- "Designer": "designer",
- "DesignerUserId": "designer_user_id",
- "License": "license",
- "Copyright": "copyright",
- "CreationDate": "creation_date",
- "ModificationDate": "modification_date",
- "Origin": "origin",
- "ProfileTitle": "profile_title",
- "ProfileDescription": "profile_description",
- "ProfileCover": "profile_cover",
- "ProfileUserId": "profile_user_id",
- "ProfileUserName": "profile_user_name",
- "DesignModelId": "design_model_id",
- "DesignProfileId": "design_profile_id",
- "DesignRegion": "design_region",
- }
- for name, value in matches:
- if name in field_mapping:
- # Decode HTML entities multiple times (content is often triple-encoded)
- decoded = value.strip()
- prev = None
- while prev != decoded:
- prev = decoded
- decoded = html.unescape(decoded)
- # Normalize non-breaking spaces to regular spaces
- decoded = decoded.replace("\xa0", " ")
- result[field_mapping[name]] = decoded if decoded else None
- # List images in Auxiliaries folder
- from urllib.parse import quote
- for name in zf.namelist():
- if name.startswith("Auxiliaries/Model Pictures/"):
- filename = name.split("/")[-1]
- if filename:
- result["model_pictures"].append(
- {
- "name": filename,
- "path": name,
- "url": f"/api/v1/archives/{archive_id}/project-image/{quote(name, safe='')}",
- }
- )
- elif name.startswith("Auxiliaries/Profile Pictures/"):
- filename = name.split("/")[-1]
- if filename:
- result["profile_pictures"].append(
- {
- "name": filename,
- "path": name,
- "url": f"/api/v1/archives/{archive_id}/project-image/{quote(name, safe='')}",
- }
- )
- elif name.startswith("Auxiliaries/.thumbnails/"):
- filename = name.split("/")[-1]
- if filename:
- result["thumbnails"].append(
- {
- "name": filename,
- "path": name,
- "url": f"/api/v1/archives/{archive_id}/project-image/{quote(name, safe='')}",
- }
- )
- except Exception as e:
- result["_error"] = str(e)
- return result
- def get_image(self, image_path: str) -> tuple[bytes, str] | None:
- """Extract an image from the 3MF file.
- Returns tuple of (image_data, content_type) or None if not found.
- """
- try:
- with zipfile.ZipFile(self.file_path, "r") as zf:
- if image_path in zf.namelist():
- data = zf.read(image_path)
- # Determine content type from extension
- ext = image_path.lower().split(".")[-1]
- content_types = {
- "png": "image/png",
- "jpg": "image/jpeg",
- "jpeg": "image/jpeg",
- "webp": "image/webp",
- "gif": "image/gif",
- }
- content_type = content_types.get(ext, "application/octet-stream")
- return (data, content_type)
- except Exception:
- pass # Return None if image cannot be extracted from 3MF
- return None
- def update_metadata(self, updates: dict) -> bool:
- """Update project page metadata in the 3MF file.
- Args:
- updates: Dict with fields to update (title, description, designer, etc.)
- Returns:
- True if successful, False otherwise.
- """
- import html
- import tempfile
- try:
- # Read the 3MF file
- with zipfile.ZipFile(self.file_path, "r") as zf_read:
- # Find and read the 3dmodel.model file
- model_path = "3D/3dmodel.model"
- if model_path not in zf_read.namelist():
- return False
- content = zf_read.read(model_path).decode("utf-8")
- # Update metadata fields
- field_mapping = {
- "title": "Title",
- "description": "Description",
- "designer": "Designer",
- "license": "License",
- "copyright": "Copyright",
- "profile_title": "ProfileTitle",
- "profile_description": "ProfileDescription",
- }
- for field, xml_name in field_mapping.items():
- if field in updates and updates[field] is not None:
- new_value = html.escape(updates[field])
- # Replace existing metadata or we'd need to add it
- pattern = rf'(<metadata\s+name="{xml_name}"[^>]*>)[^<]*(</metadata>)'
- replacement = rf"\g<1>{new_value}\g<2>"
- content = re.sub(pattern, replacement, content)
- # Write to a temporary file first
- with tempfile.NamedTemporaryFile(delete=False, suffix=".3mf") as tmp:
- tmp_path = Path(tmp.name)
- # Create new zip with updated content
- with zipfile.ZipFile(tmp_path, "w", zipfile.ZIP_DEFLATED) as zf_write:
- for item in zf_read.namelist():
- if item == model_path:
- zf_write.writestr(item, content.encode("utf-8"))
- else:
- zf_write.writestr(item, zf_read.read(item))
- # Replace original file with updated one
- shutil.move(tmp_path, self.file_path)
- return True
- except Exception:
- # Clean up temp file if it exists
- if "tmp_path" in locals() and tmp_path.exists():
- tmp_path.unlink()
- return False
- async def _null_print_log_thumbnail_paths(db: AsyncSession, archive_id: int) -> None:
- """NULL thumbnail_path on PrintLogEntry rows linked to *archive_id*.
- Called from both soft- and hard-delete paths before the archive's files
- leave disk. The FK on PrintLogEntry.archive_id is ON DELETE SET NULL so
- log rows survive the archive — without this clear, their cached
- thumbnail_path would still point at a deleted file and the print-log
- view would 404-storm on every render (#1348 follow-up). Lazy-NULL on
- the GET route self-heals stragglers (e.g. failed prints that never had
- a thumbnail written), but eager clear here avoids the one-time storm.
- """
- from sqlalchemy import update as sa_update
- from backend.app.models.print_log import PrintLogEntry
- await db.execute(sa_update(PrintLogEntry).where(PrintLogEntry.archive_id == archive_id).values(thumbnail_path=None))
- async def _cancel_pending_queue_items(db: AsyncSession, archive_id: int) -> None:
- """Cancel pending queue items pointing at *archive_id* (#1348 follow-up).
- Called from ``soft_delete_archive`` only — hard-delete is covered by the
- ``ON DELETE CASCADE`` on ``print_queue.archive_id``. A queue item
- pointing at an archive whose 3MF has been removed from disk can never
- actually dispatch, so cancelling at delete time both (a) tells the user
- why the item disappeared from the pending list, and (b) stops the queue
- page from 404-storming the archive thumbnail / plates / plate-thumbnail
- endpoints when the row is rendered. Only ``pending`` items are touched;
- ``printing`` is a rare race the printer-side fail-path catches, and
- completed / failed / cancelled rows are historical and untouched.
- """
- from sqlalchemy import update as sa_update
- from backend.app.models.print_queue import PrintQueueItem
- await db.execute(
- sa_update(PrintQueueItem)
- .where(PrintQueueItem.archive_id == archive_id, PrintQueueItem.status == "pending")
- .values(status="cancelled", waiting_reason="Source archive deleted")
- )
- class ArchiveService:
- """Service for archiving print jobs."""
- def __init__(self, db: AsyncSession):
- self.db = db
- @staticmethod
- def compute_file_hash(file_path: Path) -> str:
- """Compute SHA256 hash of a file for duplicate detection."""
- sha256 = hashlib.sha256()
- with open(file_path, "rb") as f:
- # Read in chunks to handle large files
- for chunk in iter(lambda: f.read(8192), b""):
- sha256.update(chunk)
- return sha256.hexdigest()
- async def get_duplicate_hashes_and_names(self) -> tuple[set[str], set[tuple[str, str]]]:
- """Get all content hashes and (print name, hash) pairs that appear more than once.
- For hashes: returns all hashes with > 1 archive (true duplicates).
- For name/hash pairs: returns only pairs that have > 1 archive
- (i.e., same file archived multiple times, not different files with same name).
- Returns a tuple of (duplicate_hashes, duplicate_name_hash_pairs).
- """
- from sqlalchemy import func
- # Soft-deleted archives don't appear in the listing (#1343), so they
- # mustn't influence the duplicate-group counts either — otherwise a
- # group with 1 live + 4 soft-deleted would still be flagged as a
- # duplicate even though the user only sees one row.
- result = await self.db.execute(
- select(PrintArchive.content_hash)
- .where(PrintArchive.content_hash.isnot(None), PrintArchive.deleted_at.is_(None))
- .group_by(PrintArchive.content_hash)
- .having(func.count(PrintArchive.id) > 1)
- )
- duplicate_hashes = {row[0] for row in result.all()}
- # Find print names that have multiple archives with the SAME hash
- # This avoids marking different files with the same name as duplicates
- result = await self.db.execute(
- select(func.lower(PrintArchive.print_name), PrintArchive.content_hash)
- .where(
- PrintArchive.print_name.isnot(None),
- PrintArchive.content_hash.isnot(None),
- PrintArchive.deleted_at.is_(None),
- )
- .group_by(func.lower(PrintArchive.print_name), PrintArchive.content_hash)
- .having(func.count(PrintArchive.id) > 1)
- )
- duplicate_name_hash_pairs = {(row[0], row[1]) for row in result.all()}
- return duplicate_hashes, duplicate_name_hash_pairs
- async def find_duplicates(
- self,
- archive_id: int,
- content_hash: str | None = None,
- print_name: str | None = None,
- makerworld_model_id: str | None = None,
- ) -> list[dict]:
- """Find duplicate archives based on hash or name matching.
- Returns list of dicts with id, print_name, created_at, match_type.
- """
- duplicates = []
- # First, find exact matches by content hash
- if content_hash:
- result = await self.db.execute(
- select(PrintArchive)
- .where(
- and_(
- PrintArchive.content_hash == content_hash,
- PrintArchive.id != archive_id,
- PrintArchive.deleted_at.is_(None),
- )
- )
- .order_by(PrintArchive.created_at.desc())
- .limit(10)
- )
- for archive in result.scalars().all():
- duplicates.append(
- {
- "id": archive.id,
- "print_name": archive.print_name,
- "created_at": archive.created_at,
- "match_type": "exact",
- }
- )
- # Then, find similar matches by print name or MakerWorld ID
- # Prefer strict name+hash matching when hash exists; fallback to name-only for legacy/manual
- # archives that may not have a content_hash.
- if print_name or makerworld_model_id:
- conditions = [PrintArchive.id != archive_id, PrintArchive.deleted_at.is_(None)]
- name_conditions = []
- if print_name:
- if content_hash:
- # Match if print names are similar AND have the same hash (same file)
- name_conditions.append(
- and_(PrintArchive.print_name.ilike(print_name), PrintArchive.content_hash == content_hash)
- )
- else:
- # Fallback for archives without hash data: match by print name only.
- name_conditions.append(PrintArchive.print_name.ilike(print_name))
- if makerworld_model_id:
- # Match by MakerWorld model ID stored in extra_data
- from backend.app.core.db_dialect import is_sqlite
- if is_sqlite():
- from sqlalchemy import func
- name_conditions.append(
- func.json_extract(PrintArchive.extra_data, "$.makerworld_model_id") == str(makerworld_model_id)
- )
- else:
- name_conditions.append(
- text("(extra_data::jsonb->>'makerworld_model_id') = :mw_id").bindparams(
- mw_id=str(makerworld_model_id)
- )
- )
- if name_conditions:
- conditions.append(or_(*name_conditions))
- result = await self.db.execute(
- select(PrintArchive).where(and_(*conditions)).order_by(PrintArchive.created_at.desc()).limit(10)
- )
- for archive in result.scalars().all():
- # Don't add if already in duplicates (exact match)
- if not any(d["id"] == archive.id for d in duplicates):
- duplicates.append(
- {
- "id": archive.id,
- "print_name": archive.print_name,
- "created_at": archive.created_at,
- "match_type": "similar",
- }
- )
- return duplicates
- async def archive_print(
- self,
- printer_id: int | None,
- source_file: Path,
- print_data: dict | None = None,
- created_by_id: int | None = None,
- original_filename: str | None = None,
- project_id: int | None = None,
- subtask_id: str | None = None,
- prefer_filename_for_name: bool = False,
- ) -> PrintArchive | None:
- """Archive a 3MF file with metadata.
- Args:
- printer_id: ID of the printer (optional)
- source_file: Path to the 3MF file
- print_data: Print data from MQTT (optional)
- created_by_id: User ID who created this archive (optional, for user tracking)
- original_filename: Original human-readable filename (optional, for library files
- stored with UUID names)
- project_id: Project to associate this archive with (optional, set when triggered
- from the project view)
- subtask_id: MQTT-provided task identifier (optional). Used to match an
- existing archive across a backend restart mid-print so the
- original row can be resumed instead of cancelled (#972).
- prefer_filename_for_name: When True, use the uploaded filename stem as the
- archive's display name even if the 3MF embeds a `print_name` in its
- metadata. Used by virtual-printer flows so users who rename a job in
- BambuStudio's "send to printer" dialog see that name instead of the
- creator-baked title (#1152).
- """
- # Verify printer exists if specified
- if printer_id is not None:
- result = await self.db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- return None
- # Create archive directory structure
- timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
- display_stem = resolve_display_stem(original_filename if original_filename else source_file.name)
- archive_name = f"{timestamp}_{display_stem}"
- # Use "unassigned" folder for archives without a printer
- printer_folder = str(printer_id) if printer_id is not None else "unassigned"
- archive_dir = settings.archive_dir / printer_folder / archive_name
- archive_dir.mkdir(parents=True, exist_ok=True)
- # Copy 3MF file with an explicit fsync'd loop (avoids a sendfile
- # short-read quirk that silently truncated 3MF archives on some
- # platforms — see _copy_and_fsync and #1032).
- dest_file = archive_dir / source_file.name
- _copy_and_fsync(source_file, dest_file)
- # If we just archived a 3MF, verify the dest is a valid ZIP before
- # going any further. Staying quiet here is how #1032 escaped review —
- # the archive row was written but every later zipfile.ZipFile() call
- # on the dest failed with "File is not a zip file".
- if (
- source_file.suffix.lower() == ".3mf"
- and zipfile.is_zipfile(source_file)
- and not zipfile.is_zipfile(dest_file)
- ):
- try:
- src_size = source_file.stat().st_size
- dst_size = dest_file.stat().st_size
- except OSError:
- src_size = dst_size = -1
- logger.error(
- "Archive copy corrupted 3MF: src=%s (%s bytes, valid ZIP) -> dst=%s (%s bytes, NOT a ZIP). Refusing to create archive row.",
- source_file,
- src_size,
- dest_file,
- dst_size,
- )
- # Narrow cleanup: remove only the truncated file and the archive
- # directory if it's now empty. archive_dir was created with
- # exist_ok=True so it could in theory pre-date this call (e.g.
- # same-second same-filename collision); rmtree would be too broad.
- try:
- dest_file.unlink()
- except OSError:
- pass
- try:
- archive_dir.rmdir()
- except OSError:
- pass # directory not empty — leave untouched
- return None
- # Compute content hash for duplicate detection
- content_hash = self.compute_file_hash(dest_file)
- # Extract plate number from filename (e.g., "plate_5" from "/data/Metadata/plate_5.gcode")
- plate_number = None
- if print_data:
- filename = print_data.get("filename", "")
- match = re.search(r"plate_(\d+)", filename)
- if match:
- plate_number = int(match.group(1))
- # Parse 3MF metadata
- parser = ThreeMFParser(dest_file, plate_number=plate_number)
- metadata = parser.parse()
- # Save thumbnail if present
- thumbnail_path = None
- if "_thumbnail_data" in metadata:
- thumb_file = archive_dir / f"thumbnail{metadata['_thumbnail_ext']}"
- thumb_file.write_bytes(metadata["_thumbnail_data"])
- thumbnail_path = str(thumb_file.relative_to(settings.base_dir))
- del metadata["_thumbnail_data"]
- del metadata["_thumbnail_ext"]
- # Merge with print data from MQTT
- if print_data:
- metadata["_print_data"] = print_data
- # Determine status and timestamps
- status = print_data.get("status", "completed") if print_data else "archived"
- started_at = datetime.now(timezone.utc) if status == "printing" else None
- completed_at = datetime.now(timezone.utc) if status in ("completed", "failed", "archived") else None
- # Calculate cost based on filament usage and type
- cost = None
- filament_grams = metadata.get("filament_used_grams")
- filament_type = metadata.get("filament_type")
- if filament_grams and filament_type:
- # For multi-material prints, use the first filament type for cost calculation
- primary_type = filament_type.split(",")[0].strip()
- # Look up filament cost_per_kg from database
- filament_result = await self.db.execute(select(Filament).where(Filament.type == primary_type).limit(1))
- filament = filament_result.scalar_one_or_none()
- if filament:
- cost = round((filament_grams / 1000) * filament.cost_per_kg, 2)
- else:
- # Use default filament cost from settings
- from backend.app.api.routes.settings import get_setting
- default_cost_setting = await get_setting(self.db, "default_filament_cost")
- default_cost_per_kg = float(default_cost_setting) if default_cost_setting else 25.0
- cost = round((filament_grams / 1000) * default_cost_per_kg, 2)
- # Calculate quantity from printable objects count
- # printable_objects is a dict of {identify_id: name} for non-skipped objects
- quantity = 1 # Default to 1
- printable_objects = metadata.get("printable_objects")
- if printable_objects and isinstance(printable_objects, dict):
- quantity = len(printable_objects)
- logger.debug("Auto-detected %s parts from 3MF printable objects", quantity)
- # Create archive record
- archive = PrintArchive(
- printer_id=printer_id,
- filename=original_filename or source_file.name,
- file_path=str(dest_file.relative_to(settings.base_dir)),
- file_size=dest_file.stat().st_size,
- content_hash=content_hash,
- thumbnail_path=thumbnail_path,
- print_name=display_stem if prefer_filename_for_name else (metadata.get("print_name") or display_stem),
- print_time_seconds=metadata.get("print_time_seconds"),
- filament_used_grams=metadata.get("filament_used_grams"),
- filament_type=metadata.get("filament_type"),
- filament_color=metadata.get("filament_color"),
- layer_height=metadata.get("layer_height"),
- total_layers=metadata.get("total_layers"),
- nozzle_diameter=metadata.get("nozzle_diameter"),
- bed_temperature=metadata.get("bed_temperature"),
- bed_type=metadata.get("bed_type"),
- nozzle_temperature=metadata.get("nozzle_temperature"),
- sliced_for_model=metadata.get("sliced_for_model"),
- makerworld_url=metadata.get("makerworld_url"),
- designer=metadata.get("designer"),
- status=status,
- started_at=started_at,
- completed_at=completed_at,
- cost=cost,
- quantity=quantity,
- extra_data=metadata,
- created_by_id=created_by_id,
- project_id=project_id,
- subtask_id=subtask_id,
- )
- self.db.add(archive)
- await self.db.commit()
- await self.db.refresh(archive)
- return archive
- async def get_archive(self, archive_id: int) -> PrintArchive | None:
- """Get an archive by ID with relationships loaded."""
- from sqlalchemy.orm import selectinload
- result = await self.db.execute(
- select(PrintArchive)
- .options(selectinload(PrintArchive.created_by), selectinload(PrintArchive.project))
- .where(PrintArchive.id == archive_id)
- )
- return result.scalar_one_or_none()
- async def update_archive_status(
- self,
- archive_id: int,
- status: str,
- completed_at: datetime | None = None,
- failure_reason: str | None = None,
- ) -> bool:
- """Update the status of an archive."""
- archive = await self.get_archive(archive_id)
- if not archive:
- return False
- archive.status = status
- if completed_at:
- archive.completed_at = completed_at
- if failure_reason:
- archive.failure_reason = failure_reason
- await self.db.commit()
- return True
- async def list_archives(
- self,
- printer_id: int | None = None,
- project_id: int | None = None,
- date_from: date | None = None,
- date_to: date | None = None,
- limit: int = 50,
- offset: int = 0,
- ) -> list[PrintArchive]:
- """List archives with optional filtering."""
- from sqlalchemy.orm import selectinload
- query = (
- select(PrintArchive)
- .options(selectinload(PrintArchive.project), selectinload(PrintArchive.created_by))
- # Hide soft-deleted rows from the listings (#1343). The stats
- # endpoint deliberately does NOT add this filter so deleted
- # archives keep contributing to Quick Stats.
- .where(PrintArchive.deleted_at.is_(None))
- .order_by(PrintArchive.created_at.desc())
- )
- if printer_id:
- query = query.where(PrintArchive.printer_id == printer_id)
- if project_id:
- query = query.where(PrintArchive.project_id == project_id)
- if date_from:
- dt_from = datetime.combine(date_from, time.min, tzinfo=timezone.utc)
- query = query.where(PrintArchive.created_at >= dt_from)
- if date_to:
- dt_to = datetime.combine(date_to, time.max, tzinfo=timezone.utc)
- query = query.where(PrintArchive.created_at <= dt_to)
- query = query.limit(limit).offset(offset)
- result = await self.db.execute(query)
- return list(result.scalars().all())
- async def soft_delete_archive(self, archive_id: int) -> bool:
- """Soft-delete an archive (#1343).
- Removes the archive's files from disk (it disappears from the listings
- and frees the storage) but flips the row's ``deleted_at`` so the stats
- endpoint keeps counting its filament / energy / time / cost. The user
- can opt into a hard delete via the "Also remove from statistics"
- checkbox in the delete dialog — that path calls ``delete_archive``
- instead and removes the row entirely.
- """
- archive = await self.get_archive(archive_id)
- if not archive:
- return False
- if archive.deleted_at is not None:
- # Already soft-deleted; nothing to do. The files were purged on
- # the first soft-delete pass so there is nothing left on disk.
- return True
- dir_to_delete = self._resolve_archive_dir_for_delete(archive)
- await _null_print_log_thumbnail_paths(self.db, archive_id)
- await _cancel_pending_queue_items(self.db, archive_id)
- archive.deleted_at = datetime.now(timezone.utc)
- await self.db.commit()
- if dir_to_delete:
- shutil.rmtree(dir_to_delete, ignore_errors=True)
- return True
- def _resolve_archive_dir_for_delete(self, archive: PrintArchive) -> Path | None:
- """Return the on-disk directory that backs *archive*, after the same
- two safety checks ``delete_archive`` enforces.
- Extracted so soft-delete and hard-delete share the path-resolution
- rules. Returns ``None`` when nothing should be removed from disk
- (no file_path, path outside archive_dir, or path not deep enough).
- """
- if not archive.file_path or not archive.file_path.strip():
- logger.error(
- f"SECURITY: Refusing to delete files for archive {archive.id} - "
- f"file_path is empty or invalid: '{archive.file_path}'"
- )
- return None
- file_path = settings.base_dir / archive.file_path
- if not file_path.exists():
- return None
- archive_dir = file_path.parent
- try:
- relative_path = archive_dir.resolve().relative_to(settings.archive_dir.resolve())
- except ValueError:
- logger.error(
- f"SECURITY: Refusing to delete archive {archive.id} - "
- f"path {archive_dir} is outside archive directory {settings.archive_dir}"
- )
- return None
- if len(relative_path.parts) < 1:
- logger.error(
- f"SECURITY: Refusing to delete archive {archive.id} - "
- f"path {archive_dir} is not deep enough inside archive directory"
- )
- return None
- return archive_dir
- async def delete_archive(self, archive_id: int) -> bool:
- """Delete an archive and its files."""
- archive = await self.get_archive(archive_id)
- if not archive:
- return False
- # Resolve the directory to delete BEFORE committing the DB change
- dir_to_delete: Path | None = None
- if archive.file_path and archive.file_path.strip():
- file_path = settings.base_dir / archive.file_path
- if file_path.exists():
- archive_dir = file_path.parent
- # Safety check 1: archive_dir must be inside archive_dir
- try:
- archive_dir.resolve().relative_to(settings.archive_dir.resolve())
- except ValueError:
- logger.error(
- f"SECURITY: Refusing to delete archive {archive_id} - "
- f"path {archive_dir} is outside archive directory {settings.archive_dir}"
- )
- await self.db.delete(archive)
- await self.db.commit()
- return True
- # Safety check 2: archive_dir must be at least 1 level deep inside archive_dir
- try:
- relative_path = archive_dir.resolve().relative_to(settings.archive_dir.resolve())
- if len(relative_path.parts) < 1:
- logger.error(
- f"SECURITY: Refusing to delete archive {archive_id} - "
- f"path {archive_dir} is not deep enough inside archive directory"
- )
- await self.db.delete(archive)
- await self.db.commit()
- return True
- except ValueError:
- pass # Already handled above
- dir_to_delete = archive_dir
- else:
- logger.error(
- f"SECURITY: Refusing to delete files for archive {archive_id} - "
- f"file_path is empty or invalid: '{archive.file_path}'"
- )
- # NULL stale thumbnail_path on linked PrintLogEntries before the FK
- # SET-NULL cascade fires. The on-disk file is about to be removed by
- # the rmtree below, so the path on any surviving log entry (archive_id
- # gets SET NULL by the FK) would otherwise point at a missing file
- # and produce 404 storms in the print-log view (#1348-followup).
- await _null_print_log_thumbnail_paths(self.db, archive_id)
- # Delete database record FIRST — if the commit fails (e.g. database locked
- # during concurrent bulk deletes), the files stay on disk and nothing is lost.
- await self.db.delete(archive)
- await self.db.commit()
- # Only delete files AFTER the DB commit succeeds to avoid orphaned records
- if dir_to_delete:
- shutil.rmtree(dir_to_delete, ignore_errors=True)
- return True
- async def attach_timelapse(
- self,
- archive_id: int,
- timelapse_data: bytes,
- filename: str = "timelapse.mp4",
- ) -> bool:
- """Attach a timelapse video to an archive.
- Non-MP4 videos (e.g. AVI from P1S) are saved as-is and a background
- task converts them to MP4 for browser compatibility.
- """
- import asyncio
- archive = await self.get_archive(archive_id)
- if not archive:
- return False
- # Get archive directory
- file_path = settings.base_dir / archive.file_path
- archive_dir = file_path.parent
- # Save timelapse - use thread pool to avoid blocking event loop
- # (timelapse files can be 100MB+, sync write blocks for seconds)
- timelapse_file = archive_dir / filename
- await asyncio.to_thread(timelapse_file.write_bytes, timelapse_data)
- # Update archive record
- archive.timelapse_path = str(timelapse_file.relative_to(settings.base_dir))
- await self.db.commit()
- # For non-MP4 videos (e.g. AVI from P1S), kick off background conversion
- if not filename.lower().endswith(".mp4"):
- asyncio.create_task(
- _convert_timelapse_to_mp4(archive_id, timelapse_file),
- name=f"timelapse-convert-{archive_id}",
- )
- return True
- async def _convert_timelapse_to_mp4(archive_id: int, source_path: Path) -> None:
- """Background task: convert non-MP4 timelapse (e.g. AVI from P1S) to MP4.
- Runs with low CPU priority (-threads 1, nice) so it doesn't starve
- other processes on resource-constrained devices like Raspberry Pi.
- """
- import asyncio
- from backend.app.core.database import async_session
- from backend.app.services.camera import get_ffmpeg_path
- logger = logging.getLogger(__name__)
- ffmpeg = get_ffmpeg_path()
- if not ffmpeg:
- logger.info(
- "FFmpeg not available, skipping timelapse conversion for archive %s (file saved as %s)",
- archive_id,
- source_path.suffix,
- )
- return
- mp4_path = source_path.with_suffix(".mp4")
- try:
- cmd = [
- ffmpeg,
- "-y",
- "-i",
- str(source_path),
- "-c:v",
- "libx264",
- "-preset",
- "fast",
- "-crf",
- "23",
- "-threads",
- "1",
- "-movflags",
- "+faststart",
- str(mp4_path),
- ]
- # Try with nice for lower CPU priority (standard on Linux/macOS)
- try:
- process = await asyncio.create_subprocess_exec(
- "nice",
- "-n",
- "19",
- *cmd,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- )
- except FileNotFoundError:
- # nice not available (e.g. Windows), run without
- process = await asyncio.create_subprocess_exec(
- *cmd,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- )
- _, stderr = await process.communicate()
- if process.returncode != 0:
- logger.warning(
- "Timelapse conversion failed for archive %s: %s",
- archive_id,
- stderr.decode()[-500:],
- )
- if mp4_path.exists():
- mp4_path.unlink()
- return
- # Update DB path to the new MP4 file
- async with async_session() as db:
- from backend.app.models.archive import PrintArchive
- result = await db.execute(select(PrintArchive).where(PrintArchive.id == archive_id))
- archive = result.scalar_one_or_none()
- if archive:
- archive.timelapse_path = str(mp4_path.relative_to(settings.base_dir))
- await db.commit()
- # Remove original non-MP4 file
- if source_path.exists():
- source_path.unlink()
- logger.info(
- "Converted timelapse to MP4 for archive %s (%s → %s)",
- archive_id,
- source_path.name,
- mp4_path.name,
- )
- except Exception as e:
- logger.warning("Timelapse conversion error for archive %s: %s", archive_id, e)
- if mp4_path.exists():
- mp4_path.unlink()
|