import io import json import logging import zipfile from collections import defaultdict from datetime import date, datetime, time, timezone from decimal import ROUND_HALF_UP, Decimal from pathlib import Path from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, Request, UploadFile from fastapi.responses import FileResponse, Response from sqlalchemy import and_, func, or_, select from sqlalchemy.ext.asyncio import AsyncSession from backend.app.core.auth import ( RequirePermissionIfAuthEnabled, require_ownership_permission, ) from backend.app.core.config import settings from backend.app.core.database import get_db from backend.app.core.permissions import Permission from backend.app.models.archive import PrintArchive from backend.app.models.filament import Filament from backend.app.models.spool_usage_history import SpoolUsageHistory from backend.app.models.user import User from backend.app.schemas.archive import ArchiveResponse, ArchiveSlim, ArchiveStats, ArchiveUpdate, ReprintRequest from backend.app.services.archive import ArchiveService from backend.app.utils.threemf_tools import extract_nozzle_mapping_from_3mf logger = logging.getLogger(__name__) router = APIRouter(prefix="/archives", tags=["archives"]) def compute_time_accuracy(archive: PrintArchive) -> dict: """Compute actual print time and accuracy for an archive. Returns dict with actual_time_seconds and time_accuracy. time_accuracy = (estimated / actual) * 100 - 100% = perfect estimate - >100% = print was faster than estimated - <100% = print took longer than estimated """ result = {"actual_time_seconds": None, "time_accuracy": None} if archive.started_at and archive.completed_at and archive.status == "completed": actual_seconds = int((archive.completed_at - archive.started_at).total_seconds()) if actual_seconds > 0: result["actual_time_seconds"] = actual_seconds if archive.print_time_seconds and archive.print_time_seconds > 0: # Calculate accuracy as percentage accuracy = (archive.print_time_seconds / actual_seconds) * 100 # Sanity check: skip unreasonable values (e.g., manually changed status) # Valid range: 5% to 500% (print took 20x longer to 5x faster than estimated) if 5 <= accuracy <= 500: result["time_accuracy"] = round(accuracy, 1) return result def archive_to_response( archive: PrintArchive, duplicates: list[dict] | None = None, duplicate_count: int = 0, duplicate_sequence: int = 0, original_archive_id: int | None = None, ) -> dict: """Convert archive model to response dict with computed fields.""" data = { "id": archive.id, "printer_id": archive.printer_id, "project_id": archive.project_id, "project_name": archive.project.name if archive.project else None, "filename": archive.filename, "file_path": archive.file_path, "file_size": archive.file_size, "content_hash": archive.content_hash, "thumbnail_path": archive.thumbnail_path, "timelapse_path": archive.timelapse_path, "source_3mf_path": archive.source_3mf_path, "f3d_path": archive.f3d_path, "duplicates": duplicates, "duplicate_count": duplicate_count if duplicates is None else len(duplicates), "duplicate_sequence": duplicate_sequence, "original_archive_id": original_archive_id, "print_name": archive.print_name, "print_time_seconds": archive.print_time_seconds, "filament_used_grams": archive.filament_used_grams, "filament_type": archive.filament_type, "filament_color": archive.filament_color, "layer_height": archive.layer_height, "total_layers": archive.total_layers, "nozzle_diameter": archive.nozzle_diameter, "bed_temperature": archive.bed_temperature, "nozzle_temperature": archive.nozzle_temperature, "sliced_for_model": archive.sliced_for_model, "status": archive.status, "started_at": archive.started_at, "completed_at": archive.completed_at, "extra_data": archive.extra_data, "makerworld_url": archive.makerworld_url, "designer": archive.designer, "external_url": archive.external_url, "is_favorite": archive.is_favorite, "tags": archive.tags, "notes": archive.notes, "cost": archive.cost, "photos": archive.photos, "failure_reason": archive.failure_reason, "quantity": archive.quantity, "energy_kwh": archive.energy_kwh, "energy_cost": archive.energy_cost, "created_at": archive.created_at, # User tracking (Issue #206) "created_by_id": archive.created_by_id, "created_by_username": archive.created_by.username if archive.created_by else None, } # Add computed time accuracy fields accuracy_data = compute_time_accuracy(archive) data.update(accuracy_data) return data @router.get("/", response_model=list[ArchiveResponse]) async def list_archives( printer_id: int | None = None, project_id: int | None = None, date_from: date | None = Query(None), date_to: date | None = Query(None), limit: int = 50, offset: int = 0, db: AsyncSession = Depends(get_db), _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ), ): """List archived prints.""" service = ArchiveService(db) archives = await service.list_archives( printer_id=printer_id, project_id=project_id, date_from=date_from, date_to=date_to, limit=limit, offset=offset, ) # Get sets of duplicate hashes and duplicate (name, hash) pairs (efficient single queries) duplicate_hashes, duplicate_name_hash_pairs = await service.get_duplicate_hashes_and_names() # Batch-load duplicate groups once for the current page keys. duplicate_hashes_in_page = { a.content_hash for a in archives if a.content_hash and a.content_hash in duplicate_hashes } duplicate_name_hash_keys_in_page = { (a.print_name.lower(), a.content_hash) for a in archives if a.print_name and a.content_hash and (a.print_name.lower(), a.content_hash) in duplicate_name_hash_pairs } duplicate_meta_by_archive_id: dict[int, tuple[int, int, int]] = {} if duplicate_hashes_in_page or duplicate_name_hash_keys_in_page: duplicate_group_conditions = [] if duplicate_hashes_in_page: duplicate_group_conditions.append(PrintArchive.content_hash.in_(duplicate_hashes_in_page)) if duplicate_name_hash_keys_in_page: name_hash_conditions = [ and_(func.lower(PrintArchive.print_name) == name, PrintArchive.content_hash == hash_) for name, hash_ in duplicate_name_hash_keys_in_page ] duplicate_group_conditions.extend(name_hash_conditions) duplicate_group_rows = await db.execute( select( PrintArchive.id, PrintArchive.created_at, PrintArchive.content_hash, func.lower(PrintArchive.print_name).label("print_name_lower"), ).where(or_(*duplicate_group_conditions)) ) duplicate_groups_by_hash: dict[str, list[tuple[int, datetime]]] = defaultdict(list) duplicate_groups_by_name_hash: dict[tuple[str, str], list[tuple[int, datetime]]] = defaultdict(list) for archive_id, created_at, content_hash, print_name_lower in duplicate_group_rows.all(): if content_hash and content_hash in duplicate_hashes_in_page: duplicate_groups_by_hash[content_hash].append((archive_id, created_at)) if ( print_name_lower and content_hash and (print_name_lower, content_hash) in duplicate_name_hash_keys_in_page ): duplicate_groups_by_name_hash[(print_name_lower, content_hash)].append((archive_id, created_at)) for group in duplicate_groups_by_hash.values(): if len(group) < 2: continue group.sort(key=lambda x: x[1]) original_id = group[0][0] duplicate_count = len(group) - 1 for sequence, (archive_id, _) in enumerate(group): duplicate_meta_by_archive_id[archive_id] = (sequence, original_id, duplicate_count) # Keep hash-based grouping precedence; name/hash groups only fill missing items. for group in duplicate_groups_by_name_hash.values(): if len(group) < 2: continue group.sort(key=lambda x: x[1]) original_id = group[0][0] duplicate_count = len(group) - 1 for sequence, (archive_id, _) in enumerate(group): duplicate_meta_by_archive_id.setdefault(archive_id, (sequence, original_id, duplicate_count)) # Build response with duplicate sequence and original archive ID pre-computed result = [] for a in archives: has_hash_dup = a.content_hash in duplicate_hashes if a.content_hash else False has_name_dup = ( bool(a.print_name and a.content_hash) and (a.print_name.lower(), a.content_hash) in duplicate_name_hash_pairs ) has_duplicate = has_hash_dup or has_name_dup # Pre-compute duplicate sequence and original archive ID duplicate_sequence = 0 original_archive_id: int | None = None duplicate_count = 1 if has_duplicate else 0 if has_duplicate and a.id in duplicate_meta_by_archive_id: duplicate_sequence, original_archive_id, duplicate_count = duplicate_meta_by_archive_id[a.id] result.append( archive_to_response( a, duplicate_count=duplicate_count, duplicate_sequence=duplicate_sequence, original_archive_id=original_archive_id, ) ) return result @router.get("/slim", response_model=list[ArchiveSlim]) async def list_archives_slim( date_from: date | None = Query(None), date_to: date | None = Query(None), limit: int = Query(default=10000, le=50000), offset: int = 0, db: AsyncSession = Depends(get_db), _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ), ): """Lightweight archive listing for stats/dashboard widgets. Returns only the fields needed for client-side aggregation, skipping duplicate detection, file paths, and extra_data. """ filters = [] if date_from: dt_from = datetime.combine(date_from, time.min, tzinfo=timezone.utc) filters.append(PrintArchive.created_at >= dt_from) if date_to: dt_to = datetime.combine(date_to, time.max, tzinfo=timezone.utc) filters.append(PrintArchive.created_at <= dt_to) query = ( select( PrintArchive.printer_id, PrintArchive.print_name, PrintArchive.print_time_seconds, PrintArchive.started_at, PrintArchive.completed_at, PrintArchive.filament_used_grams, PrintArchive.filament_type, PrintArchive.filament_color, PrintArchive.status, PrintArchive.cost, PrintArchive.quantity, PrintArchive.created_at, ) .where(*filters) .order_by(PrintArchive.created_at.desc()) .limit(limit) .offset(offset) ) result = await db.execute(query) rows = result.all() return [ { "printer_id": r.printer_id, "print_name": r.print_name, "print_time_seconds": r.print_time_seconds, "actual_time_seconds": ( int((r.completed_at - r.started_at).total_seconds()) if r.started_at and r.completed_at and r.status == "completed" and (r.completed_at - r.started_at).total_seconds() > 0 else None ), "filament_used_grams": r.filament_used_grams, "filament_type": r.filament_type, "filament_color": r.filament_color, "status": r.status, "started_at": r.started_at, "completed_at": r.completed_at, "cost": r.cost, "quantity": r.quantity, "created_at": r.created_at, } for r in rows ] @router.get("/search", response_model=list[ArchiveResponse]) async def search_archives( q: str = Query(..., min_length=2, description="Search query"), printer_id: int | None = None, project_id: int | None = None, status: str | None = None, limit: int = 50, offset: int = 0, db: AsyncSession = Depends(get_db), _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ), ): """Full-text search across archives. Searches print_name, filename, tags, notes, designer, and filament_type fields. Supports partial matches with wildcards (e.g., 'vor*' matches 'voron'). """ from sqlalchemy import text from sqlalchemy.orm import selectinload # Prepare search query - add wildcard for partial matches search_term = q.strip() if not search_term.endswith("*"): search_term = f"{search_term}*" # Build the FTS query # Using MATCH for FTS5 full-text search fts_query = text(""" SELECT rowid FROM archive_fts WHERE archive_fts MATCH :search_term ORDER BY rank LIMIT :limit OFFSET :offset """) try: result = await db.execute(fts_query, {"search_term": search_term, "limit": limit + 100, "offset": 0}) matched_ids = [row[0] for row in result.fetchall()] except Exception as e: logger.warning("FTS search failed, falling back to LIKE search: %s", e) # Fallback to LIKE search if FTS fails like_pattern = f"%{q}%" query = ( select(PrintArchive) .options(selectinload(PrintArchive.project)) .where( (PrintArchive.print_name.ilike(like_pattern)) | (PrintArchive.filename.ilike(like_pattern)) | (PrintArchive.tags.ilike(like_pattern)) | (PrintArchive.notes.ilike(like_pattern)) | (PrintArchive.designer.ilike(like_pattern)) | (PrintArchive.filament_type.ilike(like_pattern)) ) .order_by(PrintArchive.created_at.desc()) ) if printer_id: query = query.where(PrintArchive.printer_id == printer_id) if project_id: query = query.where(PrintArchive.project_id == project_id) if status: query = query.where(PrintArchive.status == status) query = query.limit(limit).offset(offset) result = await db.execute(query) archives = result.scalars().all() return [archive_to_response(a) for a in archives] if not matched_ids: return [] # Fetch full archive records for matched IDs query = select(PrintArchive).options(selectinload(PrintArchive.project)).where(PrintArchive.id.in_(matched_ids)) # Apply additional filters if printer_id: query = query.where(PrintArchive.printer_id == printer_id) if project_id: query = query.where(PrintArchive.project_id == project_id) if status: query = query.where(PrintArchive.status == status) result = await db.execute(query) archives_dict = {a.id: a for a in result.scalars().all()} # Preserve FTS ranking order and apply pagination ordered_archives = [archives_dict[id] for id in matched_ids if id in archives_dict] paginated = ordered_archives[offset : offset + limit] return [archive_to_response(a) for a in paginated] @router.post("/search/rebuild-index") async def rebuild_search_index( db: AsyncSession = Depends(get_db), _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_UPDATE_ALL), ): """Rebuild the full-text search index from existing archives. Use this if search results seem incomplete or incorrect. """ from sqlalchemy import text try: # Clear and rebuild the FTS index await db.execute(text("DELETE FROM archive_fts")) # Repopulate from print_archives await db.execute( text(""" INSERT INTO archive_fts(rowid, print_name, filename, tags, notes, designer, filament_type) SELECT id, print_name, filename, tags, notes, designer, filament_type FROM print_archives """) ) await db.commit() # Count entries result = await db.execute(text("SELECT COUNT(*) FROM archive_fts")) count = result.scalar() or 0 return {"message": f"Search index rebuilt with {count} entries"} except Exception as e: logger.error("Failed to rebuild search index: %s", e) raise HTTPException(status_code=500, detail=f"Failed to rebuild index: {str(e)}") @router.get("/analysis/failures") async def analyze_failures( days: int | None = None, date_from: date | None = Query(None), date_to: date | None = Query(None), printer_id: int | None = None, project_id: int | None = None, db: AsyncSession = Depends(get_db), _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ), ): """Analyze failure patterns across prints. Returns failure statistics including: - Overall failure rate - Failures by reason, filament type, printer - Time of day distribution - Recent failures - Weekly trend """ from backend.app.services.failure_analysis import FailureAnalysisService service = FailureAnalysisService(db) return await service.analyze_failures( days=days, date_from=date_from, date_to=date_to, printer_id=printer_id, project_id=project_id, ) @router.get("/compare") async def compare_archives( archive_ids: str = Query(..., description="Comma-separated archive IDs (2-5)"), db: AsyncSession = Depends(get_db), _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ), ): """Compare multiple archives side by side. Compares print settings, filament usage, and print times. Also analyzes correlation between settings and success/failure. Args: archive_ids: Comma-separated list of 2-5 archive IDs to compare """ from backend.app.services.archive_comparison import ArchiveComparisonService # Parse and validate archive IDs try: ids = [int(id.strip()) for id in archive_ids.split(",")] except ValueError: raise HTTPException(400, "Invalid archive IDs format") if len(ids) < 2: raise HTTPException(400, "At least 2 archives required for comparison") if len(ids) > 5: raise HTTPException(400, "Maximum 5 archives can be compared at once") service = ArchiveComparisonService(db) try: return await service.compare_archives(ids) except ValueError as e: raise HTTPException(400, str(e)) @router.get("/export") async def export_archives( format: str = Query("csv", description="Export format: csv or xlsx"), fields: str | None = Query(None, description="Comma-separated field names"), printer_id: int | None = None, project_id: int | None = None, status: str | None = None, date_from: str | None = Query(None, description="Start date (ISO format)"), date_to: str | None = Query(None, description="End date (ISO format)"), search: str | None = None, db: AsyncSession = Depends(get_db), _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ), ): """Export archives to CSV or Excel format. Returns a downloadable file with archive data. """ from datetime import datetime from fastapi.responses import StreamingResponse from backend.app.services.export import ExportService if format not in ("csv", "xlsx"): raise HTTPException(400, "Format must be 'csv' or 'xlsx'") # Parse fields field_list = None if fields: field_list = [f.strip() for f in fields.split(",")] # Parse dates date_from_dt = None date_to_dt = None if date_from: try: date_from_dt = datetime.fromisoformat(date_from) except ValueError: raise HTTPException(400, "Invalid date_from format") if date_to: try: date_to_dt = datetime.fromisoformat(date_to) except ValueError: raise HTTPException(400, "Invalid date_to format") service = ExportService(db) try: file_bytes, filename, content_type = await service.export_archives( format=format, fields=field_list, printer_id=printer_id, project_id=project_id, status=status, date_from=date_from_dt, date_to=date_to_dt, search=search, ) except ImportError as e: raise HTTPException(500, str(e)) return StreamingResponse( io.BytesIO(file_bytes), media_type=content_type, headers={"Content-Disposition": f'attachment; filename="{filename}"'}, ) @router.get("/stats/export") async def export_stats( format: str = Query("csv", description="Export format: csv or xlsx"), days: int = 30, printer_id: int | None = None, project_id: int | None = None, db: AsyncSession = Depends(get_db), _: User | None = RequirePermissionIfAuthEnabled(Permission.STATS_READ), ): """Export statistics summary to CSV or Excel format.""" from fastapi.responses import StreamingResponse from backend.app.services.export import ExportService if format not in ("csv", "xlsx"): raise HTTPException(400, "Format must be 'csv' or 'xlsx'") service = ExportService(db) try: file_bytes, filename, content_type = await service.export_stats( format=format, days=days, printer_id=printer_id, project_id=project_id, ) except ImportError as e: raise HTTPException(500, str(e)) return StreamingResponse( io.BytesIO(file_bytes), media_type=content_type, headers={"Content-Disposition": f'attachment; filename="{filename}"'}, ) @router.get("/stats", response_model=ArchiveStats) async def get_archive_stats( date_from: date | None = Query(None, description="Start date (inclusive), YYYY-MM-DD"), date_to: date | None = Query(None, description="End date (inclusive), YYYY-MM-DD"), db: AsyncSession = Depends(get_db), _: User | None = RequirePermissionIfAuthEnabled(Permission.STATS_READ), ): """Get statistics across all archives.""" # Build date filter conditions base_conditions = [] if date_from: dt_from = datetime.combine(date_from, time.min, tzinfo=timezone.utc) base_conditions.append(PrintArchive.created_at >= dt_from) if date_to: dt_to = datetime.combine(date_to, time.max, tzinfo=timezone.utc) base_conditions.append(PrintArchive.created_at <= dt_to) # Total counts total_result = await db.execute(select(func.count(PrintArchive.id)).where(*base_conditions)) total_prints = total_result.scalar() or 0 successful_result = await db.execute( select(func.count(PrintArchive.id)).where(PrintArchive.status == "completed", *base_conditions) ) successful_prints = successful_result.scalar() or 0 failed_result = await db.execute( select(func.count(PrintArchive.id)).where(PrintArchive.status == "failed", *base_conditions) ) failed_prints = failed_result.scalar() or 0 # Totals - use actual print time from timestamps (not slicer estimates) # For archives with both started_at and completed_at, calculate actual duration # Fall back to print_time_seconds only for archives without timestamps archives_for_time = await db.execute( select(PrintArchive.started_at, PrintArchive.completed_at, PrintArchive.print_time_seconds).where( *base_conditions ) ) total_seconds = 0 for started_at, completed_at, print_time_seconds in archives_for_time.all(): if started_at and completed_at: # Use actual elapsed time actual_seconds = (completed_at - started_at).total_seconds() if actual_seconds > 0: total_seconds += actual_seconds elif print_time_seconds: # Fallback to estimate only if no timestamps total_seconds += print_time_seconds total_time = total_seconds / 3600 # Convert to hours # Sum filament directly - filament_used_grams already contains the total for the print job filament_result = await db.execute( select(func.coalesce(func.sum(PrintArchive.filament_used_grams), 0)).where(*base_conditions) ) total_filament = filament_result.scalar() or 0 cost_result = await db.execute(select(func.sum(PrintArchive.cost)).where(*base_conditions)) total_cost = cost_result.scalar() or 0 # By filament type (split comma-separated values for multi-material prints) filament_type_result = await db.execute( select(PrintArchive.filament_type).where(PrintArchive.filament_type.isnot(None), *base_conditions) ) prints_by_filament: dict[str, int] = {} for (filament_types,) in filament_type_result.all(): # Split by comma and count each type for ftype in filament_types.split(","): ftype = ftype.strip() if ftype: prints_by_filament[ftype] = prints_by_filament.get(ftype, 0) + 1 # By printer printer_result = await db.execute( select(PrintArchive.printer_id, func.count(PrintArchive.id)) .where(*base_conditions) .group_by(PrintArchive.printer_id) ) prints_by_printer = {str(k): v for k, v in printer_result.all()} # Time accuracy statistics # Get all completed archives with both estimated and actual times accuracy_result = await db.execute( select(PrintArchive) .where(PrintArchive.status == "completed", *base_conditions) .where(PrintArchive.print_time_seconds.isnot(None)) .where(PrintArchive.started_at.isnot(None)) .where(PrintArchive.completed_at.isnot(None)) ) archives_with_times = list(accuracy_result.scalars().all()) average_accuracy = None accuracy_by_printer: dict[str, float] = {} if archives_with_times: accuracies = [] printer_accuracies: dict[str, list[float]] = {} for archive in archives_with_times: acc_data = compute_time_accuracy(archive) if acc_data["time_accuracy"] is not None: accuracies.append(acc_data["time_accuracy"]) # Group by printer printer_key = str(archive.printer_id) if archive.printer_id else "unknown" if printer_key not in printer_accuracies: printer_accuracies[printer_key] = [] printer_accuracies[printer_key].append(acc_data["time_accuracy"]) if accuracies: average_accuracy = round(sum(accuracies) / len(accuracies), 1) # Calculate per-printer averages for printer_key, accs in printer_accuracies.items(): accuracy_by_printer[printer_key] = round(sum(accs) / len(accs), 1) # Energy totals - check which mode to use from backend.app.api.routes.settings import get_setting energy_tracking_mode = await get_setting(db, "energy_tracking_mode") or "total" energy_cost_per_kwh_str = await get_setting(db, "energy_cost_per_kwh") energy_cost_per_kwh = float(energy_cost_per_kwh_str) if energy_cost_per_kwh_str else 0.15 # When date filters are active, smart plug lifetime totals can't be # filtered by date range — fall back to per-print archive data instead. if energy_tracking_mode == "total" and not date_from and not date_to: # Total mode: sum up 'total' counter from all smart plugs (lifetime consumption) from backend.app.models.smart_plug import SmartPlug from backend.app.services.homeassistant import homeassistant_service from backend.app.services.mqtt_relay import mqtt_relay from backend.app.services.tasmota import tasmota_service plugs_result = await db.execute(select(SmartPlug)) plugs = list(plugs_result.scalars().all()) # Configure HA service once (needed for homeassistant-type plugs) ha_url = await get_setting(db, "ha_url") or "" ha_token = await get_setting(db, "ha_token") or "" homeassistant_service.configure(ha_url, ha_token) total_energy_kwh = 0.0 for plug in plugs: if plug.plug_type == "tasmota": energy = await tasmota_service.get_energy(plug) if energy and energy.get("total") is not None: total_energy_kwh += energy["total"] elif plug.plug_type == "homeassistant": energy = await homeassistant_service.get_energy(plug) if energy and energy.get("total") is not None: total_energy_kwh += energy["total"] elif plug.plug_type == "mqtt": # MQTT plugs report "today" energy, not lifetime total mqtt_data = mqtt_relay.smart_plug_service.get_plug_data(plug.id) if mqtt_data and mqtt_data.energy is not None: total_energy_kwh += mqtt_data.energy total_energy_kwh = round(total_energy_kwh, 3) total_energy_cost = round(total_energy_kwh * energy_cost_per_kwh, 3) else: # Print mode: sum up per-print energy from archives energy_kwh_result = await db.execute(select(func.sum(PrintArchive.energy_kwh)).where(*base_conditions)) total_energy_kwh = energy_kwh_result.scalar() or 0 energy_cost_result = await db.execute(select(func.sum(PrintArchive.energy_cost)).where(*base_conditions)) total_energy_cost = energy_cost_result.scalar() or 0 return ArchiveStats( total_prints=total_prints, successful_prints=successful_prints, failed_prints=failed_prints, total_print_time_hours=round(total_time, 1), total_filament_grams=round(total_filament, 1), total_cost=round(total_cost, 2), prints_by_filament_type=prints_by_filament, prints_by_printer=prints_by_printer, average_time_accuracy=average_accuracy, time_accuracy_by_printer=accuracy_by_printer if accuracy_by_printer else None, total_energy_kwh=round(total_energy_kwh, 3), total_energy_cost=round(total_energy_cost, 3), ) @router.get("/tags") async def get_all_tags( db: AsyncSession = Depends(get_db), _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ), ): """List all unique tags with usage counts. Returns a list of tags sorted by count (descending), then by name. """ # Query all archives with non-null tags result = await db.execute(select(PrintArchive.tags).where(PrintArchive.tags.isnot(None))) all_tags_rows = result.all() # Count occurrences of each tag tag_counts: dict[str, int] = {} for (tags_str,) in all_tags_rows: if tags_str: for tag in tags_str.split(","): tag = tag.strip() if tag: tag_counts[tag] = tag_counts.get(tag, 0) + 1 # Convert to list and sort by count (desc), then name (asc) tags_list = [{"name": name, "count": count} for name, count in tag_counts.items()] tags_list.sort(key=lambda x: (-x["count"], x["name"].lower())) return tags_list @router.put("/tags/{tag_name}") async def rename_tag( tag_name: str, request: Request, db: AsyncSession = Depends(get_db), _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_UPDATE_ALL), ): """Rename a tag across all archives. Request body should contain {"new_name": "new tag name"}. Returns the count of affected archives. """ body = await request.json() new_name = body.get("new_name", "").strip() if not new_name: raise HTTPException(400, "new_name is required") if new_name == tag_name: return {"affected": 0} # Find all archives containing the old tag result = await db.execute(select(PrintArchive).where(PrintArchive.tags.isnot(None))) archives = list(result.scalars().all()) affected = 0 for archive in archives: if not archive.tags: continue tags = [t.strip() for t in archive.tags.split(",")] if tag_name in tags: # Replace old tag with new tag new_tags = [new_name if t == tag_name else t for t in tags] # Remove duplicates while preserving order seen = set() unique_tags = [] for t in new_tags: if t not in seen: seen.add(t) unique_tags.append(t) archive.tags = ", ".join(unique_tags) affected += 1 await db.commit() return {"affected": affected} @router.delete("/tags/{tag_name}") async def delete_tag( tag_name: str, db: AsyncSession = Depends(get_db), _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_UPDATE_ALL), ): """Delete a tag from all archives. Returns the count of affected archives. """ # Find all archives containing the tag result = await db.execute(select(PrintArchive).where(PrintArchive.tags.isnot(None))) archives = list(result.scalars().all()) affected = 0 for archive in archives: if not archive.tags: continue tags = [t.strip() for t in archive.tags.split(",")] if tag_name in tags: # Remove the tag new_tags = [t for t in tags if t != tag_name] archive.tags = ", ".join(new_tags) if new_tags else None affected += 1 await db.commit() return {"affected": affected} @router.get("/{archive_id}", response_model=ArchiveResponse) async def get_archive( archive_id: int, db: AsyncSession = Depends(get_db), _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ), ): """Get a specific archive.""" service = ArchiveService(db) archive = await service.get_archive(archive_id) if not archive: raise HTTPException(404, "Archive not found") # Find duplicates makerworld_id = archive.extra_data.get("makerworld_model_id") if archive.extra_data else None duplicates = await service.find_duplicates( archive_id=archive.id, content_hash=archive.content_hash, print_name=archive.print_name, makerworld_model_id=makerworld_id, ) return archive_to_response(archive, duplicates) @router.get("/{archive_id}/similar") async def find_similar_archives( archive_id: int, limit: int = 10, db: AsyncSession = Depends(get_db), _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ), ): """Find archives with similar settings for comparison. Returns archives that match by: - Same print name (highest priority) - Same file content hash - Same filament type """ from backend.app.services.archive_comparison import ArchiveComparisonService service = ArchiveComparisonService(db) try: return await service.find_similar_archives(archive_id, limit=limit) except ValueError as e: raise HTTPException(404, str(e)) @router.patch("/{archive_id}", response_model=ArchiveResponse) async def update_archive( archive_id: int, update_data: ArchiveUpdate, db: AsyncSession = Depends(get_db), auth_result: tuple[User | None, bool] = Depends( require_ownership_permission( Permission.ARCHIVES_UPDATE_ALL, Permission.ARCHIVES_UPDATE_OWN, ) ), ): """Update archive metadata (tags, notes, cost, is_favorite, project_id).""" from sqlalchemy.orm import selectinload user, can_modify_all = auth_result result = await db.execute( select(PrintArchive) .options(selectinload(PrintArchive.project), selectinload(PrintArchive.created_by)) .where(PrintArchive.id == archive_id) ) archive = result.scalar_one_or_none() if not archive: raise HTTPException(404, "Archive not found") # Ownership check if not can_modify_all: if archive.created_by_id != user.id: raise HTTPException(403, "You can only update your own archives") for field, value in update_data.model_dump(exclude_unset=True).items(): setattr(archive, field, value) await db.commit() # Re-fetch with relationships loaded after commit result = await db.execute( select(PrintArchive) .options(selectinload(PrintArchive.project), selectinload(PrintArchive.created_by)) .where(PrintArchive.id == archive_id) ) archive = result.scalar_one_or_none() return archive_to_response(archive) @router.post("/{archive_id}/favorite", response_model=ArchiveResponse) async def toggle_favorite( archive_id: int, db: AsyncSession = Depends(get_db), _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_UPDATE_OWN), ): """Toggle favorite status for an archive.""" result = await db.execute(select(PrintArchive).where(PrintArchive.id == archive_id)) archive = result.scalar_one_or_none() if not archive: raise HTTPException(404, "Archive not found") archive.is_favorite = not archive.is_favorite await db.commit() await db.refresh(archive) return archive @router.post("/{archive_id}/rescan", response_model=ArchiveResponse) async def rescan_archive( archive_id: int, db: AsyncSession = Depends(get_db), _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_UPDATE_ALL), ): """Rescan the 3MF file and update metadata.""" from backend.app.api.routes.settings import get_setting from backend.app.services.archive import ThreeMFParser result = await db.execute(select(PrintArchive).where(PrintArchive.id == archive_id)) archive = result.scalar_one_or_none() if not archive: raise HTTPException(404, "Archive not found") file_path = settings.base_dir / archive.file_path if not file_path.is_file(): raise HTTPException(404, "Archive file not found") # Parse the 3MF file parser = ThreeMFParser(file_path) metadata = parser.parse() # Update fields from metadata if metadata.get("filament_type"): archive.filament_type = metadata["filament_type"] if metadata.get("filament_color"): archive.filament_color = metadata["filament_color"] if metadata.get("print_time_seconds"): archive.print_time_seconds = metadata["print_time_seconds"] if metadata.get("filament_used_grams"): archive.filament_used_grams = metadata["filament_used_grams"] if metadata.get("layer_height"): archive.layer_height = metadata["layer_height"] if metadata.get("nozzle_diameter"): archive.nozzle_diameter = metadata["nozzle_diameter"] if metadata.get("bed_temperature"): archive.bed_temperature = metadata["bed_temperature"] if metadata.get("nozzle_temperature"): archive.nozzle_temperature = metadata["nozzle_temperature"] if metadata.get("makerworld_url"): archive.makerworld_url = metadata["makerworld_url"] if metadata.get("designer"): archive.designer = metadata["designer"] # Calculate cost: prefer spool-based cost if available, else catalog-based if archive.filament_used_grams and archive.filament_type: usage_result = await db.execute( select(func.sum(SpoolUsageHistory.cost)).where(SpoolUsageHistory.archive_id == archive.id) ) usage_cost = usage_result.scalar() if usage_cost is not None and usage_cost > 0: archive.cost = float(Decimal(str(usage_cost)).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)) else: primary_type = archive.filament_type.split(",")[0].strip() filament_result = await db.execute(select(Filament).where(Filament.type == primary_type).limit(1)) filament = filament_result.scalar_one_or_none() if filament: archive.cost = float( Decimal(str((archive.filament_used_grams / 1000) * filament.cost_per_kg)).quantize( Decimal("0.01"), rounding=ROUND_HALF_UP ) ) else: # Use default filament cost from settings default_cost_setting = await get_setting(db, "default_filament_cost") default_cost_per_kg = float(default_cost_setting) if default_cost_setting else 25.0 archive.cost = float( Decimal(str((archive.filament_used_grams / 1000) * default_cost_per_kg)).quantize( Decimal("0.01"), rounding=ROUND_HALF_UP ) ) await db.commit() await db.refresh(archive) return archive @router.post("/recalculate-costs") async def recalculate_all_costs( db: AsyncSession = Depends(get_db), _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_UPDATE_ALL), ): """Recalculate costs for all archives based on filament usage and prices.""" from backend.app.api.routes.settings import get_setting result = await db.execute(select(PrintArchive)) archives = list(result.scalars().all()) # Load all filaments for lookup filament_result = await db.execute(select(Filament)) filaments = {f.type: f.cost_per_kg for f in filament_result.scalars().all()} # Get default filament cost from settings default_cost_setting = await get_setting(db, "default_filament_cost") default_cost_per_kg = float(default_cost_setting) if default_cost_setting else 25.0 # Pre-fetch all usage costs by archive_id usage_costs_result = await db.execute( select(SpoolUsageHistory.archive_id, func.sum(SpoolUsageHistory.cost)).group_by(SpoolUsageHistory.archive_id) ) usage_costs = usage_costs_result.fetchall() cost_map = {row[0]: row[1] for row in usage_costs if row[0] is not None and row[1] is not None and row[1] > 0} updated = 0 for archive in archives: usage_cost = cost_map.get(archive.id) if usage_cost is not None: new_cost = round(usage_cost, 2) else: # Fallback: sum costs for old records by print_name usage_result = await db.execute( select(func.sum(SpoolUsageHistory.cost)).where( SpoolUsageHistory.print_name == archive.print_name, SpoolUsageHistory.archive_id.is_(None), ) ) fallback_cost = usage_result.scalar() if fallback_cost is not None and fallback_cost > 0: new_cost = round(fallback_cost, 2) elif archive.filament_used_grams and archive.filament_type: primary_type = archive.filament_type.split(",")[0].strip() cost_per_kg = filaments.get(primary_type, default_cost_per_kg) new_cost = round((archive.filament_used_grams / 1000) * cost_per_kg, 2) else: new_cost = None if new_cost is not None and archive.cost != new_cost: archive.cost = new_cost updated += 1 await db.commit() return {"message": f"Recalculated costs for {updated} archives", "updated": updated} @router.post("/rescan-all") async def rescan_all_archives( db: AsyncSession = Depends(get_db), _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_UPDATE_ALL), ): """Rescan all archives and update their metadata.""" from backend.app.services.archive import ThreeMFParser result = await db.execute(select(PrintArchive)) archives = list(result.scalars().all()) updated = 0 errors = [] for archive in archives: try: file_path = settings.base_dir / archive.file_path if not file_path.is_file(): errors.append({"id": archive.id, "error": "File not found"}) continue parser = ThreeMFParser(file_path) metadata = parser.parse() if metadata.get("filament_type"): archive.filament_type = metadata["filament_type"] if metadata.get("filament_color"): archive.filament_color = metadata["filament_color"] if metadata.get("print_time_seconds"): archive.print_time_seconds = metadata["print_time_seconds"] if metadata.get("filament_used_grams"): archive.filament_used_grams = metadata["filament_used_grams"] if metadata.get("layer_height"): archive.layer_height = metadata["layer_height"] if metadata.get("nozzle_diameter"): archive.nozzle_diameter = metadata["nozzle_diameter"] if metadata.get("makerworld_url"): archive.makerworld_url = metadata["makerworld_url"] if metadata.get("designer"): archive.designer = metadata["designer"] updated += 1 except Exception as e: logger.exception("Failed to rescan archive %s: %s", archive.id, e) errors.append({"id": archive.id, "error": "Failed to parse 3MF file"}) await db.commit() return {"updated": updated, "errors": errors} @router.get("/{archive_id}/duplicates") async def get_archive_duplicates( archive_id: int, db: AsyncSession = Depends(get_db), _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ), ): """Get duplicates for a specific archive.""" service = ArchiveService(db) archive = await service.get_archive(archive_id) if not archive: raise HTTPException(404, "Archive not found") makerworld_id = archive.extra_data.get("makerworld_model_id") if archive.extra_data else None duplicates = await service.find_duplicates( archive_id=archive.id, content_hash=archive.content_hash, print_name=archive.print_name, makerworld_model_id=makerworld_id, ) return {"duplicates": duplicates, "count": len(duplicates)} @router.post("/backfill-hashes") async def backfill_content_hashes( db: AsyncSession = Depends(get_db), _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_UPDATE_ALL), ): """Compute and store content hashes for all archives missing them.""" result = await db.execute(select(PrintArchive).where(PrintArchive.content_hash.is_(None))) archives = list(result.scalars().all()) updated = 0 errors = [] for archive in archives: try: file_path = settings.base_dir / archive.file_path if not file_path.is_file(): errors.append({"id": archive.id, "error": "File not found"}) continue archive.content_hash = ArchiveService.compute_file_hash(file_path) updated += 1 except Exception as e: logger.exception("Failed to compute hash for archive %s: %s", archive.id, e) errors.append({"id": archive.id, "error": "Failed to compute hash"}) await db.commit() return {"updated": updated, "errors": errors} @router.delete("/{archive_id}") async def delete_archive( archive_id: int, db: AsyncSession = Depends(get_db), auth_result: tuple[User | None, bool] = Depends( require_ownership_permission( Permission.ARCHIVES_DELETE_ALL, Permission.ARCHIVES_DELETE_OWN, ) ), ): """Delete an archive.""" user, can_modify_all = auth_result # Get archive first to check ownership result = await db.execute(select(PrintArchive).where(PrintArchive.id == archive_id)) archive = result.scalar_one_or_none() if not archive: raise HTTPException(404, "Archive not found") # Ownership check if not can_modify_all: if archive.created_by_id != user.id: raise HTTPException(403, "You can only delete your own archives") service = ArchiveService(db) if not await service.delete_archive(archive_id): raise HTTPException(404, "Archive not found") return {"status": "deleted"} @router.get("/{archive_id}/download") async def download_archive( archive_id: int, inline: bool = False, db: AsyncSession = Depends(get_db), _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ), ): """Download the 3MF file.""" service = ArchiveService(db) archive = await service.get_archive(archive_id) if not archive: raise HTTPException(404, "Archive not found") file_path = settings.base_dir / archive.file_path if not file_path.is_file(): raise HTTPException(404, "File not found") # Use inline disposition to let browser/OS handle file association content_disposition = "inline" if inline else "attachment" return FileResponse( path=file_path, filename=archive.filename, media_type="application/vnd.ms-package.3dmanufacturing-3dmodel+xml", content_disposition_type=content_disposition, ) @router.get("/{archive_id}/file/{filename}") async def download_archive_with_filename( archive_id: int, filename: str, db: AsyncSession = Depends(get_db), _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ), ): """Download the 3MF file with filename in URL.""" service = ArchiveService(db) archive = await service.get_archive(archive_id) if not archive: raise HTTPException(404, "Archive not found") file_path = settings.base_dir / archive.file_path if not file_path.is_file(): raise HTTPException(404, "File not found") return FileResponse( path=file_path, filename=archive.filename, media_type="application/vnd.ms-package.3dmanufacturing-3dmodel+xml", ) @router.post("/{archive_id}/slicer-token") async def create_archive_slicer_token( archive_id: int, db: AsyncSession = Depends(get_db), _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ), ): """Create a short-lived download token for opening files in slicer applications. Slicer protocol handlers (bambustudioopen://, orcaslicer://) cannot send auth headers, so they use this token in the URL path instead. """ from backend.app.core.auth import create_slicer_download_token service = ArchiveService(db) archive = await service.get_archive(archive_id) if not archive: raise HTTPException(404, "Archive not found") token = create_slicer_download_token("archive", archive_id) return {"token": token} @router.get("/{archive_id}/dl/{token}/{filename}") async def download_archive_for_slicer( archive_id: int, token: str, filename: str, db: AsyncSession = Depends(get_db), ): """Download 3MF file using a slicer download token. Token-authenticated (no auth headers needed). The token is short-lived and single-use, created by POST /{archive_id}/slicer-token. Filename is at the end of the URL so slicers can detect the file format. """ from backend.app.core.auth import verify_slicer_download_token if not verify_slicer_download_token(token, "archive", archive_id): raise HTTPException(403, "Invalid or expired download token") service = ArchiveService(db) archive = await service.get_archive(archive_id) if not archive: raise HTTPException(404, "Archive not found") file_path = settings.base_dir / archive.file_path if not file_path.is_file(): raise HTTPException(404, "File not found") return FileResponse( path=file_path, filename=archive.filename, media_type="application/vnd.ms-package.3dmanufacturing-3dmodel+xml", ) @router.get("/{archive_id}/thumbnail") async def get_thumbnail( archive_id: int, db: AsyncSession = Depends(get_db), ): """Get the thumbnail image. Note: Unauthenticated - loaded via tags which can't send auth headers. """ service = ArchiveService(db) archive = await service.get_archive(archive_id) if not archive or not archive.thumbnail_path: raise HTTPException(404, "Thumbnail not found") thumb_path = settings.base_dir / archive.thumbnail_path if not thumb_path.exists(): raise HTTPException(404, "Thumbnail file not found") # Use file modification time as ETag to bust cache mtime = int(thumb_path.stat().st_mtime) return FileResponse( path=thumb_path, media_type="image/png", headers={ "Cache-Control": "no-cache, must-revalidate", "ETag": f'"{mtime}"', }, ) @router.get("/{archive_id}/timelapse") async def get_timelapse( archive_id: int, db: AsyncSession = Depends(get_db), ): """Get the timelapse video. Note: Unauthenticated - loaded via