import io
import json
import logging
import zipfile
from collections import defaultdict
from datetime import date, datetime, time, timezone
from decimal import ROUND_HALF_UP, Decimal
from pathlib import Path
from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, Request, UploadFile
from fastapi.responses import FileResponse, Response
from sqlalchemy import and_, func, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from backend.app.core.auth import (
RequirePermissionIfAuthEnabled,
require_ownership_permission,
)
from backend.app.core.config import settings
from backend.app.core.database import get_db
from backend.app.core.permissions import Permission
from backend.app.models.archive import PrintArchive
from backend.app.models.filament import Filament
from backend.app.models.spool_usage_history import SpoolUsageHistory
from backend.app.models.user import User
from backend.app.schemas.archive import ArchiveResponse, ArchiveSlim, ArchiveStats, ArchiveUpdate, ReprintRequest
from backend.app.services.archive import ArchiveService
from backend.app.utils.threemf_tools import extract_nozzle_mapping_from_3mf
# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)
# Router for the archive endpoints; every route declared on it is served under /archives.
router = APIRouter(prefix="/archives", tags=["archives"])
def compute_time_accuracy(archive: PrintArchive) -> dict:
    """Derive the measured print duration and the accuracy of the estimate.

    Accuracy is ``estimated / actual * 100``:
    - 100 means the estimate was exact
    - above 100 means the print finished faster than estimated
    - below 100 means the print took longer than estimated

    Args:
        archive: Archive whose ``started_at``, ``completed_at``, ``status``
            and ``print_time_seconds`` attributes are inspected.

    Returns:
        Dict with ``actual_time_seconds`` and ``time_accuracy``; each value
        stays ``None`` when it cannot be computed.
    """
    outcome = {"actual_time_seconds": None, "time_accuracy": None}

    # Only completed prints with both timestamps have a measurable duration.
    if not (archive.started_at and archive.completed_at and archive.status == "completed"):
        return outcome

    elapsed = int((archive.completed_at - archive.started_at).total_seconds())
    if elapsed <= 0:
        return outcome
    outcome["actual_time_seconds"] = elapsed

    estimate = archive.print_time_seconds
    if not (estimate and estimate > 0):
        return outcome

    percent = (estimate / elapsed) * 100
    # Discard implausible ratios (e.g. status edited by hand): keep 5%..500%,
    # i.e. from 20x slower to 5x faster than the estimate.
    if 5 <= percent <= 500:
        outcome["time_accuracy"] = round(percent, 1)
    return outcome
def archive_to_response(
    archive: PrintArchive,
    duplicates: list[dict] | None = None,
    duplicate_count: int = 0,
    duplicate_sequence: int = 0,
    original_archive_id: int | None = None,
) -> dict:
    """Flatten an archive row into the dict returned by the API.

    Args:
        archive: Archive to serialise; its ``project`` and ``created_by``
            relationships are read as well.
        duplicates: Explicit duplicate list. When given, its length is
            reported as ``duplicate_count`` instead of the argument below.
        duplicate_count: Pre-computed count, used only when ``duplicates``
            is ``None``.
        duplicate_sequence: Value copied into ``duplicate_sequence``.
        original_archive_id: Value copied into ``original_archive_id``.

    Returns:
        Dict of archive attributes, the duplicate metadata, and the two
        computed keys produced by ``compute_time_accuracy``.
    """
    payload: dict = {}

    def copy_attrs(*names: str) -> None:
        # Each listed attribute is exposed under a key of the same name.
        for attr_name in names:
            payload[attr_name] = getattr(archive, attr_name)

    copy_attrs("id", "printer_id", "project_id")
    payload["project_name"] = archive.project.name if archive.project else None
    copy_attrs(
        "filename",
        "file_path",
        "file_size",
        "content_hash",
        "thumbnail_path",
        "timelapse_path",
        "source_3mf_path",
        "f3d_path",
    )

    # Duplicate metadata comes from the caller, not from the archive row.
    payload["duplicates"] = duplicates
    payload["duplicate_count"] = len(duplicates) if duplicates is not None else duplicate_count
    payload["duplicate_sequence"] = duplicate_sequence
    payload["original_archive_id"] = original_archive_id

    copy_attrs(
        "print_name",
        "print_time_seconds",
        "filament_used_grams",
        "filament_type",
        "filament_color",
        "layer_height",
        "total_layers",
        "nozzle_diameter",
        "bed_temperature",
        "nozzle_temperature",
        "sliced_for_model",
        "status",
        "started_at",
        "completed_at",
        "extra_data",
        "makerworld_url",
        "designer",
        "external_url",
        "is_favorite",
        "tags",
        "notes",
        "cost",
        "photos",
        "failure_reason",
        "quantity",
        "energy_kwh",
        "energy_cost",
        "created_at",
        # User tracking (Issue #206)
        "created_by_id",
    )
    payload["created_by_username"] = archive.created_by.username if archive.created_by else None

    # Merge in actual_time_seconds / time_accuracy.
    payload.update(compute_time_accuracy(archive))
    return payload
@router.get("/", response_model=list[ArchiveResponse])
async def list_archives(
    printer_id: int | None = None,
    project_id: int | None = None,
    date_from: date | None = Query(None),
    date_to: date | None = Query(None),
    limit: int = 50,
    offset: int = 0,
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ),
):
    """List archived prints, annotated with duplicate information.

    Each returned item carries ``duplicate_count``, ``duplicate_sequence``
    (position inside its duplicate group ordered by ``created_at``) and
    ``original_archive_id`` (the oldest member of that group).

    Args:
        printer_id: Forwarded to the archive service as a filter.
        project_id: Forwarded to the archive service as a filter.
        date_from: Forwarded to the archive service as a filter.
        date_to: Forwarded to the archive service as a filter.
        limit: Page size forwarded to the archive service.
        offset: Page offset forwarded to the archive service.
    """
    service = ArchiveService(db)
    archives = await service.list_archives(
        printer_id=printer_id,
        project_id=project_id,
        date_from=date_from,
        date_to=date_to,
        limit=limit,
        offset=offset,
    )

    # Hashes and (lowercased name, hash) pairs that occur more than once.
    dup_hashes, dup_name_hash_pairs = await service.get_duplicate_hashes_and_names()

    def name_hash_key(item):
        """Return the (lowercased name, hash) key, or None if either part is missing."""
        if item.print_name and item.content_hash:
            return (item.print_name.lower(), item.content_hash)
        return None

    # Restrict the duplicate lookup to keys that actually appear on this page.
    page_hashes = {item.content_hash for item in archives if item.content_hash and item.content_hash in dup_hashes}
    page_keys = {
        key for key in map(name_hash_key, archives) if key is not None and key in dup_name_hash_pairs
    }

    # archive id -> (sequence, original archive id, duplicate count)
    meta_by_id: dict[int, tuple[int, int, int]] = {}

    if page_hashes or page_keys:
        clauses = []
        if page_hashes:
            clauses.append(PrintArchive.content_hash.in_(page_hashes))
        clauses.extend(
            and_(func.lower(PrintArchive.print_name) == lowered, PrintArchive.content_hash == digest)
            for lowered, digest in page_keys
        )

        group_rows = await db.execute(
            select(
                PrintArchive.id,
                PrintArchive.created_at,
                PrintArchive.content_hash,
                func.lower(PrintArchive.print_name).label("print_name_lower"),
            ).where(or_(*clauses))
        )

        by_hash: dict[str, list[tuple[int, datetime]]] = defaultdict(list)
        by_name_hash: dict[tuple[str, str], list[tuple[int, datetime]]] = defaultdict(list)
        for row_id, row_created, row_hash, row_name in group_rows.all():
            if row_hash and row_hash in page_hashes:
                by_hash[row_hash].append((row_id, row_created))
            if row_name and row_hash and (row_name, row_hash) in page_keys:
                by_name_hash[(row_name, row_hash)].append((row_id, row_created))

        def register(groups, *, overwrite: bool) -> None:
            """Record sequence/original/count for every member of each real group."""
            for members in groups:
                if len(members) < 2:
                    continue
                ordered = sorted(members, key=lambda pair: pair[1])
                first_id = ordered[0][0]
                extra = len(ordered) - 1
                for position, (member_id, _created) in enumerate(ordered):
                    entry = (position, first_id, extra)
                    if overwrite:
                        meta_by_id[member_id] = entry
                    else:
                        meta_by_id.setdefault(member_id, entry)

        register(by_hash.values(), overwrite=True)
        # Hash-based grouping takes precedence; name/hash groups only fill gaps.
        register(by_name_hash.values(), overwrite=False)

    responses = []
    for item in archives:
        hash_dup = bool(item.content_hash) and item.content_hash in dup_hashes
        key = name_hash_key(item)
        name_dup = key is not None and key in dup_name_hash_pairs

        if hash_dup or name_dup:
            # Flagged duplicates without resolved group data still report a count of 1.
            sequence, origin_id, count = meta_by_id.get(item.id, (0, None, 1))
        else:
            sequence, origin_id, count = 0, None, 0

        responses.append(
            archive_to_response(
                item,
                duplicate_count=count,
                duplicate_sequence=sequence,
                original_archive_id=origin_id,
            )
        )
    return responses
@router.get("/slim", response_model=list[ArchiveSlim])
async def list_archives_slim(
    date_from: date | None = Query(None),
    date_to: date | None = Query(None),
    limit: int = Query(default=10000, le=50000),
    offset: int = 0,
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ),
):
    """Return a trimmed-down archive listing for stats/dashboard widgets.

    Only the columns needed for client-side aggregation are selected; no
    duplicate detection is done and file paths / extra_data are omitted.

    Args:
        date_from: Keep archives created at or after the start of this day (UTC).
        date_to: Keep archives created at or before the end of this day (UTC).
        limit: Maximum number of rows (capped at 50000 by the query validator).
        offset: Number of rows to skip.

    Returns:
        One dict per archive, newest first.
    """
    window = []
    if date_from:
        window.append(PrintArchive.created_at >= datetime.combine(date_from, time.min, tzinfo=timezone.utc))
    if date_to:
        window.append(PrintArchive.created_at <= datetime.combine(date_to, time.max, tzinfo=timezone.utc))

    columns = (
        PrintArchive.printer_id,
        PrintArchive.print_name,
        PrintArchive.print_time_seconds,
        PrintArchive.started_at,
        PrintArchive.completed_at,
        PrintArchive.filament_used_grams,
        PrintArchive.filament_type,
        PrintArchive.filament_color,
        PrintArchive.status,
        PrintArchive.cost,
        PrintArchive.quantity,
        PrintArchive.created_at,
    )
    statement = (
        select(*columns).where(*window).order_by(PrintArchive.created_at.desc()).limit(limit).offset(offset)
    )
    rows = (await db.execute(statement)).all()

    def measured_seconds(row):
        """Elapsed seconds for a completed print with a positive duration, else None."""
        if not (row.started_at and row.completed_at and row.status == "completed"):
            return None
        seconds = (row.completed_at - row.started_at).total_seconds()
        return int(seconds) if seconds > 0 else None

    def slim_record(row) -> dict:
        return {
            "printer_id": row.printer_id,
            "print_name": row.print_name,
            "print_time_seconds": row.print_time_seconds,
            "actual_time_seconds": measured_seconds(row),
            "filament_used_grams": row.filament_used_grams,
            "filament_type": row.filament_type,
            "filament_color": row.filament_color,
            "status": row.status,
            "started_at": row.started_at,
            "completed_at": row.completed_at,
            "cost": row.cost,
            "quantity": row.quantity,
            "created_at": row.created_at,
        }

    return [slim_record(row) for row in rows]
@router.get("/search", response_model=list[ArchiveResponse])
async def search_archives(
    q: str = Query(..., min_length=2, description="Search query"),
    printer_id: int | None = None,
    project_id: int | None = None,
    status: str | None = None,
    limit: int = 50,
    offset: int = 0,
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ),
):
    """Full-text search across archives.

    Searches print_name, filename, tags, notes, designer, and filament_type fields.
    Supports partial matches with wildcards (e.g., 'vor*' matches 'voron').

    The FTS table is queried first; if that query raises, the search falls
    back to a case-insensitive LIKE over the same columns.

    Args:
        q: Search text; a trailing ``*`` is appended for prefix matching.
        printer_id: Optional printer filter.
        project_id: Optional project filter.
        status: Optional status filter.
        limit: Page size.
        offset: Number of results to skip.

    Returns:
        Archive response dicts, in FTS rank order (or newest first on the
        LIKE fallback).
    """
    from sqlalchemy import text
    from sqlalchemy.orm import selectinload

    def with_filters(statement):
        """Apply the optional printer/project/status filters to a select."""
        if printer_id:
            statement = statement.where(PrintArchive.printer_id == printer_id)
        if project_id:
            statement = statement.where(PrintArchive.project_id == project_id)
        if status:
            statement = statement.where(PrintArchive.status == status)
        return statement

    # archive_to_response reads both archive.project and archive.created_by,
    # so load both eagerly (same options update_archive uses).
    eager_options = (selectinload(PrintArchive.project), selectinload(PrintArchive.created_by))

    # Prepare search query - add wildcard for partial matches
    search_term = q.strip()
    if not search_term.endswith("*"):
        search_term = f"{search_term}*"

    # Using MATCH for FTS5 full-text search
    fts_query = text("""
        SELECT rowid FROM archive_fts
        WHERE archive_fts MATCH :search_term
        ORDER BY rank
        LIMIT :limit OFFSET :offset
    """)

    # Pagination and the optional filters are applied in Python after this
    # lookup, so the fetch window must extend past the requested page.
    # Previously it was fixed at ``limit + 100``, which made every page whose
    # offset lay beyond that window come back truncated or empty.
    fetch_limit = offset + limit + 100

    try:
        result = await db.execute(fts_query, {"search_term": search_term, "limit": fetch_limit, "offset": 0})
        matched_ids = [row[0] for row in result.fetchall()]
    except Exception as e:
        logger.warning("FTS search failed, falling back to LIKE search: %s", e)
        # Fallback to LIKE search if FTS fails
        like_pattern = f"%{q}%"
        query = (
            select(PrintArchive)
            .options(*eager_options)
            .where(
                (PrintArchive.print_name.ilike(like_pattern))
                | (PrintArchive.filename.ilike(like_pattern))
                | (PrintArchive.tags.ilike(like_pattern))
                | (PrintArchive.notes.ilike(like_pattern))
                | (PrintArchive.designer.ilike(like_pattern))
                | (PrintArchive.filament_type.ilike(like_pattern))
            )
            .order_by(PrintArchive.created_at.desc())
        )
        query = with_filters(query).limit(limit).offset(offset)
        result = await db.execute(query)
        return [archive_to_response(a) for a in result.scalars().all()]

    if not matched_ids:
        return []

    # Fetch full archive records for matched IDs
    query = with_filters(select(PrintArchive).options(*eager_options).where(PrintArchive.id.in_(matched_ids)))
    result = await db.execute(query)
    archives_by_id = {a.id: a for a in result.scalars().all()}

    # Preserve FTS ranking order and apply pagination
    ordered_archives = [archives_by_id[match_id] for match_id in matched_ids if match_id in archives_by_id]
    paginated = ordered_archives[offset : offset + limit]
    return [archive_to_response(a) for a in paginated]
@router.post("/search/rebuild-index")
async def rebuild_search_index(
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_UPDATE_ALL),
):
    """Rebuild the full-text search index from existing archives.

    Use this if search results seem incomplete or incorrect.

    Returns:
        Dict with a ``message`` stating how many entries the index now holds.

    Raises:
        HTTPException: 500 if clearing or repopulating the index fails.
    """
    from sqlalchemy import text

    try:
        # Clear and rebuild the FTS index
        await db.execute(text("DELETE FROM archive_fts"))
        # Repopulate from print_archives
        await db.execute(
            text("""
                INSERT INTO archive_fts(rowid, print_name, filename, tags, notes, designer, filament_type)
                SELECT id, print_name, filename, tags, notes, designer, filament_type
                FROM print_archives
            """)
        )
        await db.commit()
        # Count entries
        result = await db.execute(text("SELECT COUNT(*) FROM archive_fts"))
        count = result.scalar() or 0
        return {"message": f"Search index rebuilt with {count} entries"}
    except Exception as e:
        # Discard the pending DELETE/INSERT so a half-rebuilt index is never
        # committed by later work on this session.
        await db.rollback()
        logger.exception("Failed to rebuild search index: %s", e)
        raise HTTPException(status_code=500, detail=f"Failed to rebuild index: {str(e)}") from e
@router.get("/analysis/failures")
async def analyze_failures(
    days: int | None = None,
    date_from: date | None = Query(None),
    date_to: date | None = Query(None),
    printer_id: int | None = None,
    project_id: int | None = None,
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ),
):
    """Report failure patterns across prints.

    Delegates to ``FailureAnalysisService.analyze_failures`` and returns its
    result unchanged. Per the service contract described here, that covers:
    - Overall failure rate
    - Failures by reason, filament type, printer
    - Time of day distribution
    - Recent failures
    - Weekly trend

    Args:
        days: Forwarded to the service unchanged.
        date_from: Forwarded to the service unchanged.
        date_to: Forwarded to the service unchanged.
        printer_id: Forwarded to the service unchanged.
        project_id: Forwarded to the service unchanged.
    """
    from backend.app.services.failure_analysis import FailureAnalysisService

    criteria = {
        "days": days,
        "date_from": date_from,
        "date_to": date_to,
        "printer_id": printer_id,
        "project_id": project_id,
    }
    analyzer = FailureAnalysisService(db)
    report = await analyzer.analyze_failures(**criteria)
    return report
@router.get("/compare")
async def compare_archives(
    archive_ids: str = Query(..., description="Comma-separated archive IDs (2-5)"),
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ),
):
    """Compare several archives side by side.

    The parsed IDs are handed to ``ArchiveComparisonService.compare_archives``
    and its result is returned as-is.

    Args:
        archive_ids: Comma-separated list of 2-5 archive IDs to compare.

    Raises:
        HTTPException: 400 when an ID is not an integer, when fewer than 2
            or more than 5 IDs are given, or when the service raises
            ``ValueError``.
    """
    from backend.app.services.archive_comparison import ArchiveComparisonService

    # Any non-integer piece (including an empty one) rejects the whole request.
    try:
        ids = [int(piece.strip()) for piece in archive_ids.split(",")]
    except ValueError:
        raise HTTPException(400, "Invalid archive IDs format")

    id_count = len(ids)
    if id_count < 2:
        raise HTTPException(400, "At least 2 archives required for comparison")
    if id_count > 5:
        raise HTTPException(400, "Maximum 5 archives can be compared at once")

    comparer = ArchiveComparisonService(db)
    try:
        comparison = await comparer.compare_archives(ids)
    except ValueError as e:
        raise HTTPException(400, str(e))
    return comparison
@router.get("/export")
async def export_archives(
    format: str = Query("csv", description="Export format: csv or xlsx"),
    fields: str | None = Query(None, description="Comma-separated field names"),
    printer_id: int | None = None,
    project_id: int | None = None,
    status: str | None = None,
    date_from: str | None = Query(None, description="Start date (ISO format)"),
    date_to: str | None = Query(None, description="End date (ISO format)"),
    search: str | None = None,
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ),
):
    """Export archives as a downloadable CSV or Excel file.

    Args:
        format: ``csv`` or ``xlsx``.
        fields: Comma-separated field names; each name is stripped of
            surrounding whitespace before being passed on.
        printer_id: Forwarded to the export service.
        project_id: Forwarded to the export service.
        status: Forwarded to the export service.
        date_from: ISO-format start date/time.
        date_to: ISO-format end date/time.
        search: Forwarded to the export service.

    Raises:
        HTTPException: 400 for an unsupported format or an unparsable date;
            500 when the export service raises ``ImportError``.
    """
    from fastapi.responses import StreamingResponse

    from backend.app.services.export import ExportService

    def parse_iso(raw, failure_detail):
        """Parse an optional ISO string, mapping a parse failure to HTTP 400."""
        if not raw:
            return None
        try:
            return datetime.fromisoformat(raw)
        except ValueError:
            raise HTTPException(400, failure_detail)

    if format not in ("csv", "xlsx"):
        raise HTTPException(400, "Format must be 'csv' or 'xlsx'")

    field_list = [name.strip() for name in fields.split(",")] if fields else None
    date_from_dt = parse_iso(date_from, "Invalid date_from format")
    date_to_dt = parse_iso(date_to, "Invalid date_to format")

    exporter = ExportService(db)
    try:
        file_bytes, filename, content_type = await exporter.export_archives(
            format=format,
            fields=field_list,
            printer_id=printer_id,
            project_id=project_id,
            status=status,
            date_from=date_from_dt,
            date_to=date_to_dt,
            search=search,
        )
    except ImportError as e:
        raise HTTPException(500, str(e))

    download_headers = {"Content-Disposition": f'attachment; filename="{filename}"'}
    return StreamingResponse(io.BytesIO(file_bytes), media_type=content_type, headers=download_headers)
@router.get("/stats/export")
async def export_stats(
    format: str = Query("csv", description="Export format: csv or xlsx"),
    days: int = 30,
    printer_id: int | None = None,
    project_id: int | None = None,
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.STATS_READ),
):
    """Export the statistics summary as a downloadable CSV or Excel file.

    Args:
        format: ``csv`` or ``xlsx``.
        days: Forwarded to the export service.
        printer_id: Forwarded to the export service.
        project_id: Forwarded to the export service.

    Raises:
        HTTPException: 400 for an unsupported format; 500 when the export
            service raises ``ImportError``.
    """
    from fastapi.responses import StreamingResponse

    from backend.app.services.export import ExportService

    supported = ("csv", "xlsx")
    if format not in supported:
        raise HTTPException(400, "Format must be 'csv' or 'xlsx'")

    exporter = ExportService(db)
    try:
        exported = await exporter.export_stats(
            format=format,
            days=days,
            printer_id=printer_id,
            project_id=project_id,
        )
    except ImportError as e:
        raise HTTPException(500, str(e))

    file_bytes, filename, content_type = exported
    download_headers = {"Content-Disposition": f'attachment; filename="{filename}"'}
    return StreamingResponse(io.BytesIO(file_bytes), media_type=content_type, headers=download_headers)
@router.get("/stats", response_model=ArchiveStats)
async def get_archive_stats(
    date_from: date | None = Query(None, description="Start date (inclusive), YYYY-MM-DD"),
    date_to: date | None = Query(None, description="End date (inclusive), YYYY-MM-DD"),
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.STATS_READ),
):
    """Aggregate statistics over all archives, optionally within a date range.

    Args:
        date_from: Include archives created at or after the start of this day (UTC).
        date_to: Include archives created at or before the end of this day (UTC).

    Returns:
        ``ArchiveStats`` with print counts, total print time, filament and
        cost totals, per-filament and per-printer counts, time-accuracy
        averages and energy totals.
    """
    # Date window shared by every query below.
    window = []
    if date_from:
        window.append(PrintArchive.created_at >= datetime.combine(date_from, time.min, tzinfo=timezone.utc))
    if date_to:
        window.append(PrintArchive.created_at <= datetime.combine(date_to, time.max, tzinfo=timezone.utc))

    async def count_prints(*extra) -> int:
        """Count archives in the window that also satisfy ``extra``."""
        counted = await db.execute(select(func.count(PrintArchive.id)).where(*extra, *window))
        return counted.scalar() or 0

    total_prints = await count_prints()
    successful_prints = await count_prints(PrintArchive.status == "completed")
    failed_prints = await count_prints(PrintArchive.status == "failed")

    # Print time: prefer the measured duration from the timestamps; the
    # slicer estimate is used only when a timestamp is missing.
    timing_rows = await db.execute(
        select(PrintArchive.started_at, PrintArchive.completed_at, PrintArchive.print_time_seconds).where(*window)
    )
    elapsed_total = 0
    for began, ended, estimate in timing_rows.all():
        if not (began and ended):
            if estimate:
                elapsed_total += estimate
            continue
        measured = (ended - began).total_seconds()
        if measured > 0:
            elapsed_total += measured
    total_hours = elapsed_total / 3600

    # filament_used_grams already holds the total for the whole print job.
    filament_rows = await db.execute(
        select(func.coalesce(func.sum(PrintArchive.filament_used_grams), 0)).where(*window)
    )
    total_filament = filament_rows.scalar() or 0

    cost_rows = await db.execute(select(func.sum(PrintArchive.cost)).where(*window))
    total_cost = cost_rows.scalar() or 0

    # Per filament type: multi-material prints store comma-separated types,
    # and each listed type counts once for that print.
    type_rows = await db.execute(
        select(PrintArchive.filament_type).where(PrintArchive.filament_type.isnot(None), *window)
    )
    filament_tally = defaultdict(int)
    for (raw_types,) in type_rows.all():
        for material in (piece.strip() for piece in raw_types.split(",")):
            if material:
                filament_tally[material] += 1
    prints_by_filament: dict[str, int] = dict(filament_tally)

    # Per printer
    printer_rows = await db.execute(
        select(PrintArchive.printer_id, func.count(PrintArchive.id)).where(*window).group_by(PrintArchive.printer_id)
    )
    prints_by_printer = {str(printer): total for printer, total in printer_rows.all()}

    # Time accuracy: completed archives that have an estimate and both timestamps.
    timed_rows = await db.execute(
        select(PrintArchive).where(
            PrintArchive.status == "completed",
            *window,
            PrintArchive.print_time_seconds.isnot(None),
            PrintArchive.started_at.isnot(None),
            PrintArchive.completed_at.isnot(None),
        )
    )
    all_samples: list[float] = []
    samples_by_printer = defaultdict(list)
    for archive in timed_rows.scalars().all():
        percent = compute_time_accuracy(archive)["time_accuracy"]
        if percent is None:
            continue
        all_samples.append(percent)
        printer_key = str(archive.printer_id) if archive.printer_id else "unknown"
        samples_by_printer[printer_key].append(percent)

    average_accuracy = round(sum(all_samples) / len(all_samples), 1) if all_samples else None
    accuracy_by_printer: dict[str, float] = {
        printer_key: round(sum(samples) / len(samples), 1) for printer_key, samples in samples_by_printer.items()
    }

    # Energy totals - the tracking mode decides where the numbers come from.
    from backend.app.api.routes.settings import get_setting

    energy_tracking_mode = await get_setting(db, "energy_tracking_mode") or "total"
    rate_setting = await get_setting(db, "energy_cost_per_kwh")
    energy_cost_per_kwh = float(rate_setting) if rate_setting else 0.15

    # Smart plug lifetime totals can't be filtered by date range, so any
    # active date filter falls back to per-print archive data.
    use_plug_totals = energy_tracking_mode == "total" and not (date_from or date_to)
    if use_plug_totals:
        # Total mode: sum the 'total' counter of every smart plug (lifetime consumption).
        from backend.app.models.smart_plug import SmartPlug
        from backend.app.services.homeassistant import homeassistant_service
        from backend.app.services.mqtt_relay import mqtt_relay
        from backend.app.services.tasmota import tasmota_service

        plug_rows = await db.execute(select(SmartPlug))
        plugs = list(plug_rows.scalars().all())

        # Configure HA service once (needed for homeassistant-type plugs)
        ha_url = await get_setting(db, "ha_url") or ""
        ha_token = await get_setting(db, "ha_token") or ""
        homeassistant_service.configure(ha_url, ha_token)

        polled_sources = {"tasmota": tasmota_service, "homeassistant": homeassistant_service}
        running_kwh = 0.0
        for plug in plugs:
            source = polled_sources.get(plug.plug_type)
            if source is not None:
                reading = await source.get_energy(plug)
                if reading and reading.get("total") is not None:
                    running_kwh += reading["total"]
            elif plug.plug_type == "mqtt":
                # MQTT plugs report "today" energy, not lifetime total
                mqtt_data = mqtt_relay.smart_plug_service.get_plug_data(plug.id)
                if mqtt_data and mqtt_data.energy is not None:
                    running_kwh += mqtt_data.energy

        total_energy_kwh = round(running_kwh, 3)
        total_energy_cost = round(total_energy_kwh * energy_cost_per_kwh, 3)
    else:
        # Print mode: sum the per-print energy stored on the archives.
        kwh_rows = await db.execute(select(func.sum(PrintArchive.energy_kwh)).where(*window))
        total_energy_kwh = kwh_rows.scalar() or 0
        energy_cost_rows = await db.execute(select(func.sum(PrintArchive.energy_cost)).where(*window))
        total_energy_cost = energy_cost_rows.scalar() or 0

    return ArchiveStats(
        total_prints=total_prints,
        successful_prints=successful_prints,
        failed_prints=failed_prints,
        total_print_time_hours=round(total_hours, 1),
        total_filament_grams=round(total_filament, 1),
        total_cost=round(total_cost, 2),
        prints_by_filament_type=prints_by_filament,
        prints_by_printer=prints_by_printer,
        average_time_accuracy=average_accuracy,
        time_accuracy_by_printer=accuracy_by_printer or None,
        total_energy_kwh=round(total_energy_kwh, 3),
        total_energy_cost=round(total_energy_cost, 3),
    )
@router.get("/tags")
async def get_all_tags(
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ),
):
    """List every distinct tag together with how many archives use it.

    Tags are stored as a comma-separated string per archive; each entry is
    stripped and empty entries are ignored.

    Returns:
        List of ``{"name", "count"}`` dicts sorted by count (descending),
        then by case-insensitive name.
    """
    tag_rows = await db.execute(select(PrintArchive.tags).where(PrintArchive.tags.isnot(None)))

    usage = defaultdict(int)
    for (raw_tags,) in tag_rows.all():
        if not raw_tags:
            continue
        for label in (piece.strip() for piece in raw_tags.split(",")):
            if label:
                usage[label] += 1

    ranked = sorted(usage.items(), key=lambda entry: (-entry[1], entry[0].lower()))
    return [{"name": label, "count": hits} for label, hits in ranked]
@router.put("/tags/{tag_name}")
async def rename_tag(
    tag_name: str,
    request: Request,
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_UPDATE_ALL),
):
    """Rename a tag across all archives.

    Request body should contain {"new_name": "new tag name"}.

    Args:
        tag_name: Tag to replace (matched exactly against each stripped tag).
        request: Incoming request carrying the JSON body.

    Returns:
        ``{"affected": n}`` with the number of archives that were changed.

    Raises:
        HTTPException: 400 when the body is not valid JSON, is not a JSON
            object, or ``new_name`` is missing, blank, or not a string.
    """
    # The body is client-controlled: reject malformed input with a 400
    # instead of letting it surface as an unhandled 500.
    try:
        body = await request.json()
    except ValueError:
        raise HTTPException(400, "Request body must be valid JSON") from None
    if not isinstance(body, dict):
        raise HTTPException(400, "Request body must be a JSON object")

    raw_name = body.get("new_name", "")
    if not isinstance(raw_name, str):
        raise HTTPException(400, "new_name is required")
    new_name = raw_name.strip()
    if not new_name:
        raise HTTPException(400, "new_name is required")
    if new_name == tag_name:
        return {"affected": 0}

    # Find all archives containing the old tag
    result = await db.execute(select(PrintArchive).where(PrintArchive.tags.isnot(None)))
    affected = 0
    for archive in result.scalars().all():
        if not archive.tags:
            continue
        tags = [t.strip() for t in archive.tags.split(",")]
        if tag_name not in tags:
            continue
        renamed = [new_name if t == tag_name else t for t in tags]
        # dict.fromkeys drops duplicates (the archive may already carry
        # new_name) while keeping first-seen order.
        archive.tags = ", ".join(dict.fromkeys(renamed))
        affected += 1

    await db.commit()
    return {"affected": affected}
@router.delete("/tags/{tag_name}")
async def delete_tag(
    tag_name: str,
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_UPDATE_ALL),
):
    """Remove a tag from every archive that carries it.

    Args:
        tag_name: Tag to remove (matched exactly against each stripped tag).

    Returns:
        ``{"affected": n}`` with the number of archives that were changed.
    """
    tagged_rows = await db.execute(select(PrintArchive).where(PrintArchive.tags.isnot(None)))

    affected = 0
    for archive in tagged_rows.scalars().all():
        if not archive.tags:
            continue
        current = [piece.strip() for piece in archive.tags.split(",")]
        if tag_name not in current:
            continue
        remaining = [piece for piece in current if piece != tag_name]
        # An archive left without any tags stores None rather than "".
        if remaining:
            archive.tags = ", ".join(remaining)
        else:
            archive.tags = None
        affected += 1

    await db.commit()
    return {"affected": affected}
@router.get("/{archive_id}", response_model=ArchiveResponse)
async def get_archive(
    archive_id: int,
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ),
):
    """Return one archive together with its duplicates.

    Args:
        archive_id: ID of the archive to fetch.

    Raises:
        HTTPException: 404 when the archive service returns nothing.
    """
    service = ArchiveService(db)
    archive = await service.get_archive(archive_id)
    if not archive:
        raise HTTPException(404, "Archive not found")

    # The MakerWorld model id, when present, is kept inside extra_data.
    extra = archive.extra_data
    makerworld_id = extra.get("makerworld_model_id") if extra else None

    duplicates = await service.find_duplicates(
        archive_id=archive.id,
        content_hash=archive.content_hash,
        print_name=archive.print_name,
        makerworld_model_id=makerworld_id,
    )
    return archive_to_response(archive, duplicates=duplicates)
@router.get("/{archive_id}/similar")
async def find_similar_archives(
    archive_id: int,
    limit: int = 10,
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ),
):
    """Find archives with similar settings for comparison.

    Delegates to ``ArchiveComparisonService.find_similar_archives``. Per the
    contract described here, matches are ranked by:
    - Same print name (highest priority)
    - Same file content hash
    - Same filament type

    Args:
        archive_id: Archive to find matches for.
        limit: Maximum number of results, forwarded to the service.

    Raises:
        HTTPException: 404 when the service raises ``ValueError``.
    """
    from backend.app.services.archive_comparison import ArchiveComparisonService

    comparer = ArchiveComparisonService(db)
    try:
        matches = await comparer.find_similar_archives(archive_id, limit=limit)
    except ValueError as e:
        raise HTTPException(404, str(e))
    return matches
@router.patch("/{archive_id}", response_model=ArchiveResponse)
async def update_archive(
    archive_id: int,
    update_data: ArchiveUpdate,
    db: AsyncSession = Depends(get_db),
    auth_result: tuple[User | None, bool] = Depends(
        require_ownership_permission(
            Permission.ARCHIVES_UPDATE_ALL,
            Permission.ARCHIVES_UPDATE_OWN,
        )
    ),
):
    """Update archive metadata (tags, notes, cost, is_favorite, project_id).

    Only the fields explicitly set in the request body are written.

    Args:
        archive_id: ID of the archive to update.
        update_data: Partial update payload.
        auth_result: ``(user, can_modify_all)`` from the ownership dependency.

    Raises:
        HTTPException: 404 when the archive does not exist; 403 when the
            caller may only modify own archives and this one was created
            by someone else.
    """
    from sqlalchemy.orm import selectinload

    user, can_modify_all = auth_result

    # The same statement is run before and after the commit.
    lookup = (
        select(PrintArchive)
        .options(selectinload(PrintArchive.project), selectinload(PrintArchive.created_by))
        .where(PrintArchive.id == archive_id)
    )

    archive = (await db.execute(lookup)).scalar_one_or_none()
    if not archive:
        raise HTTPException(404, "Archive not found")

    # Ownership check
    if not can_modify_all and archive.created_by_id != user.id:
        raise HTTPException(403, "You can only update your own archives")

    changes = update_data.model_dump(exclude_unset=True)
    for field, value in changes.items():
        setattr(archive, field, value)
    await db.commit()

    # Re-fetch with relationships loaded after commit
    refreshed = (await db.execute(lookup)).scalar_one_or_none()
    return archive_to_response(refreshed)
@router.post("/{archive_id}/favorite", response_model=ArchiveResponse)
async def toggle_favorite(
    archive_id: int,
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_UPDATE_OWN),
):
    """Toggle favorite status for an archive.

    The response is built with ``archive_to_response`` so it carries the same
    computed keys (project name, creator username, time accuracy, duplicate
    metadata) as the other ``ArchiveResponse`` endpoints in this module.

    Args:
        archive_id: ID of the archive whose ``is_favorite`` flag is flipped.

    Raises:
        HTTPException: 404 when the archive does not exist.
    """
    # NOTE(review): unlike update_archive, there is no ownership check here;
    # any caller holding ARCHIVES_UPDATE_OWN can toggle any archive. Confirm
    # this is intended.
    from sqlalchemy.orm import selectinload

    # archive_to_response reads archive.project and archive.created_by, so
    # load both eagerly (same options update_archive uses).
    lookup = (
        select(PrintArchive)
        .options(selectinload(PrintArchive.project), selectinload(PrintArchive.created_by))
        .where(PrintArchive.id == archive_id)
    )

    archive = (await db.execute(lookup)).scalar_one_or_none()
    if not archive:
        raise HTTPException(404, "Archive not found")

    archive.is_favorite = not archive.is_favorite
    await db.commit()

    # Re-fetch with relationships loaded after commit
    archive = (await db.execute(lookup)).scalar_one_or_none()
    if not archive:
        # The row disappeared between the commit and the re-fetch.
        raise HTTPException(404, "Archive not found")
    return archive_to_response(archive)
@router.post("/{archive_id}/rescan", response_model=ArchiveResponse)
async def rescan_archive(
    archive_id: int,
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_UPDATE_ALL),
):
    """Re-parse an archive's 3MF file, refresh its metadata and recompute cost.

    Metadata values that the parser reports as missing or falsy leave the
    stored value untouched. The cost is only recomputed when the archive has
    both a filament weight and a filament type after the metadata refresh.

    Raises:
        HTTPException: 404 when the archive row or its file on disk is missing.
    """
    from backend.app.api.routes.settings import get_setting
    from backend.app.services.archive import ThreeMFParser

    def _round_cost(amount) -> float:
        # Round to two decimals, half-up, going through str() before Decimal.
        return float(Decimal(str(amount)).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP))

    lookup = select(PrintArchive).where(PrintArchive.id == archive_id)
    archive = (await db.execute(lookup)).scalar_one_or_none()
    if not archive:
        raise HTTPException(404, "Archive not found")

    file_path = settings.base_dir / archive.file_path
    if not file_path.is_file():
        raise HTTPException(404, "Archive file not found")

    metadata = ThreeMFParser(file_path).parse()

    # Copy over every metadata field the parser produced a truthy value for.
    refreshed_fields = (
        "filament_type",
        "filament_color",
        "print_time_seconds",
        "filament_used_grams",
        "layer_height",
        "nozzle_diameter",
        "bed_temperature",
        "nozzle_temperature",
        "makerworld_url",
        "designer",
    )
    for field_name in refreshed_fields:
        parsed_value = metadata.get(field_name)
        if parsed_value:
            setattr(archive, field_name, parsed_value)

    # Cost: recorded spool usage wins; otherwise price the used weight with
    # the catalog entry of the first listed type, else the default setting.
    if archive.filament_used_grams and archive.filament_type:
        spool_cost_query = select(func.sum(SpoolUsageHistory.cost)).where(
            SpoolUsageHistory.archive_id == archive.id
        )
        spool_cost = (await db.execute(spool_cost_query)).scalar()

        if spool_cost is not None and spool_cost > 0:
            archive.cost = _round_cost(spool_cost)
        else:
            primary_type = archive.filament_type.split(",")[0].strip()
            catalog_query = select(Filament).where(Filament.type == primary_type).limit(1)
            catalog_entry = (await db.execute(catalog_query)).scalar_one_or_none()

            if catalog_entry:
                price_per_kg = catalog_entry.cost_per_kg
            else:
                # The setting is only looked up when the catalog has no match.
                raw_default = await get_setting(db, "default_filament_cost")
                price_per_kg = float(raw_default) if raw_default else 25.0

            archive.cost = _round_cost((archive.filament_used_grams / 1000) * price_per_kg)

    await db.commit()
    await db.refresh(archive)
    return archive
@router.post("/recalculate-costs")
async def recalculate_all_costs(
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_UPDATE_ALL),
):
    """Recalculate costs for all archives based on filament usage and prices.

    For each archive the first applicable source is used:

    1. The positive sum of spool usage costs linked to the archive id.
    2. The positive sum of spool usage costs that have no archive id but
       carry the same ``print_name`` (older records).
    3. ``filament_used_grams`` priced per kg with the catalog price of the
       first listed filament type, falling back to the
       ``default_filament_cost`` setting and finally to 25.0.

    Archives with no applicable source keep their current cost. Costs are
    rounded to two decimals with ROUND_HALF_UP, the same rounding
    ``rescan_archive`` applies, so the two endpoints agree on tie values.

    Returns:
        A dict with a human-readable ``message`` and the ``updated`` count of
        archives whose stored cost changed.
    """
    from backend.app.api.routes.settings import get_setting

    def _round_cost(amount) -> float:
        # Built-in round() resolves ties differently from the Decimal
        # half-up rounding used by rescan_archive; use the latter here too.
        return float(Decimal(str(amount)).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP))

    result = await db.execute(select(PrintArchive))
    archives = list(result.scalars().all())

    # Catalog price per kg keyed by filament type.
    filament_result = await db.execute(select(Filament))
    filaments = {f.type: f.cost_per_kg for f in filament_result.scalars().all()}

    default_cost_setting = await get_setting(db, "default_filament_cost")
    default_cost_per_kg = float(default_cost_setting) if default_cost_setting else 25.0

    # Usage costs linked to an archive id, fetched in one grouped query.
    linked_rows = await db.execute(
        select(SpoolUsageHistory.archive_id, func.sum(SpoolUsageHistory.cost)).group_by(SpoolUsageHistory.archive_id)
    )
    linked_costs = {
        row[0]: row[1] for row in linked_rows.fetchall() if row[0] is not None and row[1] is not None and row[1] > 0
    }

    # Usage costs of unlinked records, grouped by print name in a single
    # query instead of one query per archive. Rows with a NULL print name
    # land under the None key, matching archives whose print_name is None.
    legacy_rows = await db.execute(
        select(SpoolUsageHistory.print_name, func.sum(SpoolUsageHistory.cost))
        .where(SpoolUsageHistory.archive_id.is_(None))
        .group_by(SpoolUsageHistory.print_name)
    )
    legacy_costs = {row[0]: row[1] for row in legacy_rows.fetchall() if row[1] is not None and row[1] > 0}

    updated = 0
    for archive in archives:
        recorded_cost = linked_costs.get(archive.id)
        if recorded_cost is None:
            recorded_cost = legacy_costs.get(archive.print_name)

        if recorded_cost is not None:
            new_cost = _round_cost(recorded_cost)
        elif archive.filament_used_grams and archive.filament_type:
            primary_type = archive.filament_type.split(",")[0].strip()
            cost_per_kg = filaments.get(primary_type, default_cost_per_kg)
            new_cost = _round_cost((archive.filament_used_grams / 1000) * cost_per_kg)
        else:
            # Nothing to derive a cost from; leave the stored value alone.
            continue

        if archive.cost != new_cost:
            archive.cost = new_cost
            updated += 1

    await db.commit()
    return {"message": f"Recalculated costs for {updated} archives", "updated": updated}
@router.post("/rescan-all")
async def rescan_all_archives(
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_UPDATE_ALL),
):
    """Rescan all archives and update their metadata.

    Every archive's 3MF file is parsed again and each metadata field for
    which the parser yields a truthy value overwrites the stored one. The
    field set matches the single-archive rescan endpoint, including bed and
    nozzle temperature. Costs are not recomputed here.

    Processing is best-effort: a missing file or a parse failure is recorded
    for that archive and the loop moves on to the next one.

    Returns:
        A dict with the ``updated`` count and an ``errors`` list of
        ``{"id", "error"}`` entries for archives that could not be rescanned.
    """
    from backend.app.services.archive import ThreeMFParser

    # Same fields, in the same order, that rescan_archive refreshes.
    refreshed_fields = (
        "filament_type",
        "filament_color",
        "print_time_seconds",
        "filament_used_grams",
        "layer_height",
        "nozzle_diameter",
        "bed_temperature",
        "nozzle_temperature",
        "makerworld_url",
        "designer",
    )

    result = await db.execute(select(PrintArchive))
    archives = list(result.scalars().all())

    updated = 0
    errors = []
    for archive in archives:
        try:
            file_path = settings.base_dir / archive.file_path
            if not file_path.is_file():
                errors.append({"id": archive.id, "error": "File not found"})
                continue

            metadata = ThreeMFParser(file_path).parse()
            for field_name in refreshed_fields:
                parsed_value = metadata.get(field_name)
                if parsed_value:
                    setattr(archive, field_name, parsed_value)
            updated += 1
        except Exception as e:
            # Deliberately broad: one unreadable file must not abort the batch.
            logger.exception("Failed to rescan archive %s: %s", archive.id, e)
            errors.append({"id": archive.id, "error": "Failed to parse 3MF file"})

    await db.commit()
    return {"updated": updated, "errors": errors}
@router.get("/{archive_id}/duplicates")
async def get_archive_duplicates(
    archive_id: int,
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ),
):
    """List the duplicates the archive service finds for one archive.

    The lookup is handed the archive's id, content hash, print name and, when
    present in ``extra_data``, its ``makerworld_model_id``.

    Returns:
        A dict with the ``duplicates`` collection and its ``count``.

    Raises:
        HTTPException: 404 when no archive has ``archive_id``.
    """
    service = ArchiveService(db)
    archive = await service.get_archive(archive_id)
    if not archive:
        raise HTTPException(404, "Archive not found")

    # extra_data may be empty or None; only then is the model id omitted.
    if archive.extra_data:
        makerworld_id = archive.extra_data.get("makerworld_model_id")
    else:
        makerworld_id = None

    matches = await service.find_duplicates(
        archive_id=archive.id,
        content_hash=archive.content_hash,
        print_name=archive.print_name,
        makerworld_model_id=makerworld_id,
    )
    return {"duplicates": matches, "count": len(matches)}
@router.post("/backfill-hashes")
async def backfill_content_hashes(
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_UPDATE_ALL),
):
    """Compute and store a content hash for every archive that lacks one.

    Processing is best-effort: archives whose file is missing or whose hash
    computation raises are reported in ``errors`` and skipped.

    Returns:
        A dict with the ``updated`` count and an ``errors`` list of
        ``{"id", "error"}`` entries.
    """
    unhashed = select(PrintArchive).where(PrintArchive.content_hash.is_(None))
    archives = list((await db.execute(unhashed)).scalars().all())

    updated = 0
    errors = []
    for archive in archives:
        try:
            file_path = settings.base_dir / archive.file_path
            if file_path.is_file():
                archive.content_hash = ArchiveService.compute_file_hash(file_path)
                updated += 1
            else:
                errors.append({"id": archive.id, "error": "File not found"})
        except Exception as e:
            # Keep going: a single bad file should not stop the backfill.
            logger.exception("Failed to compute hash for archive %s: %s", archive.id, e)
            errors.append({"id": archive.id, "error": "Failed to compute hash"})

    await db.commit()
    return {"updated": updated, "errors": errors}
@router.delete("/{archive_id}")
async def delete_archive(
    archive_id: int,
    db: AsyncSession = Depends(get_db),
    auth_result: tuple[User | None, bool] = Depends(
        require_ownership_permission(
            Permission.ARCHIVES_DELETE_ALL,
            Permission.ARCHIVES_DELETE_OWN,
        )
    ),
):
    """Delete one archive through the archive service.

    Returns:
        ``{"status": "deleted"}`` on success.

    Raises:
        HTTPException: 404 when the archive does not exist (or the service
            reports it could not be deleted); 403 when the caller may only
            delete their own archives and this one belongs to someone else.
    """
    user, can_modify_all = auth_result

    # The row is loaded first purely to evaluate ownership.
    lookup = select(PrintArchive).where(PrintArchive.id == archive_id)
    archive = (await db.execute(lookup)).scalar_one_or_none()
    if not archive:
        raise HTTPException(404, "Archive not found")

    if not can_modify_all and archive.created_by_id != user.id:
        raise HTTPException(403, "You can only delete your own archives")

    was_deleted = await ArchiveService(db).delete_archive(archive_id)
    if not was_deleted:
        raise HTTPException(404, "Archive not found")
    return {"status": "deleted"}
@router.get("/{archive_id}/download")
async def download_archive(
    archive_id: int,
    inline: bool = False,
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ),
):
    """Serve an archive's 3MF file.

    Args:
        archive_id: Id of the archive whose file is served.
        inline: When true the response uses an ``inline`` content disposition
            (lets the browser/OS apply its file association); otherwise
            ``attachment``.

    Raises:
        HTTPException: 404 when the archive row or its file is missing.
    """
    archive = await ArchiveService(db).get_archive(archive_id)
    if not archive:
        raise HTTPException(404, "Archive not found")

    file_path = settings.base_dir / archive.file_path
    if not file_path.is_file():
        raise HTTPException(404, "File not found")

    disposition = "attachment"
    if inline:
        disposition = "inline"

    return FileResponse(
        path=file_path,
        filename=archive.filename,
        media_type="application/vnd.ms-package.3dmanufacturing-3dmodel+xml",
        content_disposition_type=disposition,
    )
@router.get("/{archive_id}/file/{filename}")
async def download_archive_with_filename(
    archive_id: int,
    filename: str,
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ),
):
    """Serve an archive's 3MF file from a URL that ends in a filename.

    The ``filename`` path segment only shapes the URL; it is not read here and
    the response always carries the archive's stored filename.

    Raises:
        HTTPException: 404 when the archive row or its file is missing.
    """
    archive = await ArchiveService(db).get_archive(archive_id)
    if not archive:
        raise HTTPException(404, "Archive not found")

    stored_file = settings.base_dir / archive.file_path
    if not stored_file.is_file():
        raise HTTPException(404, "File not found")

    return FileResponse(
        path=stored_file,
        filename=archive.filename,
        media_type="application/vnd.ms-package.3dmanufacturing-3dmodel+xml",
    )
@router.post("/{archive_id}/slicer-token")
async def create_archive_slicer_token(
    archive_id: int,
    db: AsyncSession = Depends(get_db),
    _: User | None = RequirePermissionIfAuthEnabled(Permission.ARCHIVES_READ),
):
    """Issue a download token for opening an archive in a slicer application.

    Slicer protocol handlers (bambustudioopen://, orcaslicer://) cannot send
    auth headers, so the token is embedded in the download URL path instead.

    Returns:
        ``{"token": <token>}`` for the requested archive.

    Raises:
        HTTPException: 404 when no archive has ``archive_id``.
    """
    from backend.app.core.auth import create_slicer_download_token

    # Only hand out tokens for archives that actually exist.
    archive = await ArchiveService(db).get_archive(archive_id)
    if not archive:
        raise HTTPException(404, "Archive not found")

    return {"token": create_slicer_download_token("archive", archive_id)}
@router.get("/{archive_id}/dl/{token}/{filename}")
async def download_archive_for_slicer(
    archive_id: int,
    token: str,
    filename: str,
    db: AsyncSession = Depends(get_db),
):
    """Serve an archive's 3MF file to a slicer using a download token.

    No auth headers are required: access is granted solely by a token issued
    via POST /{archive_id}/slicer-token and checked against this archive id.
    The filename sits at the end of the URL so slicers can detect the file
    format; it is not otherwise read here.

    NOTE(review): the token is presumably short-lived and single-use; that is
    enforced (or not) inside verify_slicer_download_token - confirm there.

    Raises:
        HTTPException: 403 when the token is rejected; 404 when the archive
            row or its file is missing.
    """
    from backend.app.core.auth import verify_slicer_download_token

    # The token is validated before anything is looked up.
    token_ok = verify_slicer_download_token(token, "archive", archive_id)
    if not token_ok:
        raise HTTPException(403, "Invalid or expired download token")

    archive = await ArchiveService(db).get_archive(archive_id)
    if not archive:
        raise HTTPException(404, "Archive not found")

    stored_file = settings.base_dir / archive.file_path
    if not stored_file.is_file():
        raise HTTPException(404, "File not found")

    return FileResponse(
        path=stored_file,
        filename=archive.filename,
        media_type="application/vnd.ms-package.3dmanufacturing-3dmodel+xml",
    )
@router.get("/{archive_id}/thumbnail")
async def get_thumbnail(
    archive_id: int,
    db: AsyncSession = Depends(get_db),
):
    """Serve an archive's thumbnail image as PNG.

    Note: Unauthenticated - thumbnails are loaded via HTML img tags, which
    cannot send auth headers.

    Raises:
        HTTPException: 404 when the archive is missing, has no thumbnail
            path, or the thumbnail file does not exist.
    """
    archive = await ArchiveService(db).get_archive(archive_id)
    if not archive or not archive.thumbnail_path:
        raise HTTPException(404, "Thumbnail not found")

    thumb_path = settings.base_dir / archive.thumbnail_path
    if not thumb_path.exists():
        raise HTTPException(404, "Thumbnail file not found")

    # The file's modification time doubles as the ETag so that a replaced
    # thumbnail invalidates cached copies.
    modified_at = int(thumb_path.stat().st_mtime)
    cache_headers = {
        "Cache-Control": "no-cache, must-revalidate",
        "ETag": f'"{modified_at}"',
    }
    return FileResponse(path=thumb_path, media_type="image/png", headers=cache_headers)
@router.get("/{archive_id}/timelapse")
async def get_timelapse(
archive_id: int,
db: AsyncSession = Depends(get_db),
):
"""Get the timelapse video.
Note: Unauthenticated - loaded via