export.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. import csv
  2. import io
  3. from datetime import datetime
  4. from typing import Any
  5. from sqlalchemy.ext.asyncio import AsyncSession
  6. from sqlalchemy import select
  7. from sqlalchemy.orm import selectinload
  8. from backend.app.models.archive import PrintArchive
  9. from backend.app.models.project import Project
  10. class ExportService:
  11. """Service for exporting archive data to CSV/Excel formats."""
  12. # Default fields to export
  13. DEFAULT_FIELDS = [
  14. "id",
  15. "print_name",
  16. "filename",
  17. "status",
  18. "printer_id",
  19. "project_name",
  20. "filament_type",
  21. "filament_used_grams",
  22. "print_time_seconds",
  23. "layer_height",
  24. "nozzle_diameter",
  25. "bed_temperature",
  26. "nozzle_temperature",
  27. "total_layers",
  28. "cost",
  29. "designer",
  30. "tags",
  31. "notes",
  32. "failure_reason",
  33. "started_at",
  34. "completed_at",
  35. "created_at",
  36. ]
  37. # Field labels for headers
  38. FIELD_LABELS = {
  39. "id": "ID",
  40. "print_name": "Print Name",
  41. "filename": "Filename",
  42. "status": "Status",
  43. "printer_id": "Printer ID",
  44. "project_name": "Project",
  45. "filament_type": "Filament Type",
  46. "filament_used_grams": "Filament (g)",
  47. "print_time_seconds": "Print Time (s)",
  48. "layer_height": "Layer Height (mm)",
  49. "nozzle_diameter": "Nozzle (mm)",
  50. "bed_temperature": "Bed Temp (°C)",
  51. "nozzle_temperature": "Nozzle Temp (°C)",
  52. "total_layers": "Total Layers",
  53. "cost": "Cost",
  54. "designer": "Designer",
  55. "tags": "Tags",
  56. "notes": "Notes",
  57. "failure_reason": "Failure Reason",
  58. "started_at": "Started At",
  59. "completed_at": "Completed At",
  60. "created_at": "Created At",
  61. }
  62. def __init__(self, db: AsyncSession):
  63. self.db = db
  64. async def export_archives(
  65. self,
  66. format: str = "csv",
  67. fields: list[str] | None = None,
  68. printer_id: int | None = None,
  69. project_id: int | None = None,
  70. status: str | None = None,
  71. date_from: datetime | None = None,
  72. date_to: datetime | None = None,
  73. search: str | None = None,
  74. ) -> tuple[bytes, str, str]:
  75. """Export archives to CSV or Excel format.
  76. Args:
  77. format: Export format ('csv' or 'xlsx')
  78. fields: List of fields to include (None = all default fields)
  79. printer_id: Filter by printer
  80. project_id: Filter by project
  81. status: Filter by status
  82. date_from: Filter by start date
  83. date_to: Filter by end date
  84. search: Search filter
  85. Returns:
  86. Tuple of (file_bytes, filename, content_type)
  87. """
  88. # Build query
  89. query = (
  90. select(PrintArchive)
  91. .options(selectinload(PrintArchive.project))
  92. .order_by(PrintArchive.created_at.desc())
  93. )
  94. # Apply filters
  95. if printer_id:
  96. query = query.where(PrintArchive.printer_id == printer_id)
  97. if project_id:
  98. query = query.where(PrintArchive.project_id == project_id)
  99. if status:
  100. query = query.where(PrintArchive.status == status)
  101. if date_from:
  102. query = query.where(PrintArchive.created_at >= date_from)
  103. if date_to:
  104. query = query.where(PrintArchive.created_at <= date_to)
  105. if search:
  106. like_pattern = f"%{search}%"
  107. query = query.where(
  108. (PrintArchive.print_name.ilike(like_pattern)) |
  109. (PrintArchive.filename.ilike(like_pattern)) |
  110. (PrintArchive.tags.ilike(like_pattern)) |
  111. (PrintArchive.notes.ilike(like_pattern)) |
  112. (PrintArchive.designer.ilike(like_pattern))
  113. )
  114. # Execute query
  115. result = await self.db.execute(query)
  116. archives = list(result.scalars().all())
  117. # Determine fields to export
  118. export_fields = fields if fields else self.DEFAULT_FIELDS
  119. # Convert to rows
  120. rows = []
  121. for archive in archives:
  122. row = self._archive_to_row(archive, export_fields)
  123. rows.append(row)
  124. # Generate headers
  125. headers = [self.FIELD_LABELS.get(f, f) for f in export_fields]
  126. # Generate file
  127. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  128. if format == "xlsx":
  129. file_bytes = self._generate_xlsx(headers, rows, export_fields)
  130. filename = f"archives_export_{timestamp}.xlsx"
  131. content_type = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
  132. else:
  133. file_bytes = self._generate_csv(headers, rows)
  134. filename = f"archives_export_{timestamp}.csv"
  135. content_type = "text/csv"
  136. return file_bytes, filename, content_type
  137. async def export_stats(
  138. self,
  139. format: str = "csv",
  140. days: int = 30,
  141. printer_id: int | None = None,
  142. project_id: int | None = None,
  143. ) -> tuple[bytes, str, str]:
  144. """Export statistics summary to CSV or Excel format.
  145. Args:
  146. format: Export format ('csv' or 'xlsx')
  147. days: Number of days to include in stats
  148. printer_id: Filter by printer
  149. project_id: Filter by project
  150. Returns:
  151. Tuple of (file_bytes, filename, content_type)
  152. """
  153. from backend.app.services.failure_analysis import FailureAnalysisService
  154. # Get failure analysis data (includes stats)
  155. analysis_service = FailureAnalysisService(self.db)
  156. analysis = await analysis_service.analyze_failures(
  157. days=days,
  158. printer_id=printer_id,
  159. project_id=project_id,
  160. )
  161. # Build stats rows
  162. rows = [
  163. ["Metric", "Value"],
  164. ["Period (days)", analysis["period_days"]],
  165. ["Total Prints", analysis["total_prints"]],
  166. ["Failed Prints", analysis["failed_prints"]],
  167. ["Failure Rate (%)", analysis["failure_rate"]],
  168. [""],
  169. ["Failures by Reason", ""],
  170. ]
  171. for reason, count in analysis["failures_by_reason"].items():
  172. rows.append([reason, count])
  173. rows.append([""])
  174. rows.append(["Failures by Filament", ""])
  175. for filament, count in analysis["failures_by_filament"].items():
  176. rows.append([filament, count])
  177. rows.append([""])
  178. rows.append(["Failures by Printer", ""])
  179. for printer, count in analysis["failures_by_printer"].items():
  180. rows.append([printer, count])
  181. rows.append([""])
  182. rows.append(["Weekly Trend", ""])
  183. rows.append(["Week", "Total", "Failed", "Rate (%)"])
  184. for week in analysis["trend"]:
  185. rows.append([
  186. week["week_start"],
  187. week["total_prints"],
  188. week["failed_prints"],
  189. week["failure_rate"],
  190. ])
  191. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  192. if format == "xlsx":
  193. file_bytes = self._generate_xlsx_simple(rows)
  194. filename = f"stats_export_{timestamp}.xlsx"
  195. content_type = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
  196. else:
  197. file_bytes = self._generate_csv_simple(rows)
  198. filename = f"stats_export_{timestamp}.csv"
  199. content_type = "text/csv"
  200. return file_bytes, filename, content_type
  201. def _archive_to_row(self, archive: PrintArchive, fields: list[str]) -> list[Any]:
  202. """Convert an archive to a row of values."""
  203. row = []
  204. for field in fields:
  205. if field == "project_name":
  206. value = archive.project.name if archive.project else None
  207. elif field in ("started_at", "completed_at", "created_at"):
  208. value = getattr(archive, field)
  209. if value:
  210. value = value.isoformat()
  211. else:
  212. value = getattr(archive, field, None)
  213. row.append(value)
  214. return row
  215. def _generate_csv(self, headers: list[str], rows: list[list]) -> bytes:
  216. """Generate CSV file content."""
  217. output = io.StringIO()
  218. writer = csv.writer(output)
  219. writer.writerow(headers)
  220. writer.writerows(rows)
  221. return output.getvalue().encode("utf-8")
  222. def _generate_csv_simple(self, rows: list[list]) -> bytes:
  223. """Generate CSV file content from simple rows (no separate headers)."""
  224. output = io.StringIO()
  225. writer = csv.writer(output)
  226. writer.writerows(rows)
  227. return output.getvalue().encode("utf-8")
  228. def _generate_xlsx(self, headers: list[str], rows: list[list], fields: list[str]) -> bytes:
  229. """Generate Excel file content."""
  230. try:
  231. from openpyxl import Workbook
  232. from openpyxl.styles import Font, PatternFill, Alignment
  233. from openpyxl.utils import get_column_letter
  234. except ImportError:
  235. raise ImportError("openpyxl is required for Excel export. Install with: pip install openpyxl")
  236. wb = Workbook()
  237. ws = wb.active
  238. ws.title = "Archives"
  239. # Header style
  240. header_font = Font(bold=True, color="FFFFFF")
  241. header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
  242. header_alignment = Alignment(horizontal="center")
  243. # Write headers
  244. for col, header in enumerate(headers, 1):
  245. cell = ws.cell(row=1, column=col, value=header)
  246. cell.font = header_font
  247. cell.fill = header_fill
  248. cell.alignment = header_alignment
  249. # Write data
  250. for row_idx, row in enumerate(rows, 2):
  251. for col_idx, value in enumerate(row, 1):
  252. ws.cell(row=row_idx, column=col_idx, value=value)
  253. # Auto-adjust column widths
  254. for col_idx, field in enumerate(fields, 1):
  255. column_letter = get_column_letter(col_idx)
  256. max_length = len(headers[col_idx - 1])
  257. for row in rows:
  258. cell_value = row[col_idx - 1]
  259. if cell_value is not None:
  260. max_length = max(max_length, len(str(cell_value)))
  261. ws.column_dimensions[column_letter].width = min(max_length + 2, 50)
  262. # Freeze header row
  263. ws.freeze_panes = "A2"
  264. output = io.BytesIO()
  265. wb.save(output)
  266. return output.getvalue()
  267. def _generate_xlsx_simple(self, rows: list[list]) -> bytes:
  268. """Generate Excel file content from simple rows."""
  269. try:
  270. from openpyxl import Workbook
  271. from openpyxl.styles import Font
  272. except ImportError:
  273. raise ImportError("openpyxl is required for Excel export. Install with: pip install openpyxl")
  274. wb = Workbook()
  275. ws = wb.active
  276. ws.title = "Statistics"
  277. bold_font = Font(bold=True)
  278. for row_idx, row in enumerate(rows, 1):
  279. for col_idx, value in enumerate(row, 1):
  280. cell = ws.cell(row=row_idx, column=col_idx, value=value)
  281. # Bold section headers
  282. if col_idx == 1 and value and isinstance(value, str) and value.endswith(":"):
  283. cell.font = bold_font
  284. output = io.BytesIO()
  285. wb.save(output)
  286. return output.getvalue()