| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335 |
- import csv
- import io
- from datetime import datetime
- from typing import Any
- from sqlalchemy.ext.asyncio import AsyncSession
- from sqlalchemy import select
- from sqlalchemy.orm import selectinload
- from backend.app.models.archive import PrintArchive
- from backend.app.models.project import Project
class ExportService:
    """Service for exporting archive data to CSV/Excel formats.

    Two exports are offered: a per-archive table (``export_archives``) and a
    failure-statistics summary (``export_stats``). Both return the encoded
    file together with a timestamped filename and its MIME type.
    """

    # Fields exported, in column order, when the caller does not choose any.
    DEFAULT_FIELDS = [
        "id",
        "print_name",
        "filename",
        "status",
        "printer_id",
        "project_name",
        "filament_type",
        "filament_used_grams",
        "print_time_seconds",
        "layer_height",
        "nozzle_diameter",
        "bed_temperature",
        "nozzle_temperature",
        "total_layers",
        "cost",
        "designer",
        "tags",
        "notes",
        "failure_reason",
        "started_at",
        "completed_at",
        "created_at",
    ]

    # Column header shown for each field; fields without an entry fall back
    # to the raw field name.
    FIELD_LABELS = {
        "id": "ID",
        "print_name": "Print Name",
        "filename": "Filename",
        "status": "Status",
        "printer_id": "Printer ID",
        "project_name": "Project",
        "filament_type": "Filament Type",
        "filament_used_grams": "Filament (g)",
        "print_time_seconds": "Print Time (s)",
        "layer_height": "Layer Height (mm)",
        "nozzle_diameter": "Nozzle (mm)",
        "bed_temperature": "Bed Temp (°C)",
        "nozzle_temperature": "Nozzle Temp (°C)",
        "total_layers": "Total Layers",
        "cost": "Cost",
        "designer": "Designer",
        "tags": "Tags",
        "notes": "Notes",
        "failure_reason": "Failure Reason",
        "started_at": "Started At",
        "completed_at": "Completed At",
        "created_at": "Created At",
    }

    # Fields whose values are rendered with ``isoformat()``.
    _TIMESTAMP_FIELDS = ("started_at", "completed_at", "created_at")

    _XLSX_CONTENT_TYPE = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
    _CSV_CONTENT_TYPE = "text/csv"

    def __init__(self, db: AsyncSession):
        """Store the async database session used to run export queries."""
        self.db = db

    async def export_archives(
        self,
        format: str = "csv",
        fields: list[str] | None = None,
        printer_id: int | None = None,
        project_id: int | None = None,
        status: str | None = None,
        date_from: datetime | None = None,
        date_to: datetime | None = None,
        search: str | None = None,
    ) -> tuple[bytes, str, str]:
        """Export archives to CSV or Excel format.

        Archives are ordered newest first by ``created_at``. Every filter is
        optional and is skipped when its argument is falsy.

        Args:
            format: Export format; ``'xlsx'`` produces Excel, any other value
                produces CSV.
            fields: List of fields to include (None or empty = all default
                fields).
            printer_id: Filter by printer.
            project_id: Filter by project.
            status: Filter by status (exact match).
            date_from: Keep archives created at or after this moment.
            date_to: Keep archives created at or before this moment.
            search: Case-insensitive substring matched against print name,
                filename, tags, notes and designer.

        Returns:
            Tuple of (file_bytes, filename, content_type).

        Raises:
            ImportError: If ``'xlsx'`` is requested and openpyxl is missing.
        """
        query = (
            select(PrintArchive)
            .options(selectinload(PrintArchive.project))
            .order_by(PrintArchive.created_at.desc())
        )

        if printer_id:
            query = query.where(PrintArchive.printer_id == printer_id)
        if project_id:
            query = query.where(PrintArchive.project_id == project_id)
        if status:
            query = query.where(PrintArchive.status == status)
        if date_from:
            query = query.where(PrintArchive.created_at >= date_from)
        if date_to:
            query = query.where(PrintArchive.created_at <= date_to)
        if search:
            like_pattern = f"%{search}%"
            query = query.where(
                (PrintArchive.print_name.ilike(like_pattern))
                | (PrintArchive.filename.ilike(like_pattern))
                | (PrintArchive.tags.ilike(like_pattern))
                | (PrintArchive.notes.ilike(like_pattern))
                | (PrintArchive.designer.ilike(like_pattern))
            )

        result = await self.db.execute(query)

        export_fields = fields if fields else self.DEFAULT_FIELDS
        rows = [
            self._archive_to_row(archive, export_fields)
            for archive in result.scalars().all()
        ]
        headers = [self.FIELD_LABELS.get(name, name) for name in export_fields]

        if format == "xlsx":
            filename = self._export_filename("archives_export", "xlsx")
            file_bytes = self._generate_xlsx(headers, rows, export_fields)
            return file_bytes, filename, self._XLSX_CONTENT_TYPE

        filename = self._export_filename("archives_export", "csv")
        file_bytes = self._generate_csv(headers, rows)
        return file_bytes, filename, self._CSV_CONTENT_TYPE

    async def export_stats(
        self,
        format: str = "csv",
        days: int = 30,
        printer_id: int | None = None,
        project_id: int | None = None,
    ) -> tuple[bytes, str, str]:
        """Export statistics summary to CSV or Excel format.

        The sheet lists overall totals, then failure counts grouped by reason,
        filament and printer, then a weekly trend table.

        Args:
            format: Export format; ``'xlsx'`` produces Excel, any other value
                produces CSV.
            days: Number of days to include in stats.
            printer_id: Filter by printer.
            project_id: Filter by project.

        Returns:
            Tuple of (file_bytes, filename, content_type).

        Raises:
            ImportError: If ``'xlsx'`` is requested and openpyxl is missing.
        """
        # Imported lazily, as in the original; presumably this avoids a
        # circular import between the two service modules.
        from backend.app.services.failure_analysis import FailureAnalysisService

        analysis_service = FailureAnalysisService(self.db)
        analysis = await analysis_service.analyze_failures(
            days=days,
            printer_id=printer_id,
            project_id=project_id,
        )

        rows: list[list] = [
            ["Metric", "Value"],
            ["Period (days)", analysis["period_days"]],
            ["Total Prints", analysis["total_prints"]],
            ["Failed Prints", analysis["failed_prints"]],
            ["Failure Rate (%)", analysis["failure_rate"]],
            [""],
            ["Failures by Reason", ""],
        ]
        rows.extend([reason, count] for reason, count in analysis["failures_by_reason"].items())

        rows.append([""])
        rows.append(["Failures by Filament", ""])
        rows.extend([filament, count] for filament, count in analysis["failures_by_filament"].items())

        rows.append([""])
        rows.append(["Failures by Printer", ""])
        rows.extend([printer, count] for printer, count in analysis["failures_by_printer"].items())

        rows.append([""])
        rows.append(["Weekly Trend", ""])
        rows.append(["Week", "Total", "Failed", "Rate (%)"])
        rows.extend(
            [
                week["week_start"],
                week["total_prints"],
                week["failed_prints"],
                week["failure_rate"],
            ]
            for week in analysis["trend"]
        )

        if format == "xlsx":
            filename = self._export_filename("stats_export", "xlsx")
            file_bytes = self._generate_xlsx_simple(rows)
            return file_bytes, filename, self._XLSX_CONTENT_TYPE

        filename = self._export_filename("stats_export", "csv")
        file_bytes = self._generate_csv_simple(rows)
        return file_bytes, filename, self._CSV_CONTENT_TYPE

    @staticmethod
    def _export_filename(prefix: str, extension: str) -> str:
        """Build ``<prefix>_<YYYYmmdd_HHMMSS>.<extension>`` from the local clock."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        return f"{prefix}_{timestamp}.{extension}"

    @staticmethod
    def _is_section_header(row: list) -> bool:
        """Tell whether ``row`` is a section heading of a simple sheet.

        A heading is a row whose first cell is a non-empty string and which
        either ends with ``":"`` or is a two-cell row with an empty second
        cell (the form ``export_stats`` emits, e.g. ``["Weekly Trend", ""]``).
        """
        if not row:
            return False
        label = row[0]
        if not isinstance(label, str) or not label:
            return False
        if label.endswith(":"):
            return True
        return len(row) == 2 and row[1] == ""

    def _archive_to_row(self, archive: PrintArchive, fields: list[str]) -> list[Any]:
        """Convert an archive to a row of values.

        Args:
            archive: Object whose attributes are read by field name.
            fields: Field names, in output column order.

        Returns:
            One value per field. ``project_name`` comes from the related
            project (None when there is none), timestamp fields are ISO 8601
            strings, and attributes the archive lacks yield None.
        """
        row: list[Any] = []
        for field in fields:
            if field == "project_name":
                value = archive.project.name if archive.project else None
            else:
                value = getattr(archive, field, None)
                if field in self._TIMESTAMP_FIELDS and value:
                    value = value.isoformat()
            row.append(value)
        return row

    def _generate_csv(self, headers: list[str], rows: list[list]) -> bytes:
        """Generate UTF-8 CSV content with ``headers`` as the first line."""
        return self._generate_csv_simple([headers, *rows])

    def _generate_csv_simple(self, rows: list[list]) -> bytes:
        """Generate UTF-8 CSV content from simple rows (no separate headers)."""
        output = io.StringIO()
        csv.writer(output).writerows(rows)
        return output.getvalue().encode("utf-8")

    def _generate_xlsx(self, headers: list[str], rows: list[list], fields: list[str]) -> bytes:
        """Generate Excel file content for the archive table.

        Args:
            headers: Column titles written to the styled, frozen first row.
            rows: Data rows, each with one value per header.
            fields: Field names behind the columns; kept for interface
                compatibility and not needed to lay out the sheet.

        Returns:
            The workbook serialized as bytes.

        Raises:
            ImportError: If openpyxl is not installed.
        """
        try:
            from openpyxl import Workbook
            from openpyxl.styles import Alignment, Font, PatternFill
            from openpyxl.utils import get_column_letter
        except ImportError as exc:
            raise ImportError(
                "openpyxl is required for Excel export. Install with: pip install openpyxl"
            ) from exc

        wb = Workbook()
        ws = wb.active
        ws.title = "Archives"

        header_font = Font(bold=True, color="FFFFFF")
        header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
        header_alignment = Alignment(horizontal="center")

        for col_idx, header in enumerate(headers, 1):
            cell = ws.cell(row=1, column=col_idx, value=header)
            cell.font = header_font
            cell.fill = header_fill
            cell.alignment = header_alignment

        for row_idx, row in enumerate(rows, 2):
            for col_idx, value in enumerate(row, 1):
                ws.cell(row=row_idx, column=col_idx, value=value)

        # Size each column to its longest rendered value, capped at 50.
        for col_idx, header in enumerate(headers, 1):
            widest_value = max(
                (
                    len(str(row[col_idx - 1]))
                    for row in rows
                    if row[col_idx - 1] is not None
                ),
                default=0,
            )
            max_length = max(len(header), widest_value)
            ws.column_dimensions[get_column_letter(col_idx)].width = min(max_length + 2, 50)

        # Keep the header row visible while scrolling.
        ws.freeze_panes = "A2"

        output = io.BytesIO()
        wb.save(output)
        return output.getvalue()

    def _generate_xlsx_simple(self, rows: list[list]) -> bytes:
        """Generate Excel file content from simple rows.

        Rows are written verbatim to a sheet named "Statistics"; the first
        cell of every section-heading row (see ``_is_section_header``) is
        rendered in bold.

        Raises:
            ImportError: If openpyxl is not installed.
        """
        try:
            from openpyxl import Workbook
            from openpyxl.styles import Font
        except ImportError as exc:
            raise ImportError(
                "openpyxl is required for Excel export. Install with: pip install openpyxl"
            ) from exc

        wb = Workbook()
        ws = wb.active
        ws.title = "Statistics"

        bold_font = Font(bold=True)

        for row_idx, row in enumerate(rows, 1):
            is_heading = self._is_section_header(row)
            for col_idx, value in enumerate(row, 1):
                cell = ws.cell(row=row_idx, column=col_idx, value=value)
                if is_heading and col_idx == 1:
                    cell.font = bold_font

        output = io.BytesIO()
        wb.save(output)
        return output.getvalue()
|