import io
import json
import logging
import zipfile
from datetime import date, datetime, time, timezone
from decimal import ROUND_HALF_UP, Decimal
from pathlib import Path
from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, Request, UploadFile
from fastapi.responses import FileResponse, Response
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from backend.app.core.auth import (
RequirePermissionIfAuthEnabled,
require_ownership_permission,
)
from backend.app.core.config import settings
from backend.app.core.database import get_db
from backend.app.core.permissions import Permission
from backend.app.models.archive import PrintArchive
from backend.app.models.filament import Filament
from backend.app.models.spool_usage_history import SpoolUsageHistory
from backend.app.models.user import User
from backend.app.schemas.archive import ArchiveResponse, ArchiveSlim, ArchiveStats, ArchiveUpdate, ReprintRequest
from backend.app.services.archive import ArchiveService
from backend.app.utils.threemf_tools import extract_nozzle_mapping_from_3mf
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Router that collects the endpoints defined below; all of them are mounted
# under the "/archives" prefix and tagged "archives".
router = APIRouter(prefix="/archives", tags=["archives"])
def compute_time_accuracy(archive: PrintArchive) -> dict:
    """Derive the measured print duration and the estimate accuracy.

    The returned dict always has two keys:

    - ``actual_time_seconds``: whole seconds between ``started_at`` and
      ``completed_at``, or ``None`` when the print is not "completed",
      a timestamp is missing, or the span is not positive.
    - ``time_accuracy``: ``(estimated / actual) * 100`` rounded to one
      decimal, or ``None`` when it cannot be computed or falls outside
      the 5%..500% plausibility window.

    100 means the estimate was exact, above 100 means the print finished
    faster than estimated, below 100 means it took longer.
    """
    outcome = {"actual_time_seconds": None, "time_accuracy": None}

    # Only completed prints with both timestamps have a meaningful duration.
    if not (archive.started_at and archive.completed_at and archive.status == "completed"):
        return outcome

    elapsed = int((archive.completed_at - archive.started_at).total_seconds())
    if elapsed <= 0:
        return outcome
    outcome["actual_time_seconds"] = elapsed

    estimated = archive.print_time_seconds
    if not estimated or estimated <= 0:
        return outcome

    percent = (estimated / elapsed) * 100
    # Values outside 5%..500% are treated as implausible (e.g. a status that
    # was changed by hand) and are not reported.
    if 5 <= percent <= 500:
        outcome["time_accuracy"] = round(percent, 1)
    return outcome
def archive_to_response(
    archive: PrintArchive,
    duplicates: list[dict] | None = None,
    duplicate_count: int = 0,
) -> dict:
    """Flatten an archive row into the dict shape returned by the API.

    Args:
        archive: The archive to serialize. Its ``project`` and ``created_by``
            relationships are read here.
        duplicates: Optional list of duplicate descriptors. When given, its
            length overrides ``duplicate_count``.
        duplicate_count: Count reported when ``duplicates`` is ``None``.

    Returns:
        A dict of the archive's stored columns plus ``project_name``,
        ``created_by_username``, the duplicate fields, and the two computed
        keys produced by ``compute_time_accuracy``.
    """
    # Values that need a little logic are resolved up front so the mapping
    # below stays a plain attribute-by-attribute listing.
    project = archive.project
    project_name = project.name if project else None
    creator = archive.created_by
    creator_username = creator.username if creator else None
    effective_duplicate_count = len(duplicates) if duplicates is not None else duplicate_count

    return {
        "id": archive.id,
        "printer_id": archive.printer_id,
        "project_id": archive.project_id,
        "project_name": project_name,
        "filename": archive.filename,
        "file_path": archive.file_path,
        "file_size": archive.file_size,
        "content_hash": archive.content_hash,
        "thumbnail_path": archive.thumbnail_path,
        "timelapse_path": archive.timelapse_path,
        "source_3mf_path": archive.source_3mf_path,
        "f3d_path": archive.f3d_path,
        "duplicates": duplicates,
        "duplicate_count": effective_duplicate_count,
        "print_name": archive.print_name,
        "print_time_seconds": archive.print_time_seconds,
        "filament_used_grams": archive.filament_used_grams,
        "filament_type": archive.filament_type,
        "filament_color": archive.filament_color,
        "layer_height": archive.layer_height,
        "total_layers": archive.total_layers,
        "nozzle_diameter": archive.nozzle_diameter,
        "bed_temperature": archive.bed_temperature,
        "nozzle_temperature": archive.nozzle_temperature,
        "sliced_for_model": archive.sliced_for_model,
        "status": archive.status,
        "started_at": archive.started_at,
        "completed_at": archive.completed_at,
        "extra_data": archive.extra_data,
        "makerworld_url": archive.makerworld_url,
        "designer": archive.designer,
        "external_url": archive.external_url,
        "is_favorite": archive.is_favorite,
        "tags": archive.tags,
        "notes": archive.notes,
        "cost": archive.cost,
        "photos": archive.photos,
        "failure_reason": archive.failure_reason,
        "quantity": archive.quantity,
        "energy_kwh": archive.energy_kwh,
        "energy_cost": archive.energy_cost,
        "created_at": archive.created_at,
        # User tracking (Issue #206)
        "created_by_id": archive.created_by_id,
        "created_by_username": creator_username,
        # Computed fields: actual_time_seconds and time_accuracy.
        **compute_time_accuracy(archive),
    }
@router.get("/", response_model=list[ArchiveResponse])
async def list_archives(
    printer_id: int | None = None,
    project_id: int | None = None,
    date_from: date | None = Query(None),
    date_to: date | None = Query(None),
    limit: int = 50,
    offset: int = 0,
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ),
):
    """Return one page of archived prints.

    Filtering and paging are delegated to ``ArchiveService.list_archives``.
    Each returned item carries ``duplicate_count`` of 1 when its content hash
    or its lower-cased print name appears in the service's duplicate sets,
    and 0 otherwise.
    """
    archive_service = ArchiveService(db)
    page = await archive_service.list_archives(
        printer_id=printer_id,
        project_id=project_id,
        date_from=date_from,
        date_to=date_to,
        limit=limit,
        offset=offset,
    )

    # Both duplicate sets are fetched once for the whole page.
    dup_hashes, dup_names = await archive_service.get_duplicate_hashes_and_names()

    def _is_duplicated(item) -> bool:
        """Tell whether the item matches a known duplicate hash or name."""
        if item.content_hash and item.content_hash in dup_hashes:
            return True
        return bool(item.print_name) and item.print_name.lower() in dup_names

    return [archive_to_response(item, duplicate_count=1 if _is_duplicated(item) else 0) for item in page]
@router.get("/slim", response_model=list[ArchiveSlim])
async def list_archives_slim(
    date_from: date | None = Query(None),
    date_to: date | None = Query(None),
    limit: int = Query(default=10000, le=50000),
    offset: int = 0,
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ),
):
    """Lightweight archive listing for stats/dashboard widgets.

    Selects only the columns needed for client-side aggregation, newest
    first by ``created_at``. No duplicate detection is performed and no file
    paths or ``extra_data`` are returned. ``date_from`` / ``date_to`` bound
    ``created_at`` inclusively, interpreted as whole UTC days.
    """
    conditions = []
    if date_from:
        day_start = datetime.combine(date_from, time.min, tzinfo=timezone.utc)
        conditions.append(PrintArchive.created_at >= day_start)
    if date_to:
        day_end = datetime.combine(date_to, time.max, tzinfo=timezone.utc)
        conditions.append(PrintArchive.created_at <= day_end)

    columns = (
        PrintArchive.printer_id,
        PrintArchive.print_name,
        PrintArchive.print_time_seconds,
        PrintArchive.started_at,
        PrintArchive.completed_at,
        PrintArchive.filament_used_grams,
        PrintArchive.filament_type,
        PrintArchive.filament_color,
        PrintArchive.status,
        PrintArchive.cost,
        PrintArchive.quantity,
        PrintArchive.created_at,
    )
    statement = (
        select(*columns)
        .where(*conditions)
        .order_by(PrintArchive.created_at.desc())
        .limit(limit)
        .offset(offset)
    )
    rows = (await db.execute(statement)).all()

    def _elapsed_seconds(row):
        """Whole seconds a completed print took, or None when unknown."""
        if not (row.started_at and row.completed_at and row.status == "completed"):
            return None
        span = (row.completed_at - row.started_at).total_seconds()
        return int(span) if span > 0 else None

    slim_items = []
    for row in rows:
        slim_items.append(
            {
                "printer_id": row.printer_id,
                "print_name": row.print_name,
                "print_time_seconds": row.print_time_seconds,
                "actual_time_seconds": _elapsed_seconds(row),
                "filament_used_grams": row.filament_used_grams,
                "filament_type": row.filament_type,
                "filament_color": row.filament_color,
                "status": row.status,
                "started_at": row.started_at,
                "completed_at": row.completed_at,
                "cost": row.cost,
                "quantity": row.quantity,
                "created_at": row.created_at,
            }
        )
    return slim_items
@router.get("/search", response_model=list[ArchiveResponse])
async def search_archives(
    q: str = Query(..., min_length=2, description="Search query"),
    printer_id: int | None = None,
    project_id: int | None = None,
    status: str | None = None,
    limit: int = 50,
    offset: int = 0,
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ),
):
    """Full-text search across archives.

    Searches print_name, filename, tags, notes, designer, and filament_type fields.
    Supports partial matches with wildcards (e.g., 'vor*' matches 'voron').

    The ``archive_fts`` table is queried first and results keep its rank
    order. If that query raises, the search falls back to a case-insensitive
    LIKE match over the same columns, ordered newest first.

    Args:
        q: Search text; a trailing ``*`` is appended when missing.
        printer_id: Optional printer filter.
        project_id: Optional project filter.
        status: Optional status filter.
        limit: Maximum number of results to return.
        offset: Number of leading results to skip.

    Returns:
        A list of archive response dicts for the requested page.
    """
    from sqlalchemy import text
    from sqlalchemy.orm import selectinload

    def _apply_filters(statement):
        """Add the optional printer/project/status filters to a statement."""
        if printer_id:
            statement = statement.where(PrintArchive.printer_id == printer_id)
        if project_id:
            statement = statement.where(PrintArchive.project_id == project_id)
        if status:
            statement = statement.where(PrintArchive.status == status)
        return statement

    # archive_to_response reads both archive.project and archive.created_by,
    # so both relationships are loaded eagerly (as update_archive does).
    eager_loads = (selectinload(PrintArchive.project), selectinload(PrintArchive.created_by))

    # Prepare search query - add wildcard for partial matches
    search_term = q.strip()
    if not search_term.endswith("*"):
        search_term = f"{search_term}*"

    # Build the FTS query
    # Using MATCH for FTS5 full-text search
    fts_query = text("""
        SELECT rowid FROM archive_fts
        WHERE archive_fts MATCH :search_term
        ORDER BY rank
        LIMIT :limit OFFSET :offset
    """)

    # Pagination is applied in Python after filtering, so the FTS query has to
    # return at least offset + limit rows. Previously only limit + 100 rows
    # were fetched, which made every page beyond that window come back empty.
    # The extra 100 rows are headroom for matches removed by the filters.
    fetch_limit = offset + limit + 100

    try:
        result = await db.execute(fts_query, {"search_term": search_term, "limit": fetch_limit, "offset": 0})
        matched_ids = [row[0] for row in result.fetchall()]
    except Exception as e:
        logger.warning("FTS search failed, falling back to LIKE search: %s", e)
        # Fallback to LIKE search if FTS fails
        like_pattern = f"%{q}%"
        query = (
            select(PrintArchive)
            .options(*eager_loads)
            .where(
                (PrintArchive.print_name.ilike(like_pattern))
                | (PrintArchive.filename.ilike(like_pattern))
                | (PrintArchive.tags.ilike(like_pattern))
                | (PrintArchive.notes.ilike(like_pattern))
                | (PrintArchive.designer.ilike(like_pattern))
                | (PrintArchive.filament_type.ilike(like_pattern))
            )
            .order_by(PrintArchive.created_at.desc())
        )
        query = _apply_filters(query).limit(limit).offset(offset)
        result = await db.execute(query)
        return [archive_to_response(a) for a in result.scalars().all()]

    if not matched_ids:
        return []

    # Fetch full archive records for matched IDs, then apply additional filters
    query = _apply_filters(select(PrintArchive).options(*eager_loads).where(PrintArchive.id.in_(matched_ids)))
    result = await db.execute(query)
    archives_by_id = {a.id: a for a in result.scalars().all()}

    # Preserve FTS ranking order and apply pagination
    ordered_archives = [archives_by_id[match_id] for match_id in matched_ids if match_id in archives_by_id]
    paginated = ordered_archives[offset : offset + limit]
    return [archive_to_response(a) for a in paginated]
@router.post("/search/rebuild-index")
async def rebuild_search_index(
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_UPDATE_ALL),
):
    """Rebuild the full-text search index from existing archives.

    Use this if search results seem incomplete or incorrect.

    Empties ``archive_fts``, repopulates it from ``print_archives``, commits,
    and reports how many rows the index then holds.

    Returns:
        A dict with a ``message`` key describing the rebuilt entry count.

    Raises:
        HTTPException: 500 when any step fails. The pending changes are
            rolled back first so a failed repopulation cannot leave the
            emptied index pending on this session.
    """
    from sqlalchemy import text

    try:
        # Clear and rebuild the FTS index
        await db.execute(text("DELETE FROM archive_fts"))
        # Repopulate from print_archives
        await db.execute(
            text("""
                INSERT INTO archive_fts(rowid, print_name, filename, tags, notes, designer, filament_type)
                SELECT id, print_name, filename, tags, notes, designer, filament_type
                FROM print_archives
            """)
        )
        await db.commit()
        # Count entries
        result = await db.execute(text("SELECT COUNT(*) FROM archive_fts"))
        count = result.scalar() or 0
        return {"message": f"Search index rebuilt with {count} entries"}
    except Exception as e:
        logger.error("Failed to rebuild search index: %s", e)
        # Discard the uncommitted DELETE/INSERT before reporting the failure.
        await db.rollback()
        raise HTTPException(status_code=500, detail=f"Failed to rebuild index: {str(e)}") from e
@router.get("/analysis/failures")
async def analyze_failures(
    days: int | None = None,
    date_from: date | None = Query(None),
    date_to: date | None = Query(None),
    printer_id: int | None = None,
    project_id: int | None = None,
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ),
):
    """Analyze failure patterns across prints.

    Returns failure statistics including:
    - Overall failure rate
    - Failures by reason, filament type, printer
    - Time of day distribution
    - Recent failures
    - Weekly trend

    All query parameters are forwarded unchanged to
    ``FailureAnalysisService.analyze_failures``, whose result is returned as-is.
    """
    from backend.app.services.failure_analysis import FailureAnalysisService

    analysis_filters = {
        "days": days,
        "date_from": date_from,
        "date_to": date_to,
        "printer_id": printer_id,
        "project_id": project_id,
    }
    report = await FailureAnalysisService(db).analyze_failures(**analysis_filters)
    return report
@router.get("/compare")
async def compare_archives(
    archive_ids: str = Query(..., description="Comma-separated archive IDs (2-5)"),
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ),
):
    """Compare multiple archives side by side.

    Compares print settings, filament usage, and print times.
    Also analyzes correlation between settings and success/failure.

    Args:
        archive_ids: Comma-separated list of 2-5 archive IDs to compare

    Returns:
        Whatever ``ArchiveComparisonService.compare_archives`` produces for
        the parsed IDs.

    Raises:
        HTTPException: 400 when an ID is not an integer, when fewer than 2 or
            more than 5 IDs are given, or when the service raises ValueError.
    """
    from backend.app.services.archive_comparison import ArchiveComparisonService

    # Parse and validate archive IDs
    try:
        ids = [int(part.strip()) for part in archive_ids.split(",")]
    except ValueError as exc:
        raise HTTPException(400, "Invalid archive IDs format") from exc

    if len(ids) < 2:
        raise HTTPException(400, "At least 2 archives required for comparison")
    if len(ids) > 5:
        raise HTTPException(400, "Maximum 5 archives can be compared at once")

    service = ArchiveComparisonService(db)
    try:
        return await service.compare_archives(ids)
    except ValueError as exc:
        # The service signals invalid requests with ValueError; surface its
        # message to the client.
        raise HTTPException(400, str(exc)) from exc
@router.get("/export")
async def export_archives(
    format: str = Query("csv", description="Export format: csv or xlsx"),
    fields: str | None = Query(None, description="Comma-separated field names"),
    printer_id: int | None = None,
    project_id: int | None = None,
    status: str | None = None,
    date_from: str | None = Query(None, description="Start date (ISO format)"),
    date_to: str | None = Query(None, description="End date (ISO format)"),
    search: str | None = None,
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ),
):
    """Export archives to CSV or Excel format.

    Returns a downloadable file with archive data. The file contents, name
    and content type all come from ``ExportService.export_archives``.

    Raises:
        HTTPException: 400 for an unsupported ``format`` or an unparseable
            ``date_from`` / ``date_to``; 500 when the export service raises
            ImportError.
    """
    from datetime import datetime

    from fastapi.responses import StreamingResponse

    from backend.app.services.export import ExportService

    if format not in ("csv", "xlsx"):
        raise HTTPException(400, "Format must be 'csv' or 'xlsx'")

    # Split the optional comma-separated field list.
    field_list = [name.strip() for name in fields.split(",")] if fields else None

    def _parse_iso(raw_value, error_message):
        """Parse an optional ISO date string; reject bad input with a 400."""
        if not raw_value:
            return None
        try:
            return datetime.fromisoformat(raw_value)
        except ValueError:
            raise HTTPException(400, error_message)

    date_from_dt = _parse_iso(date_from, "Invalid date_from format")
    date_to_dt = _parse_iso(date_to, "Invalid date_to format")

    exporter = ExportService(db)
    try:
        exported = await exporter.export_archives(
            format=format,
            fields=field_list,
            printer_id=printer_id,
            project_id=project_id,
            status=status,
            date_from=date_from_dt,
            date_to=date_to_dt,
            search=search,
        )
    except ImportError as e:
        raise HTTPException(500, str(e))
    file_bytes, filename, content_type = exported

    download_headers = {"Content-Disposition": f'attachment; filename="{filename}"'}
    return StreamingResponse(
        io.BytesIO(file_bytes),
        media_type=content_type,
        headers=download_headers,
    )
@router.get("/stats/export")
async def export_stats(
    format: str = Query("csv", description="Export format: csv or xlsx"),
    days: int = 30,
    printer_id: int | None = None,
    project_id: int | None = None,
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.STATS_READ),
):
    """Export statistics summary to CSV or Excel format.

    The file contents, name and content type come from
    ``ExportService.export_stats`` and are returned as an attachment.

    Raises:
        HTTPException: 400 for an unsupported ``format``; 500 when the
            export service raises ImportError.
    """
    from fastapi.responses import StreamingResponse

    from backend.app.services.export import ExportService

    supported_formats = ("csv", "xlsx")
    if format not in supported_formats:
        raise HTTPException(400, "Format must be 'csv' or 'xlsx'")

    exporter = ExportService(db)
    try:
        exported = await exporter.export_stats(
            format=format,
            days=days,
            printer_id=printer_id,
            project_id=project_id,
        )
    except ImportError as e:
        raise HTTPException(500, str(e))
    file_bytes, filename, content_type = exported

    download_headers = {"Content-Disposition": f'attachment; filename="{filename}"'}
    return StreamingResponse(
        io.BytesIO(file_bytes),
        media_type=content_type,
        headers=download_headers,
    )
@router.get("/stats", response_model=ArchiveStats)
async def get_archive_stats(
    date_from: date | None = Query(None, description="Start date (inclusive), YYYY-MM-DD"),
    date_to: date | None = Query(None, description="End date (inclusive), YYYY-MM-DD"),
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.STATS_READ),
):
    """Get statistics across all archives.

    Aggregates print counts, print time, filament, cost, per-filament and
    per-printer counts, time-estimate accuracy, and energy figures.
    ``date_from`` / ``date_to`` restrict the archive queries by
    ``created_at`` (whole UTC days, inclusive).

    Energy depends on the ``energy_tracking_mode`` setting: in "total" mode
    (the default) readings are collected from every configured smart plug
    and the date range is not applied; in any other mode the per-archive
    ``energy_kwh`` / ``energy_cost`` columns are summed.
    """
    # Date window shared by every archive query below.
    window = []
    if date_from:
        window.append(PrintArchive.created_at >= datetime.combine(date_from, time.min, tzinfo=timezone.utc))
    if date_to:
        window.append(PrintArchive.created_at <= datetime.combine(date_to, time.max, tzinfo=timezone.utc))

    async def _count_archives(*extra_conditions):
        """Count archives inside the window that satisfy extra conditions."""
        counted = await db.execute(select(func.count(PrintArchive.id)).where(*extra_conditions, *window))
        return counted.scalar() or 0

    total_prints = await _count_archives()
    successful_prints = await _count_archives(PrintArchive.status == "completed")
    failed_prints = await _count_archives(PrintArchive.status == "failed")

    # Print time: prefer the measured duration (completed_at - started_at);
    # the slicer estimate is used only when a timestamp is missing.
    timing_rows = await db.execute(
        select(PrintArchive.started_at, PrintArchive.completed_at, PrintArchive.print_time_seconds).where(*window)
    )
    total_seconds = 0
    for begun, finished, estimate in timing_rows.all():
        if begun and finished:
            measured = (finished - begun).total_seconds()
            if measured > 0:
                total_seconds += measured
        elif estimate:
            total_seconds += estimate
    total_time = total_seconds / 3600  # hours

    # filament_used_grams already holds the total for the whole print job.
    filament_sum = await db.execute(
        select(func.coalesce(func.sum(PrintArchive.filament_used_grams), 0)).where(*window)
    )
    total_filament = filament_sum.scalar() or 0

    cost_sum = await db.execute(select(func.sum(PrintArchive.cost)).where(*window))
    total_cost = cost_sum.scalar() or 0

    # Per filament type: multi-material prints store comma-separated types,
    # and each listed type counts once for that print.
    type_rows = await db.execute(
        select(PrintArchive.filament_type).where(PrintArchive.filament_type.isnot(None), *window)
    )
    prints_by_filament: dict[str, int] = {}
    for (type_list,) in type_rows.all():
        for type_name in (piece.strip() for piece in type_list.split(",")):
            if type_name:
                prints_by_filament[type_name] = prints_by_filament.get(type_name, 0) + 1

    # Per printer
    printer_rows = await db.execute(
        select(PrintArchive.printer_id, func.count(PrintArchive.id)).where(*window).group_by(PrintArchive.printer_id)
    )
    prints_by_printer = {str(printer): count for printer, count in printer_rows.all()}

    # Time accuracy: completed archives that have an estimate and both timestamps.
    accuracy_rows = await db.execute(
        select(PrintArchive).where(
            PrintArchive.status == "completed",
            *window,
            PrintArchive.print_time_seconds.isnot(None),
            PrintArchive.started_at.isnot(None),
            PrintArchive.completed_at.isnot(None),
        )
    )
    average_accuracy = None
    accuracy_by_printer: dict[str, float] = {}
    all_samples: list[float] = []
    samples_by_printer: dict[str, list[float]] = {}
    for archive in accuracy_rows.scalars().all():
        sample = compute_time_accuracy(archive)["time_accuracy"]
        if sample is None:
            continue
        all_samples.append(sample)
        printer_key = str(archive.printer_id) if archive.printer_id else "unknown"
        samples_by_printer.setdefault(printer_key, []).append(sample)
    if all_samples:
        average_accuracy = round(sum(all_samples) / len(all_samples), 1)
        for printer_key, samples in samples_by_printer.items():
            accuracy_by_printer[printer_key] = round(sum(samples) / len(samples), 1)

    # Energy totals - the tracking mode decides where the numbers come from.
    from backend.app.api.routes.settings import get_setting

    energy_tracking_mode = await get_setting(db, "energy_tracking_mode") or "total"
    energy_cost_per_kwh_str = await get_setting(db, "energy_cost_per_kwh")
    energy_cost_per_kwh = float(energy_cost_per_kwh_str) if energy_cost_per_kwh_str else 0.15

    if energy_tracking_mode != "total":
        # Print mode: sum the per-print energy stored on the archives.
        kwh_sum = await db.execute(select(func.sum(PrintArchive.energy_kwh)).where(*window))
        total_energy_kwh = kwh_sum.scalar() or 0
        energy_cost_sum = await db.execute(select(func.sum(PrintArchive.energy_cost)).where(*window))
        total_energy_cost = energy_cost_sum.scalar() or 0
    else:
        # Total mode: add up the 'total' counter reported by every smart plug.
        from backend.app.models.smart_plug import SmartPlug
        from backend.app.services.homeassistant import homeassistant_service
        from backend.app.services.mqtt_relay import mqtt_relay
        from backend.app.services.tasmota import tasmota_service

        plug_rows = await db.execute(select(SmartPlug))
        plugs = list(plug_rows.scalars().all())

        # Configure HA service once (needed for homeassistant-type plugs)
        ha_url = await get_setting(db, "ha_url") or ""
        ha_token = await get_setting(db, "ha_token") or ""
        homeassistant_service.configure(ha_url, ha_token)

        # Plug types whose service exposes get_energy() with a "total" entry.
        polled_services = {"tasmota": tasmota_service, "homeassistant": homeassistant_service}

        total_energy_kwh = 0.0
        for plug in plugs:
            polled = polled_services.get(plug.plug_type)
            if polled is not None:
                reading = await polled.get_energy(plug)
                if reading and reading.get("total") is not None:
                    total_energy_kwh += reading["total"]
            elif plug.plug_type == "mqtt":
                # MQTT plugs report "today" energy, not lifetime total
                mqtt_data = mqtt_relay.smart_plug_service.get_plug_data(plug.id)
                if mqtt_data and mqtt_data.energy is not None:
                    total_energy_kwh += mqtt_data.energy

        total_energy_kwh = round(total_energy_kwh, 3)
        total_energy_cost = round(total_energy_kwh * energy_cost_per_kwh, 3)

    return ArchiveStats(
        total_prints=total_prints,
        successful_prints=successful_prints,
        failed_prints=failed_prints,
        total_print_time_hours=round(total_time, 1),
        total_filament_grams=round(total_filament, 1),
        total_cost=round(total_cost, 2),
        prints_by_filament_type=prints_by_filament,
        prints_by_printer=prints_by_printer,
        average_time_accuracy=average_accuracy,
        time_accuracy_by_printer=accuracy_by_printer if accuracy_by_printer else None,
        total_energy_kwh=round(total_energy_kwh, 3),
        total_energy_cost=round(total_energy_cost, 3),
    )
@router.get("/tags")
async def get_all_tags(
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ),
):
    """List all unique tags with usage counts.

    Tags are stored per archive as a comma-separated string; each entry is
    trimmed and blank entries are ignored.

    Returns:
        A list of ``{"name": ..., "count": ...}`` dicts sorted by count
        (descending), then by case-insensitive name.
    """
    tag_rows = (await db.execute(select(PrintArchive.tags).where(PrintArchive.tags.isnot(None)))).all()

    # Tally how many archives mention each tag.
    usage: dict[str, int] = {}
    for (raw_tags,) in tag_rows:
        if not raw_tags:
            continue
        for label in (piece.strip() for piece in raw_tags.split(",")):
            if label:
                usage[label] = usage.get(label, 0) + 1

    return sorted(
        ({"name": label, "count": total} for label, total in usage.items()),
        key=lambda entry: (-entry["count"], entry["name"].lower()),
    )
@router.put("/tags/{tag_name}")
async def rename_tag(
    tag_name: str,
    request: Request,
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_UPDATE_ALL),
):
    """Rename a tag across all archives.

    Request body should contain {"new_name": "new tag name"}.

    Every archive whose comma-separated tag list contains ``tag_name`` has
    that entry replaced by the new name. If the new name was already present
    the list is de-duplicated, keeping the first occurrence. Rewritten lists
    are stored joined with ", ".

    Returns:
        ``{"affected": <count of modified archives>}``; 0 when the new name
        equals the old one.

    Raises:
        HTTPException: 400 when the body is not valid JSON, is not a JSON
            object, or ``new_name`` is missing, blank or not a string.
    """
    # A body that cannot be parsed is a client error, not a server error.
    try:
        body = await request.json()
    except ValueError as exc:
        raise HTTPException(400, "Request body must be valid JSON") from exc

    raw_new_name = body.get("new_name", "") if isinstance(body, dict) else None
    if not isinstance(raw_new_name, str):
        raise HTTPException(400, "new_name is required")
    new_name = raw_new_name.strip()
    if not new_name:
        raise HTTPException(400, "new_name is required")

    if new_name == tag_name:
        return {"affected": 0}

    # Find all archives containing the old tag
    result = await db.execute(select(PrintArchive).where(PrintArchive.tags.isnot(None)))
    affected = 0
    for archive in result.scalars().all():
        if not archive.tags:
            continue
        tags = [t.strip() for t in archive.tags.split(",")]
        if tag_name not in tags:
            continue
        # Replace old tag with new tag
        renamed = [new_name if t == tag_name else t for t in tags]
        # dict.fromkeys removes duplicates while preserving order
        archive.tags = ", ".join(dict.fromkeys(renamed))
        affected += 1

    await db.commit()
    return {"affected": affected}
@router.delete("/tags/{tag_name}")
async def delete_tag(
    tag_name: str,
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_UPDATE_ALL),
):
    """Delete a tag from all archives.

    Every archive whose comma-separated tag list contains ``tag_name`` has
    it removed. The remaining tags are stored joined with ", ", or ``None``
    when nothing is left.

    Returns:
        ``{"affected": <count of modified archives>}``.
    """
    tagged = await db.execute(select(PrintArchive).where(PrintArchive.tags.isnot(None)))

    affected = 0
    for archive in list(tagged.scalars().all()):
        if not archive.tags:
            continue
        current = [piece.strip() for piece in archive.tags.split(",")]
        if tag_name not in current:
            continue
        remaining = [piece for piece in current if piece != tag_name]
        archive.tags = ", ".join(remaining) if remaining else None
        affected += 1

    await db.commit()
    return {"affected": affected}
@router.get("/{archive_id}", response_model=ArchiveResponse)
async def get_archive(
    archive_id: int,
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ),
):
    """Get a specific archive.

    The response also lists the duplicates that ``ArchiveService.find_duplicates``
    reports for this archive's content hash, print name and (when present in
    ``extra_data``) MakerWorld model id.

    Raises:
        HTTPException: 404 when no archive has this id.
    """
    archive_service = ArchiveService(db)
    archive = await archive_service.get_archive(archive_id)
    if not archive:
        raise HTTPException(404, "Archive not found")

    # extra_data is optional; without it there is no MakerWorld id to match on.
    extra = archive.extra_data
    makerworld_id = extra.get("makerworld_model_id") if extra else None

    related = await archive_service.find_duplicates(
        archive_id=archive.id,
        content_hash=archive.content_hash,
        print_name=archive.print_name,
        makerworld_model_id=makerworld_id,
    )
    return archive_to_response(archive, related)
@router.get("/{archive_id}/similar")
async def find_similar_archives(
    archive_id: int,
    limit: int = 10,
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ),
):
    """Find archives with similar settings for comparison.

    Returns archives that match by:
    - Same print name (highest priority)
    - Same file content hash
    - Same filament type

    The lookup is delegated to ``ArchiveComparisonService.find_similar_archives``.

    Raises:
        HTTPException: 404 carrying the service's message when it raises
            ValueError.
    """
    from backend.app.services.archive_comparison import ArchiveComparisonService

    comparison = ArchiveComparisonService(db)
    try:
        similar = await comparison.find_similar_archives(archive_id, limit=limit)
    except ValueError as e:
        raise HTTPException(404, str(e))
    return similar
@router.patch("/{archive_id}", response_model=ArchiveResponse)
async def update_archive(
    archive_id: int,
    update_data: ArchiveUpdate,
    db: AsyncSession = Depends(get_db),
    auth_result: tuple[User | None, bool] = Depends(
        require_ownership_permission(
            Permission.ARCHIVES_UPDATE_ALL,
            Permission.ARCHIVES_UPDATE_OWN,
        )
    ),
):
    """Update archive metadata (tags, notes, cost, is_favorite, project_id).

    Only the fields explicitly present in the request are written. A caller
    who may not modify all archives can only change archives they created.

    Raises:
        HTTPException: 404 when the archive does not exist; 403 when the
            caller does not own it and lacks the update-all permission.
    """
    from sqlalchemy.orm import selectinload

    user, can_modify_all = auth_result

    # The same statement is run before and after the commit; it loads the
    # relationships that archive_to_response reads.
    lookup = (
        select(PrintArchive)
        .options(selectinload(PrintArchive.project), selectinload(PrintArchive.created_by))
        .where(PrintArchive.id == archive_id)
    )

    archive = (await db.execute(lookup)).scalar_one_or_none()
    if not archive:
        raise HTTPException(404, "Archive not found")

    # Ownership check
    if not can_modify_all and archive.created_by_id != user.id:
        raise HTTPException(403, "You can only update your own archives")

    changes = update_data.model_dump(exclude_unset=True)
    for field_name, new_value in changes.items():
        setattr(archive, field_name, new_value)
    await db.commit()

    # Re-fetch with relationships loaded after commit
    refreshed = (await db.execute(lookup)).scalar_one_or_none()
    return archive_to_response(refreshed)
@router.post("/{archive_id}/favorite", response_model=ArchiveResponse)
async def toggle_favorite(
    archive_id: int,
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_UPDATE_OWN),
):
    """Toggle favorite status for an archive.

    Flips ``is_favorite``, commits, and returns the refreshed archive.

    Raises:
        HTTPException: 404 when no archive has this id.
    """
    lookup = select(PrintArchive).where(PrintArchive.id == archive_id)
    archive = (await db.execute(lookup)).scalar_one_or_none()
    if not archive:
        raise HTTPException(404, "Archive not found")

    archive.is_favorite = not archive.is_favorite
    await db.commit()
    await db.refresh(archive)
    # NOTE(review): the ORM object is returned directly rather than through
    # archive_to_response, so the computed fields built there are not set
    # explicitly - confirm ArchiveResponse handles this as intended.
    return archive
@router.post("/{archive_id}/rescan", response_model=ArchiveResponse)
async def rescan_archive(
    archive_id: int,
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_UPDATE_ALL),
):
    """Rescan the 3MF file and update metadata.

    Re-parses the archive's file and copies every non-empty parsed value
    onto the archive, then recomputes the cost when both filament weight and
    type are known. Cost preference: recorded spool usage for this archive,
    then the catalog price of the first listed filament type, then the
    ``default_filament_cost`` setting (25.0 per kg when unset).

    Raises:
        HTTPException: 404 when the archive or its file is missing.
    """
    from backend.app.api.routes.settings import get_setting
    from backend.app.services.archive import ThreeMFParser

    lookup = select(PrintArchive).where(PrintArchive.id == archive_id)
    archive = (await db.execute(lookup)).scalar_one_or_none()
    if not archive:
        raise HTTPException(404, "Archive not found")

    file_path = settings.base_dir / archive.file_path
    if not file_path.is_file():
        raise HTTPException(404, "Archive file not found")

    # Parse the 3MF file
    metadata = ThreeMFParser(file_path).parse()

    # Parsed keys share their names with the archive attributes; only truthy
    # values overwrite what is already stored.
    rescanned_fields = (
        "filament_type",
        "filament_color",
        "print_time_seconds",
        "filament_used_grams",
        "layer_height",
        "nozzle_diameter",
        "bed_temperature",
        "nozzle_temperature",
        "makerworld_url",
        "designer",
    )
    for field_name in rescanned_fields:
        parsed_value = metadata.get(field_name)
        if parsed_value:
            setattr(archive, field_name, parsed_value)

    # Calculate cost: prefer spool-based cost if available, else catalog-based
    if archive.filament_used_grams and archive.filament_type:
        usage_rows = await db.execute(
            select(func.sum(SpoolUsageHistory.cost)).where(SpoolUsageHistory.archive_id == archive.id)
        )
        usage_cost = usage_rows.scalar()
        if usage_cost is not None and usage_cost > 0:
            raw_cost = usage_cost
        else:
            primary_type = archive.filament_type.split(",")[0].strip()
            catalog_rows = await db.execute(select(Filament).where(Filament.type == primary_type).limit(1))
            filament = catalog_rows.scalar_one_or_none()
            if filament:
                price_per_kg = filament.cost_per_kg
            else:
                # Use default filament cost from settings
                default_cost_setting = await get_setting(db, "default_filament_cost")
                price_per_kg = float(default_cost_setting) if default_cost_setting else 25.0
            raw_cost = (archive.filament_used_grams / 1000) * price_per_kg
        # Round half-up to two decimals.
        archive.cost = float(Decimal(str(raw_cost)).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP))

    await db.commit()
    await db.refresh(archive)
    return archive
@router.post("/recalculate-costs")
async def recalculate_all_costs(
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_UPDATE_ALL),
):
    """Recalculate costs for all archives based on filament usage and prices.

    For each archive the cost source is chosen in this order:

    1. Positive sum of spool-usage costs recorded for the archive id.
    2. Positive sum of spool-usage costs that have no archive id but share
       the archive's print name (older records).
    3. Filament weight times the catalog price of the first listed filament
       type, falling back to the ``default_filament_cost`` setting (25.0
       per kg when unset).

    Archives for which none applies keep their current cost. Costs are
    rounded half-up to two decimals, the same rule ``rescan_archive`` uses,
    so the two endpoints agree on half-cent values.

    Returns:
        A dict with a human-readable ``message`` and the ``updated`` count.
    """
    from backend.app.api.routes.settings import get_setting

    def _to_money(amount) -> float:
        """Round half-up to two decimals."""
        return float(Decimal(str(amount)).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP))

    result = await db.execute(select(PrintArchive))
    archives = list(result.scalars().all())

    # Load all filaments for lookup
    filament_result = await db.execute(select(Filament))
    filaments = {f.type: f.cost_per_kg for f in filament_result.scalars().all()}

    # Get default filament cost from settings
    default_cost_setting = await get_setting(db, "default_filament_cost")
    default_cost_per_kg = float(default_cost_setting) if default_cost_setting else 25.0

    # Pre-fetch all usage costs by archive_id
    usage_costs_result = await db.execute(
        select(SpoolUsageHistory.archive_id, func.sum(SpoolUsageHistory.cost)).group_by(SpoolUsageHistory.archive_id)
    )
    cost_map = {
        row[0]: row[1]
        for row in usage_costs_result.fetchall()
        if row[0] is not None and row[1] is not None and row[1] > 0
    }

    # The print-name fallback gives the same answer for every archive sharing
    # a name, so each distinct name is queried only once.
    fallback_by_name: dict = {}

    updated = 0
    for archive in archives:
        usage_cost = cost_map.get(archive.id)
        if usage_cost is not None:
            new_cost = _to_money(usage_cost)
        else:
            # Fallback: sum costs for old records by print_name
            if archive.print_name not in fallback_by_name:
                usage_result = await db.execute(
                    select(func.sum(SpoolUsageHistory.cost)).where(
                        SpoolUsageHistory.print_name == archive.print_name,
                        SpoolUsageHistory.archive_id.is_(None),
                    )
                )
                fallback_by_name[archive.print_name] = usage_result.scalar()
            fallback_cost = fallback_by_name[archive.print_name]

            if fallback_cost is not None and fallback_cost > 0:
                new_cost = _to_money(fallback_cost)
            elif archive.filament_used_grams and archive.filament_type:
                primary_type = archive.filament_type.split(",")[0].strip()
                cost_per_kg = filaments.get(primary_type, default_cost_per_kg)
                new_cost = _to_money((archive.filament_used_grams / 1000) * cost_per_kg)
            else:
                new_cost = None

        if new_cost is not None and archive.cost != new_cost:
            archive.cost = new_cost
            updated += 1

    await db.commit()
    return {"message": f"Recalculated costs for {updated} archives", "updated": updated}
@router.post("/rescan-all")
async def rescan_all_archives(
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_UPDATE_ALL),
):
    """Rescan all archives and update their metadata.

    Each archive's 3MF file is parsed again and every non-empty metadata
    value is copied onto the archive. Archives whose file is missing or
    fails to parse are reported in ``errors`` and left unchanged; the
    remaining archives are still processed.

    Returns:
        A dict with ``updated`` (number of archives rescanned) and ``errors``
        (list of ``{"id", "error"}`` entries).
    """
    from backend.app.services.archive import ThreeMFParser

    # Metadata keys that map one-to-one onto PrintArchive attributes.
    # bed_temperature and nozzle_temperature are included so the bulk rescan
    # refreshes the same fields as the other metadata-copy code in this module.
    rescanned_fields = (
        "filament_type",
        "filament_color",
        "print_time_seconds",
        "filament_used_grams",
        "layer_height",
        "nozzle_diameter",
        "bed_temperature",
        "nozzle_temperature",
        "makerworld_url",
        "designer",
    )

    result = await db.execute(select(PrintArchive))
    archives = list(result.scalars().all())

    updated = 0
    errors = []
    for archive in archives:
        try:
            file_path = settings.base_dir / archive.file_path
            if not file_path.is_file():
                errors.append({"id": archive.id, "error": "File not found"})
                continue

            metadata = ThreeMFParser(file_path).parse()
            for field in rescanned_fields:
                value = metadata.get(field)
                # Falsy values (missing, None, 0, "") never overwrite stored data
                if value:
                    setattr(archive, field, value)
            updated += 1
        except Exception as e:
            # Best effort: one broken file must not abort the whole rescan
            logger.exception("Failed to rescan archive %s: %s", archive.id, e)
            errors.append({"id": archive.id, "error": "Failed to parse 3MF file"})

    await db.commit()
    return {"updated": updated, "errors": errors}
@router.get("/{archive_id}/duplicates")
async def get_archive_duplicates(
    archive_id: int,
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ),
):
    """Return the duplicates found for one archive.

    The lookup is delegated to ``ArchiveService.find_duplicates`` using the
    archive's content hash, print name and, when present in ``extra_data``,
    its ``makerworld_model_id``.

    Raises:
        HTTPException: 404 when the archive does not exist.

    Returns:
        A dict with the ``duplicates`` list and its ``count``.
    """
    archive_service = ArchiveService(db)
    target = await archive_service.get_archive(archive_id)
    if not target:
        raise HTTPException(404, "Archive not found")

    # extra_data may be empty or None, in which case there is no model id
    model_id = None
    if target.extra_data:
        model_id = target.extra_data.get("makerworld_model_id")

    matches = await archive_service.find_duplicates(
        archive_id=target.id,
        content_hash=target.content_hash,
        print_name=target.print_name,
        makerworld_model_id=model_id,
    )
    return {"duplicates": matches, "count": len(matches)}
@router.post("/backfill-hashes")
async def backfill_content_hashes(
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_UPDATE_ALL),
):
    """Fill in the content hash of every archive that has none yet.

    Archives whose file is missing, or whose hash computation raises, are
    listed in ``errors``; all other selected archives get a hash assigned.

    Returns:
        A dict with ``updated`` (number of hashes stored) and ``errors``
        (list of ``{"id", "error"}`` entries).
    """
    missing_hash_query = select(PrintArchive).where(PrintArchive.content_hash.is_(None))
    rows = await db.execute(missing_hash_query)
    pending = list(rows.scalars().all())

    hashed_count = 0
    failures = []
    for entry in pending:
        try:
            source = settings.base_dir / entry.file_path
            if source.is_file():
                entry.content_hash = ArchiveService.compute_file_hash(source)
                hashed_count += 1
            else:
                failures.append({"id": entry.id, "error": "File not found"})
        except Exception as exc:
            # Keep going: a single unreadable file should not stop the backfill
            logger.exception("Failed to compute hash for archive %s: %s", entry.id, exc)
            failures.append({"id": entry.id, "error": "Failed to compute hash"})

    await db.commit()
    return {"updated": hashed_count, "errors": failures}
@router.delete("/{archive_id}")
async def delete_archive(
    archive_id: int,
    db: AsyncSession = Depends(get_db),
    auth_result: tuple[User | None, bool] = Depends(
        require_ownership_permission(
            Permission.ARCHIVES_DELETE_ALL,
            Permission.ARCHIVES_DELETE_OWN,
        )
    ),
):
    """Delete an archive.

    ``auth_result`` is the ``(user, can_modify_all)`` pair produced by the
    ownership-permission dependency. Callers without the "delete all"
    permission may only delete archives they created.

    Raises:
        HTTPException: 404 when the archive does not exist (or disappears
            before the delete runs); 403 when the caller is limited to own
            archives and is not the creator of this one.

    Returns:
        ``{"status": "deleted"}`` on success.
    """
    user, can_modify_all = auth_result

    # Load the archive first so ownership can be checked before deleting
    result = await db.execute(select(PrintArchive).where(PrintArchive.id == archive_id))
    archive = result.scalar_one_or_none()
    if not archive:
        raise HTTPException(404, "Archive not found")

    # Ownership check. ``user`` is typed as optional, so a missing user is
    # treated as "not the owner" rather than dereferenced.
    if not can_modify_all and (user is None or archive.created_by_id != user.id):
        raise HTTPException(403, "You can only delete your own archives")

    service = ArchiveService(db)
    if not await service.delete_archive(archive_id):
        raise HTTPException(404, "Archive not found")
    return {"status": "deleted"}
@router.get("/{archive_id}/download")
async def download_archive(
    archive_id: int,
    inline: bool = False,
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ),
):
    """Serve the archive's 3MF file.

    With ``inline`` set, the response uses an ``inline`` content disposition
    so the browser/OS can apply its file association; otherwise the file is
    sent as an ``attachment``.

    Raises:
        HTTPException: 404 when the archive or its file does not exist.
    """
    archive = await ArchiveService(db).get_archive(archive_id)
    if not archive:
        raise HTTPException(404, "Archive not found")

    file_path = settings.base_dir / archive.file_path
    if not file_path.is_file():
        raise HTTPException(404, "File not found")

    if inline:
        disposition = "inline"
    else:
        disposition = "attachment"

    return FileResponse(
        path=file_path,
        filename=archive.filename,
        media_type="application/vnd.ms-package.3dmanufacturing-3dmodel+xml",
        content_disposition_type=disposition,
    )
@router.get("/{archive_id}/file/{filename}")
async def download_archive_with_filename(
    archive_id: int,
    filename: str,
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ),
):
    """Serve the archive's 3MF file from a URL that ends in a filename.

    The ``filename`` path segment is not read by the handler; the response
    always carries the archive's stored filename.

    Raises:
        HTTPException: 404 when the archive or its file does not exist.
    """
    archive = await ArchiveService(db).get_archive(archive_id)
    if not archive:
        raise HTTPException(404, "Archive not found")

    stored_file = settings.base_dir / archive.file_path
    if not stored_file.is_file():
        raise HTTPException(404, "File not found")

    response = FileResponse(
        path=stored_file,
        filename=archive.filename,
        media_type="application/vnd.ms-package.3dmanufacturing-3dmodel+xml",
    )
    return response
@router.post("/{archive_id}/slicer-token")
async def create_archive_slicer_token(
    archive_id: int,
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ),
):
    """Issue a short-lived download token for opening the file in a slicer.

    Slicer protocol handlers (bambustudioopen://, orcaslicer://) cannot send
    auth headers, so the token is placed in the download URL path instead.

    Raises:
        HTTPException: 404 when the archive does not exist.

    Returns:
        A dict holding the generated ``token``.
    """
    from backend.app.core.auth import create_slicer_download_token

    # Only hand out tokens for archives that actually exist
    if not await ArchiveService(db).get_archive(archive_id):
        raise HTTPException(404, "Archive not found")

    return {"token": create_slicer_download_token("archive", archive_id)}
@router.get("/{archive_id}/dl/{token}/{filename}")
async def download_archive_for_slicer(
    archive_id: int,
    token: str,
    filename: str,
    db: AsyncSession = Depends(get_db),
):
    """Serve the 3MF file to a slicer that presents a download token.

    Authentication is by token only (no auth headers needed). The token is
    short-lived and single-use and is created by POST /{archive_id}/slicer-token.
    The filename sits at the end of the URL so slicers can detect the file
    format; the handler itself does not read it.

    Raises:
        HTTPException: 403 when the token is rejected; 404 when the archive
            or its file does not exist.
    """
    from backend.app.core.auth import verify_slicer_download_token

    # The token is checked before anything is looked up
    token_ok = verify_slicer_download_token(token, "archive", archive_id)
    if not token_ok:
        raise HTTPException(403, "Invalid or expired download token")

    archive = await ArchiveService(db).get_archive(archive_id)
    if not archive:
        raise HTTPException(404, "Archive not found")

    stored_file = settings.base_dir / archive.file_path
    if not stored_file.is_file():
        raise HTTPException(404, "File not found")

    return FileResponse(
        path=stored_file,
        filename=archive.filename,
        media_type="application/vnd.ms-package.3dmanufacturing-3dmodel+xml",
    )
@router.get("/{archive_id}/thumbnail")
async def get_thumbnail(
    archive_id: int,
    db: AsyncSession = Depends(get_db),
):
    """Get the thumbnail image.

    Note: Unauthenticated - loaded via HTML img tags, which can't send auth
    headers.

    Raises:
        HTTPException: 404 when the archive does not exist, has no thumbnail
            path, or the path does not point to a regular file.
    """
    service = ArchiveService(db)
    archive = await service.get_archive(archive_id)
    if not archive or not archive.thumbnail_path:
        raise HTTPException(404, "Thumbnail not found")

    thumb_path = settings.base_dir / archive.thumbnail_path
    # is_file() (not exists()) so a directory at this path yields a 404 here
    # instead of being handed to FileResponse; matches the other file checks
    # in this module.
    if not thumb_path.is_file():
        raise HTTPException(404, "Thumbnail file not found")

    # Use file modification time as ETag to bust cache
    mtime = int(thumb_path.stat().st_mtime)
    return FileResponse(
        path=thumb_path,
        media_type="image/png",
        headers={
            "Cache-Control": "no-cache, must-revalidate",
            "ETag": f'"{mtime}"',
        },
    )
@router.get("/{archive_id}/timelapse")
async def get_timelapse(
archive_id: int,
db: AsyncSession = Depends(get_db),
):
"""Get the timelapse video.
Note: Unauthenticated - loaded via