import hashlib import json import logging import re import shutil import zipfile from datetime import datetime from pathlib import Path from xml.etree import ElementTree as ET from sqlalchemy import and_, or_, select from sqlalchemy.ext.asyncio import AsyncSession from backend.app.core.config import settings from backend.app.models.archive import PrintArchive from backend.app.models.filament import Filament from backend.app.models.printer import Printer logger = logging.getLogger(__name__) class ThreeMFParser: """Parser for Bambu Lab 3MF files.""" def __init__(self, file_path: Path, plate_number: int | None = None): self.file_path = file_path self.plate_number = plate_number # Which plate was printed (1, 2, 3, etc.) self.metadata: dict = {} def parse(self) -> dict: """Extract metadata from 3MF file.""" try: with zipfile.ZipFile(self.file_path, "r") as zf: self._parse_slice_info(zf) # Now sets self.plate_number from slice_info self._parse_project_settings(zf) self._parse_gcode_header(zf) self._parse_3dmodel(zf) self._extract_thumbnail(zf) # Uses correct plate_number for thumbnail # Enhance print_name with plate info if this is a multi-plate export plate_index = self.metadata.get("_plate_index") if plate_index and plate_index > 1: # Append plate number to distinguish from other plates existing_name = self.metadata.get("print_name", "") if existing_name and f"Plate {plate_index}" not in existing_name: self.metadata["print_name"] = f"{existing_name} - Plate {plate_index}" # ALWAYS prefer slice_info values - they contain ONLY filaments actually used in print # project_settings contains ALL configured filaments (AMS slots), not just used ones if self.metadata.get("_slice_filament_type"): self.metadata["filament_type"] = self.metadata["_slice_filament_type"] if self.metadata.get("_slice_filament_color"): self.metadata["filament_color"] = self.metadata["_slice_filament_color"] # Clean up internal keys self.metadata.pop("_slice_filament_type", None) self.metadata.pop("_slice_filament_color", None) self.metadata.pop("_plate_index", None) except Exception: pass return self.metadata def _parse_slice_info(self, zf: zipfile.ZipFile): """Parse slice_info.config for print settings and printable objects.""" try: if "Metadata/slice_info.config" in zf.namelist(): content = zf.read("Metadata/slice_info.config").decode() root = ET.fromstring(content) # Find the plate element (single-plate exports only have one plate) plate = root.find(".//plate") if plate is not None: # Extract metadata from plate element for meta in plate.findall("metadata"): key = meta.get("key") value = meta.get("value") if key == "index" and value: # Extract plate index - this tells us which plate was exported try: extracted_index = int(value) # Set plate_number if not already set from filename if not self.plate_number: self.plate_number = extracted_index # Store in metadata for print_name generation self.metadata["_plate_index"] = extracted_index except ValueError: pass elif key == "prediction" and value: self.metadata["print_time_seconds"] = int(value) elif key == "weight" and value: self.metadata["filament_used_grams"] = float(value) # Extract printable objects for skip object functionality # Objects are stored as printable_objects = {} for obj in plate.findall("object"): identify_id = obj.get("identify_id") name = obj.get("name") skipped = obj.get("skipped", "false") # Only include objects that are not pre-skipped if identify_id and name and skipped.lower() != "true": try: printable_objects[int(identify_id)] = name except ValueError: pass if printable_objects: self.metadata["printable_objects"] = printable_objects # Get filament info from filaments ACTUALLY USED in the print # slice_info has # Only include filaments where used_g > 0 filaments = root.findall(".//filament") if filaments: # Collect unique filament types and colors for filaments that are actually used types = [] colors = [] for f in filaments: # Check if this filament is actually used in the print used_g = f.get("used_g", "0") try: used_amount = float(used_g) except (ValueError, TypeError): used_amount = 0 # Only include if used_g > 0 (filament is actually consumed) if used_amount > 0: ftype = f.get("type") fcolor = f.get("color") if ftype and ftype not in types: types.append(ftype) if fcolor and fcolor not in colors: colors.append(fcolor) if types: self.metadata["_slice_filament_type"] = ", ".join(types) if colors: self.metadata["_slice_filament_color"] = ",".join(colors) except Exception: pass def _parse_project_settings(self, zf: zipfile.ZipFile): """Parse project settings for print configuration.""" try: if "Metadata/project_settings.config" in zf.namelist(): content = zf.read("Metadata/project_settings.config").decode() try: data = json.loads(content) self._extract_filament_info(data) self._extract_print_settings(data) except json.JSONDecodeError: pass except Exception: pass def _parse_gcode_header(self, zf: zipfile.ZipFile): """Parse G-code file header for total layer count.""" import re try: # Look for plate_1.gcode or similar gcode_files = [f for f in zf.namelist() if f.endswith(".gcode")] if not gcode_files: return # Read first 2KB of G-code (header contains the layer count) gcode_path = gcode_files[0] with zf.open(gcode_path) as f: header = f.read(2048).decode("utf-8", errors="ignore") # Look for "; total layer number: XX" pattern match = re.search(r";\s*total\s+layer\s+number[:\s]+(\d+)", header, re.IGNORECASE) if match: self.metadata["total_layers"] = int(match.group(1)) except Exception: pass def _extract_filament_info(self, data: dict): """Extract filament info, preferring non-support filaments.""" try: filament_types = data.get("filament_type", []) filament_colors = data.get("filament_colour", []) filament_is_support = data.get("filament_is_support", []) if not filament_types: return # Collect all non-support filaments non_support_types = [] non_support_colors = [] for i, ftype in enumerate(filament_types): is_support = filament_is_support[i] if i < len(filament_is_support) else "0" if is_support == "0": if ftype and ftype not in non_support_types: non_support_types.append(ftype) if i < len(filament_colors) and filament_colors[i]: color = filament_colors[i] if color not in non_support_colors: non_support_colors.append(color) # Fallback to first filament if all are support if not non_support_types and filament_types: non_support_types = [filament_types[0]] if not non_support_colors and filament_colors: non_support_colors = [filament_colors[0]] # Store filament type(s) if non_support_types: self.metadata["filament_type"] = ", ".join(non_support_types) # Store all colors as comma-separated (for multi-color display) if non_support_colors: self.metadata["filament_color"] = ",".join(non_support_colors) except Exception: pass def _extract_print_settings(self, data: dict): """Extract print settings from JSON config.""" try: # Layer height - usually an array, get first value if "layer_height" in data: val = data["layer_height"] if isinstance(val, list) and val: self.metadata["layer_height"] = float(val[0]) elif isinstance(val, (int, float, str)): self.metadata["layer_height"] = float(val) # Nozzle diameter if "nozzle_diameter" in data: val = data["nozzle_diameter"] if isinstance(val, list) and val: self.metadata["nozzle_diameter"] = float(val[0]) elif isinstance(val, (int, float, str)): self.metadata["nozzle_diameter"] = float(val) # Bed temperature - first layer or regular for key in ["bed_temperature_initial_layer", "bed_temperature"]: if key in data: val = data[key] if isinstance(val, list) and val: self.metadata["bed_temperature"] = int(float(val[0])) elif isinstance(val, (int, float, str)): self.metadata["bed_temperature"] = int(float(val)) break # Nozzle temperature for key in ["nozzle_temperature_initial_layer", "nozzle_temperature"]: if key in data: val = data[key] if isinstance(val, list) and val: self.metadata["nozzle_temperature"] = int(float(val[0])) elif isinstance(val, (int, float, str)): self.metadata["nozzle_temperature"] = int(float(val)) break except Exception: pass def _extract_settings_from_content(self, content: str): """Extract print settings from config content.""" settings_map = { "layer_height": ("layer_height", float), "nozzle_diameter": ("nozzle_diameter", float), "bed_temperature": ("bed_temperature", int), "nozzle_temperature": ("nozzle_temperature", int), } for key, (search_key, converter) in settings_map.items(): if key not in self.metadata: try: # Try JSON format if f'"{search_key}"' in content: start = content.find(f'"{search_key}"') value_start = content.find(":", start) + 1 value_end = content.find(",", value_start) if value_end == -1: value_end = content.find("}", value_start) value = content[value_start:value_end].strip().strip('"') self.metadata[key] = converter(value) except Exception: pass def _parse_3dmodel(self, zf: zipfile.ZipFile): """Parse 3D/3dmodel.model for MakerWorld metadata.""" import re try: model_path = "3D/3dmodel.model" if model_path not in zf.namelist(): return content = zf.read(model_path).decode("utf-8", errors="ignore") # Parse XML metadata elements # MakerWorld adds metadata like: username metadata_pattern = r']*>([^<]*)' matches = re.findall(metadata_pattern, content) makerworld_fields = {} for name, value in matches: makerworld_fields[name] = value.strip() # Check for direct MakerWorld URL in content url_pattern = r'https?://makerworld\.com/[^\s<>"\']+/models/(\d+)' url_match = re.search(url_pattern, content) if url_match: self.metadata["makerworld_url"] = url_match.group(0) self.metadata["makerworld_model_id"] = url_match.group(1) # Extract model ID from DSM reference in image URLs # Format: https://makerworld.bblmw.com/makerworld/model/DSM00000001275614/... # The numeric part (1275614) is the MakerWorld model ID if "makerworld_url" not in self.metadata: dsm_pattern = r"DSM0+(\d+)" dsm_match = re.search(dsm_pattern, content) if dsm_match: model_id = dsm_match.group(1) self.metadata["makerworld_url"] = f"https://makerworld.com/en/models/{model_id}" self.metadata["makerworld_model_id"] = model_id # Store designer info if "Designer" in makerworld_fields: self.metadata["designer"] = makerworld_fields["Designer"] if "Title" in makerworld_fields: self.metadata["print_name"] = makerworld_fields["Title"] except Exception: pass def _extract_thumbnail(self, zf: zipfile.ZipFile): """Extract thumbnail image from 3MF. If a plate_number was specified, try to use that plate's thumbnail first. """ thumbnail_paths = [] # If a specific plate was printed, try that thumbnail first if self.plate_number: thumbnail_paths.append(f"Metadata/plate_{self.plate_number}.png") # Fallback to default paths thumbnail_paths.extend( [ "Metadata/plate_1.png", "Metadata/thumbnail.png", "Metadata/model_thumbnail.png", ] ) for thumb_path in thumbnail_paths: if thumb_path in zf.namelist(): self.metadata["_thumbnail_data"] = zf.read(thumb_path) self.metadata["_thumbnail_ext"] = ".png" break def extract_printable_objects_from_3mf( data: bytes, plate_number: int | None = None, include_positions: bool = False ) -> dict[int, str] | dict[int, dict] | tuple[dict[int, dict], list | None]: """Extract printable objects from 3MF file bytes. This is a lightweight function used during print start to get the list of objects that can be skipped. Args: data: Raw bytes of the 3MF file plate_number: Which plate was printed (1-based), or None for first plate include_positions: If True, return tuple of (objects dict, bbox_all) Returns: If include_positions=False: Dictionary mapping identify_id (int) to object name (str) If include_positions=True: Tuple of (dict mapping identify_id to {name, x, y}, bbox_all list or None) """ import json from io import BytesIO printable_objects: dict = {} bbox_all: list | None = None try: with zipfile.ZipFile(BytesIO(data), "r") as zf: if "Metadata/slice_info.config" not in zf.namelist(): return printable_objects content = zf.read("Metadata/slice_info.config").decode() root = ET.fromstring(content) # Find the correct plate if plate_number: plate = root.find(f".//plate[@plate_idx='{plate_number}']") if plate is None: plate = root.find(".//plate") else: plate = root.find(".//plate") if plate is None: return printable_objects # Get actual plate index from metadata (sliced files only have one plate) plate_idx = plate_number or 1 for meta in plate.findall("metadata"): if meta.get("key") == "index": try: plate_idx = int(meta.get("value", "1")) except ValueError: pass break # Load position data from plate_N.json if we need positions # Build a lookup by name - use list to handle duplicate names bbox_by_name: dict[str, list[list]] = {} if include_positions: plate_json_path = f"Metadata/plate_{plate_idx}.json" if plate_json_path in zf.namelist(): try: plate_json = json.loads(zf.read(plate_json_path).decode()) # Get bbox_all - the bounding box of all objects (used for image bounds) bbox_all = plate_json.get("bbox_all") for bbox_obj in plate_json.get("bbox_objects", []): obj_name = bbox_obj.get("name") bbox = bbox_obj.get("bbox", []) if obj_name and len(bbox) >= 4: if obj_name not in bbox_by_name: bbox_by_name[obj_name] = [] bbox_by_name[obj_name].append(bbox) except (json.JSONDecodeError, KeyError): pass # Extract objects from slice_info.config for obj in plate.findall("object"): identify_id = obj.get("identify_id") name = obj.get("name") skipped = obj.get("skipped", "false") if identify_id and name and skipped.lower() != "true": try: obj_id = int(identify_id) if include_positions: x, y = None, None # Match by name - pop first bbox to handle duplicates bboxes = bbox_by_name.get(name) if bboxes: bbox = bboxes.pop(0) # Calculate center from bbox [x_min, y_min, x_max, y_max] x = (bbox[0] + bbox[2]) / 2 y = (bbox[1] + bbox[3]) / 2 printable_objects[obj_id] = {"name": name, "x": x, "y": y} else: printable_objects[obj_id] = name except ValueError: pass except Exception: pass if include_positions: return printable_objects, bbox_all return printable_objects class ProjectPageParser: """Parser for extracting project page data from Bambu Lab 3MF files.""" def __init__(self, file_path: Path): self.file_path = file_path def parse(self, archive_id: int) -> dict: """Extract project page metadata and images from 3MF file.""" import html import re result = { "title": None, "description": None, "designer": None, "designer_user_id": None, "license": None, "copyright": None, "creation_date": None, "modification_date": None, "origin": None, "profile_title": None, "profile_description": None, "profile_cover": None, "profile_user_id": None, "profile_user_name": None, "design_model_id": None, "design_profile_id": None, "design_region": None, "model_pictures": [], "profile_pictures": [], "thumbnails": [], } try: with zipfile.ZipFile(self.file_path, "r") as zf: # Parse 3D/3dmodel.model for metadata model_path = "3D/3dmodel.model" if model_path in zf.namelist(): content = zf.read(model_path).decode("utf-8", errors="ignore") # Extract metadata elements using regex # Format: Value or metadata_pattern = r']*>([^<]*)' matches = re.findall(metadata_pattern, content) field_mapping = { "Title": "title", "Description": "description", "Designer": "designer", "DesignerUserId": "designer_user_id", "License": "license", "Copyright": "copyright", "CreationDate": "creation_date", "ModificationDate": "modification_date", "Origin": "origin", "ProfileTitle": "profile_title", "ProfileDescription": "profile_description", "ProfileCover": "profile_cover", "ProfileUserId": "profile_user_id", "ProfileUserName": "profile_user_name", "DesignModelId": "design_model_id", "DesignProfileId": "design_profile_id", "DesignRegion": "design_region", } for name, value in matches: if name in field_mapping: # Decode HTML entities multiple times (content is often triple-encoded) decoded = value.strip() prev = None while prev != decoded: prev = decoded decoded = html.unescape(decoded) # Normalize non-breaking spaces to regular spaces decoded = decoded.replace("\xa0", " ") result[field_mapping[name]] = decoded if decoded else None # List images in Auxiliaries folder from urllib.parse import quote for name in zf.namelist(): if name.startswith("Auxiliaries/Model Pictures/"): filename = name.split("/")[-1] if filename: result["model_pictures"].append( { "name": filename, "path": name, "url": f"/api/v1/archives/{archive_id}/project-image/{quote(name, safe='')}", } ) elif name.startswith("Auxiliaries/Profile Pictures/"): filename = name.split("/")[-1] if filename: result["profile_pictures"].append( { "name": filename, "path": name, "url": f"/api/v1/archives/{archive_id}/project-image/{quote(name, safe='')}", } ) elif name.startswith("Auxiliaries/.thumbnails/"): filename = name.split("/")[-1] if filename: result["thumbnails"].append( { "name": filename, "path": name, "url": f"/api/v1/archives/{archive_id}/project-image/{quote(name, safe='')}", } ) except Exception as e: result["_error"] = str(e) return result def get_image(self, image_path: str) -> tuple[bytes, str] | None: """Extract an image from the 3MF file. Returns tuple of (image_data, content_type) or None if not found. """ try: with zipfile.ZipFile(self.file_path, "r") as zf: if image_path in zf.namelist(): data = zf.read(image_path) # Determine content type from extension ext = image_path.lower().split(".")[-1] content_types = { "png": "image/png", "jpg": "image/jpeg", "jpeg": "image/jpeg", "webp": "image/webp", "gif": "image/gif", } content_type = content_types.get(ext, "application/octet-stream") return (data, content_type) except Exception: pass return None def update_metadata(self, updates: dict) -> bool: """Update project page metadata in the 3MF file. Args: updates: Dict with fields to update (title, description, designer, etc.) Returns: True if successful, False otherwise. """ import html import re import tempfile try: # Read the 3MF file with zipfile.ZipFile(self.file_path, "r") as zf_read: # Find and read the 3dmodel.model file model_path = "3D/3dmodel.model" if model_path not in zf_read.namelist(): return False content = zf_read.read(model_path).decode("utf-8") # Update metadata fields field_mapping = { "title": "Title", "description": "Description", "designer": "Designer", "license": "License", "copyright": "Copyright", "profile_title": "ProfileTitle", "profile_description": "ProfileDescription", } for field, xml_name in field_mapping.items(): if field in updates and updates[field] is not None: new_value = html.escape(updates[field]) # Replace existing metadata or we'd need to add it pattern = rf'(]*>)[^<]*()' replacement = rf"\g<1>{new_value}\g<2>" content = re.sub(pattern, replacement, content) # Write to a temporary file first with tempfile.NamedTemporaryFile(delete=False, suffix=".3mf") as tmp: tmp_path = Path(tmp.name) # Create new zip with updated content with zipfile.ZipFile(tmp_path, "w", zipfile.ZIP_DEFLATED) as zf_write: for item in zf_read.namelist(): if item == model_path: zf_write.writestr(item, content.encode("utf-8")) else: zf_write.writestr(item, zf_read.read(item)) # Replace original file with updated one shutil.move(tmp_path, self.file_path) return True except Exception: # Clean up temp file if it exists if "tmp_path" in locals() and tmp_path.exists(): tmp_path.unlink() return False class ArchiveService: """Service for archiving print jobs.""" def __init__(self, db: AsyncSession): self.db = db @staticmethod def compute_file_hash(file_path: Path) -> str: """Compute SHA256 hash of a file for duplicate detection.""" sha256 = hashlib.sha256() with open(file_path, "rb") as f: # Read in chunks to handle large files for chunk in iter(lambda: f.read(8192), b""): sha256.update(chunk) return sha256.hexdigest() async def get_duplicate_hashes(self) -> set[str]: """Get all content hashes that appear more than once. Returns a set of hashes that have duplicates. """ from sqlalchemy import func result = await self.db.execute( select(PrintArchive.content_hash) .where(PrintArchive.content_hash.isnot(None)) .group_by(PrintArchive.content_hash) .having(func.count(PrintArchive.id) > 1) ) return {row[0] for row in result.all()} async def find_duplicates( self, archive_id: int, content_hash: str | None = None, print_name: str | None = None, makerworld_model_id: str | None = None, ) -> list[dict]: """Find duplicate archives based on hash or name matching. Returns list of dicts with id, print_name, created_at, match_type. """ duplicates = [] # First, find exact matches by content hash if content_hash: result = await self.db.execute( select(PrintArchive) .where( and_( PrintArchive.content_hash == content_hash, PrintArchive.id != archive_id, ) ) .order_by(PrintArchive.created_at.desc()) .limit(10) ) for archive in result.scalars().all(): duplicates.append( { "id": archive.id, "print_name": archive.print_name, "created_at": archive.created_at, "match_type": "exact", } ) # Then, find similar matches by print name or MakerWorld ID if print_name or makerworld_model_id: conditions = [PrintArchive.id != archive_id] name_conditions = [] if print_name: # Match if print names are similar (ignoring case) name_conditions.append(PrintArchive.print_name.ilike(print_name)) if makerworld_model_id: # Match by MakerWorld model ID stored in extra_data # Use json_extract for SQLite compatibility (astext is PostgreSQL-only) from sqlalchemy import func name_conditions.append( func.json_extract(PrintArchive.extra_data, "$.makerworld_model_id") == str(makerworld_model_id) ) if name_conditions: conditions.append(or_(*name_conditions)) result = await self.db.execute( select(PrintArchive).where(and_(*conditions)).order_by(PrintArchive.created_at.desc()).limit(10) ) for archive in result.scalars().all(): # Don't add if already in duplicates (exact match) if not any(d["id"] == archive.id for d in duplicates): duplicates.append( { "id": archive.id, "print_name": archive.print_name, "created_at": archive.created_at, "match_type": "similar", } ) return duplicates async def archive_print( self, printer_id: int | None, source_file: Path, print_data: dict | None = None, ) -> PrintArchive | None: """Archive a 3MF file with metadata.""" # Verify printer exists if specified if printer_id is not None: result = await self.db.execute(select(Printer).where(Printer.id == printer_id)) printer = result.scalar_one_or_none() if not printer: return None # Create archive directory structure timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") archive_name = f"{timestamp}_{source_file.stem}" # Use "unassigned" folder for archives without a printer printer_folder = str(printer_id) if printer_id is not None else "unassigned" archive_dir = settings.archive_dir / printer_folder / archive_name archive_dir.mkdir(parents=True, exist_ok=True) # Copy 3MF file dest_file = archive_dir / source_file.name shutil.copy2(source_file, dest_file) # Compute content hash for duplicate detection content_hash = self.compute_file_hash(dest_file) # Extract plate number from filename (e.g., "plate_5" from "/data/Metadata/plate_5.gcode") plate_number = None if print_data: filename = print_data.get("filename", "") match = re.search(r"plate_(\d+)", filename) if match: plate_number = int(match.group(1)) # Parse 3MF metadata parser = ThreeMFParser(dest_file, plate_number=plate_number) metadata = parser.parse() # Save thumbnail if present thumbnail_path = None if "_thumbnail_data" in metadata: thumb_file = archive_dir / f"thumbnail{metadata['_thumbnail_ext']}" thumb_file.write_bytes(metadata["_thumbnail_data"]) thumbnail_path = str(thumb_file.relative_to(settings.base_dir)) del metadata["_thumbnail_data"] del metadata["_thumbnail_ext"] # Merge with print data from MQTT if print_data: metadata["_print_data"] = print_data # Determine status and timestamps status = print_data.get("status", "completed") if print_data else "archived" started_at = datetime.now() if status == "printing" else None completed_at = datetime.now() if status in ("completed", "failed", "archived") else None # Calculate cost based on filament usage and type cost = None filament_grams = metadata.get("filament_used_grams") filament_type = metadata.get("filament_type") if filament_grams and filament_type: # For multi-material prints, use the first filament type for cost calculation primary_type = filament_type.split(",")[0].strip() # Look up filament cost_per_kg from database filament_result = await self.db.execute(select(Filament).where(Filament.type == primary_type).limit(1)) filament = filament_result.scalar_one_or_none() if filament: cost = round((filament_grams / 1000) * filament.cost_per_kg, 2) else: # Use default filament cost from settings from backend.app.api.routes.settings import get_setting default_cost_setting = await get_setting(self.db, "default_filament_cost") default_cost_per_kg = float(default_cost_setting) if default_cost_setting else 25.0 cost = round((filament_grams / 1000) * default_cost_per_kg, 2) # Calculate quantity from printable objects count # printable_objects is a dict of {identify_id: name} for non-skipped objects quantity = 1 # Default to 1 printable_objects = metadata.get("printable_objects") if printable_objects and isinstance(printable_objects, dict): quantity = len(printable_objects) logger.debug(f"Auto-detected {quantity} parts from 3MF printable objects") # Create archive record archive = PrintArchive( printer_id=printer_id, filename=source_file.name, file_path=str(dest_file.relative_to(settings.base_dir)), file_size=dest_file.stat().st_size, content_hash=content_hash, thumbnail_path=thumbnail_path, print_name=metadata.get("print_name") or source_file.stem, print_time_seconds=metadata.get("print_time_seconds"), filament_used_grams=metadata.get("filament_used_grams"), filament_type=metadata.get("filament_type"), filament_color=metadata.get("filament_color"), layer_height=metadata.get("layer_height"), total_layers=metadata.get("total_layers"), nozzle_diameter=metadata.get("nozzle_diameter"), bed_temperature=metadata.get("bed_temperature"), nozzle_temperature=metadata.get("nozzle_temperature"), makerworld_url=metadata.get("makerworld_url"), designer=metadata.get("designer"), status=status, started_at=started_at, completed_at=completed_at, cost=cost, quantity=quantity, extra_data=metadata, ) self.db.add(archive) await self.db.commit() await self.db.refresh(archive) return archive async def get_archive(self, archive_id: int) -> PrintArchive | None: """Get an archive by ID.""" result = await self.db.execute(select(PrintArchive).where(PrintArchive.id == archive_id)) return result.scalar_one_or_none() async def update_archive_status( self, archive_id: int, status: str, completed_at: datetime | None = None, failure_reason: str | None = None, ) -> bool: """Update the status of an archive.""" archive = await self.get_archive(archive_id) if not archive: return False archive.status = status if completed_at: archive.completed_at = completed_at if failure_reason: archive.failure_reason = failure_reason await self.db.commit() return True async def add_reprint_cost(self, archive_id: int) -> bool: """Add cost for a reprint to the existing archive cost.""" archive = await self.get_archive(archive_id) if not archive: return False if not archive.filament_used_grams or not archive.filament_type: return False # Calculate cost based on filament type or default from backend.app.api.routes.settings import get_setting primary_type = archive.filament_type.split(",")[0].strip() # Look up filament cost_per_kg from database filament_result = await self.db.execute(select(Filament).where(Filament.type == primary_type).limit(1)) filament = filament_result.scalar_one_or_none() if filament: cost_per_kg = filament.cost_per_kg else: # Use default filament cost from settings default_cost_setting = await get_setting(self.db, "default_filament_cost") cost_per_kg = float(default_cost_setting) if default_cost_setting else 25.0 additional_cost = round((archive.filament_used_grams / 1000) * cost_per_kg, 2) # Add to existing cost (or set if None) if archive.cost is None: archive.cost = additional_cost else: archive.cost = round(archive.cost + additional_cost, 2) await self.db.commit() logger.info(f"Added reprint cost {additional_cost} to archive {archive_id}, new total: {archive.cost}") return True async def list_archives( self, printer_id: int | None = None, project_id: int | None = None, limit: int = 50, offset: int = 0, ) -> list[PrintArchive]: """List archives with optional filtering.""" from sqlalchemy.orm import selectinload query = ( select(PrintArchive).options(selectinload(PrintArchive.project)).order_by(PrintArchive.created_at.desc()) ) if printer_id: query = query.where(PrintArchive.printer_id == printer_id) if project_id: query = query.where(PrintArchive.project_id == project_id) query = query.limit(limit).offset(offset) result = await self.db.execute(query) return list(result.scalars().all()) async def delete_archive(self, archive_id: int) -> bool: """Delete an archive and its files.""" archive = await self.get_archive(archive_id) if not archive: return False # Delete files - with CRITICAL safety checks to prevent accidental deletion # of parent directories (e.g., /opt) if file_path is empty/malformed if archive.file_path and archive.file_path.strip(): file_path = settings.base_dir / archive.file_path if file_path.exists(): archive_dir = file_path.parent # Safety check 1: archive_dir must be inside archive_dir try: archive_dir.resolve().relative_to(settings.archive_dir.resolve()) except ValueError: logger.error( f"SECURITY: Refusing to delete archive {archive_id} - " f"path {archive_dir} is outside archive directory {settings.archive_dir}" ) # Still delete the database record, just not the files await self.db.delete(archive) await self.db.commit() return True # Safety check 2: archive_dir must be at least 1 level deep inside archive_dir # (should be archive_dir/uuid/file.3mf, so parent should be archive_dir/uuid) try: relative_path = archive_dir.resolve().relative_to(settings.archive_dir.resolve()) if len(relative_path.parts) < 1: logger.error( f"SECURITY: Refusing to delete archive {archive_id} - " f"path {archive_dir} is not deep enough inside archive directory" ) await self.db.delete(archive) await self.db.commit() return True except ValueError: pass # Already handled above shutil.rmtree(archive_dir, ignore_errors=True) else: logger.error( f"SECURITY: Refusing to delete files for archive {archive_id} - " f"file_path is empty or invalid: '{archive.file_path}'" ) # Delete database record await self.db.delete(archive) await self.db.commit() return True async def attach_timelapse( self, archive_id: int, timelapse_data: bytes, filename: str = "timelapse.mp4", ) -> bool: """Attach a timelapse video to an archive.""" import asyncio archive = await self.get_archive(archive_id) if not archive: return False # Get archive directory file_path = settings.base_dir / archive.file_path archive_dir = file_path.parent # Save timelapse - use thread pool to avoid blocking event loop # (timelapse files can be 100MB+, sync write blocks for seconds) timelapse_file = archive_dir / filename await asyncio.to_thread(timelapse_file.write_bytes, timelapse_data) # Update archive record archive.timelapse_path = str(timelapse_file.relative_to(settings.base_dir)) await self.db.commit() return True