export.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. import csv
  2. import io
  3. from datetime import datetime
  4. from typing import Any
  5. from sqlalchemy import select
  6. from sqlalchemy.ext.asyncio import AsyncSession
  7. from sqlalchemy.orm import selectinload
  8. from backend.app.models.archive import PrintArchive
  9. class ExportService:
  10. """Service for exporting archive data to CSV/Excel formats."""
  11. # Default fields to export
  12. DEFAULT_FIELDS = [
  13. "id",
  14. "print_name",
  15. "filename",
  16. "status",
  17. "quantity",
  18. "printer_id",
  19. "project_name",
  20. "filament_type",
  21. "filament_used_grams",
  22. "print_time_seconds",
  23. "layer_height",
  24. "nozzle_diameter",
  25. "bed_temperature",
  26. "nozzle_temperature",
  27. "total_layers",
  28. "cost",
  29. "designer",
  30. "tags",
  31. "notes",
  32. "failure_reason",
  33. "started_at",
  34. "completed_at",
  35. "created_at",
  36. ]
  37. # Field labels for headers
  38. FIELD_LABELS = {
  39. "id": "ID",
  40. "print_name": "Print Name",
  41. "filename": "Filename",
  42. "status": "Status",
  43. "quantity": "Items Printed",
  44. "printer_id": "Printer ID",
  45. "project_name": "Project",
  46. "filament_type": "Filament Type",
  47. "filament_used_grams": "Filament (g)",
  48. "print_time_seconds": "Print Time (s)",
  49. "layer_height": "Layer Height (mm)",
  50. "nozzle_diameter": "Nozzle (mm)",
  51. "bed_temperature": "Bed Temp (°C)",
  52. "nozzle_temperature": "Nozzle Temp (°C)",
  53. "total_layers": "Total Layers",
  54. "cost": "Cost",
  55. "designer": "Designer",
  56. "tags": "Tags",
  57. "notes": "Notes",
  58. "failure_reason": "Failure Reason",
  59. "started_at": "Started At",
  60. "completed_at": "Completed At",
  61. "created_at": "Created At",
  62. }
  63. def __init__(self, db: AsyncSession):
  64. self.db = db
  65. async def export_archives(
  66. self,
  67. format: str = "csv",
  68. fields: list[str] | None = None,
  69. printer_id: int | None = None,
  70. project_id: int | None = None,
  71. status: str | None = None,
  72. date_from: datetime | None = None,
  73. date_to: datetime | None = None,
  74. search: str | None = None,
  75. ) -> tuple[bytes, str, str]:
  76. """Export archives to CSV or Excel format.
  77. Args:
  78. format: Export format ('csv' or 'xlsx')
  79. fields: List of fields to include (None = all default fields)
  80. printer_id: Filter by printer
  81. project_id: Filter by project
  82. status: Filter by status
  83. date_from: Filter by start date
  84. date_to: Filter by end date
  85. search: Search filter
  86. Returns:
  87. Tuple of (file_bytes, filename, content_type)
  88. """
  89. # Build query
  90. query = (
  91. select(PrintArchive).options(selectinload(PrintArchive.project)).order_by(PrintArchive.created_at.desc())
  92. )
  93. # Apply filters
  94. if printer_id:
  95. query = query.where(PrintArchive.printer_id == printer_id)
  96. if project_id:
  97. query = query.where(PrintArchive.project_id == project_id)
  98. if status:
  99. query = query.where(PrintArchive.status == status)
  100. if date_from:
  101. query = query.where(PrintArchive.created_at >= date_from)
  102. if date_to:
  103. query = query.where(PrintArchive.created_at <= date_to)
  104. if search:
  105. like_pattern = f"%{search}%"
  106. query = query.where(
  107. (PrintArchive.print_name.ilike(like_pattern))
  108. | (PrintArchive.filename.ilike(like_pattern))
  109. | (PrintArchive.tags.ilike(like_pattern))
  110. | (PrintArchive.notes.ilike(like_pattern))
  111. | (PrintArchive.designer.ilike(like_pattern))
  112. )
  113. # Execute query
  114. result = await self.db.execute(query)
  115. archives = list(result.scalars().all())
  116. # Determine fields to export
  117. export_fields = fields if fields else self.DEFAULT_FIELDS
  118. # Convert to rows
  119. rows = []
  120. for archive in archives:
  121. row = self._archive_to_row(archive, export_fields)
  122. rows.append(row)
  123. # Generate headers
  124. headers = [self.FIELD_LABELS.get(f, f) for f in export_fields]
  125. # Generate file
  126. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  127. if format == "xlsx":
  128. file_bytes = self._generate_xlsx(headers, rows, export_fields)
  129. filename = f"archives_export_{timestamp}.xlsx"
  130. content_type = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
  131. else:
  132. file_bytes = self._generate_csv(headers, rows)
  133. filename = f"archives_export_{timestamp}.csv"
  134. content_type = "text/csv"
  135. return file_bytes, filename, content_type
  136. async def export_stats(
  137. self,
  138. format: str = "csv",
  139. days: int = 30,
  140. printer_id: int | None = None,
  141. project_id: int | None = None,
  142. created_by_id: int | None = None,
  143. ) -> tuple[bytes, str, str]:
  144. """Export statistics summary to CSV or Excel format.
  145. Args:
  146. format: Export format ('csv' or 'xlsx')
  147. days: Number of days to include in stats
  148. printer_id: Filter by printer
  149. project_id: Filter by project
  150. created_by_id: Filter by user who created the print (-1 for no user)
  151. Returns:
  152. Tuple of (file_bytes, filename, content_type)
  153. """
  154. from backend.app.services.failure_analysis import FailureAnalysisService
  155. # Get failure analysis data (includes stats)
  156. analysis_service = FailureAnalysisService(self.db)
  157. analysis = await analysis_service.analyze_failures(
  158. days=days,
  159. printer_id=printer_id,
  160. project_id=project_id,
  161. created_by_id=created_by_id,
  162. )
  163. # Build stats rows
  164. rows = [
  165. ["Metric", "Value"],
  166. ["Period (days)", analysis["period_days"]],
  167. ["Total Prints", analysis["total_prints"]],
  168. ["Failed Prints", analysis["failed_prints"]],
  169. ["Failure Rate (%)", analysis["failure_rate"]],
  170. [""],
  171. ["Failures by Reason", ""],
  172. ]
  173. for reason, count in analysis["failures_by_reason"].items():
  174. rows.append([reason, count])
  175. rows.append([""])
  176. rows.append(["Failures by Filament", ""])
  177. for filament, count in analysis["failures_by_filament"].items():
  178. rows.append([filament, count])
  179. rows.append([""])
  180. rows.append(["Failures by Printer", ""])
  181. for printer, count in analysis["failures_by_printer"].items():
  182. rows.append([printer, count])
  183. rows.append([""])
  184. rows.append(["Weekly Trend", ""])
  185. rows.append(["Week", "Total", "Failed", "Rate (%)"])
  186. for week in analysis["trend"]:
  187. rows.append(
  188. [
  189. week["week_start"],
  190. week["total_prints"],
  191. week["failed_prints"],
  192. week["failure_rate"],
  193. ]
  194. )
  195. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  196. if format == "xlsx":
  197. file_bytes = self._generate_xlsx_simple(rows)
  198. filename = f"stats_export_{timestamp}.xlsx"
  199. content_type = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
  200. else:
  201. file_bytes = self._generate_csv_simple(rows)
  202. filename = f"stats_export_{timestamp}.csv"
  203. content_type = "text/csv"
  204. return file_bytes, filename, content_type
  205. def _archive_to_row(self, archive: PrintArchive, fields: list[str]) -> list[Any]:
  206. """Convert an archive to a row of values."""
  207. row = []
  208. for field in fields:
  209. if field == "project_name":
  210. value = archive.project.name if archive.project else None
  211. elif field in ("started_at", "completed_at", "created_at"):
  212. value = getattr(archive, field)
  213. if value:
  214. value = value.isoformat()
  215. else:
  216. value = getattr(archive, field, None)
  217. row.append(value)
  218. return row
  219. def _generate_csv(self, headers: list[str], rows: list[list]) -> bytes:
  220. """Generate CSV file content."""
  221. output = io.StringIO()
  222. writer = csv.writer(output)
  223. writer.writerow(headers)
  224. writer.writerows(rows)
  225. return output.getvalue().encode("utf-8")
  226. def _generate_csv_simple(self, rows: list[list]) -> bytes:
  227. """Generate CSV file content from simple rows (no separate headers)."""
  228. output = io.StringIO()
  229. writer = csv.writer(output)
  230. writer.writerows(rows)
  231. return output.getvalue().encode("utf-8")
  232. def _generate_xlsx(self, headers: list[str], rows: list[list], fields: list[str]) -> bytes:
  233. """Generate Excel file content."""
  234. try:
  235. from openpyxl import Workbook
  236. from openpyxl.styles import Alignment, Font, PatternFill
  237. from openpyxl.utils import get_column_letter
  238. except ImportError:
  239. raise ImportError("openpyxl is required for Excel export. Install with: pip install openpyxl")
  240. wb = Workbook()
  241. ws = wb.active
  242. ws.title = "Archives"
  243. # Header style
  244. header_font = Font(bold=True, color="FFFFFF")
  245. header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
  246. header_alignment = Alignment(horizontal="center")
  247. # Write headers
  248. for col, header in enumerate(headers, 1):
  249. cell = ws.cell(row=1, column=col, value=header)
  250. cell.font = header_font
  251. cell.fill = header_fill
  252. cell.alignment = header_alignment
  253. # Write data
  254. for row_idx, row in enumerate(rows, 2):
  255. for col_idx, value in enumerate(row, 1):
  256. ws.cell(row=row_idx, column=col_idx, value=value)
  257. # Auto-adjust column widths
  258. for col_idx, _field in enumerate(fields, 1):
  259. column_letter = get_column_letter(col_idx)
  260. max_length = len(headers[col_idx - 1])
  261. for row in rows:
  262. cell_value = row[col_idx - 1]
  263. if cell_value is not None:
  264. max_length = max(max_length, len(str(cell_value)))
  265. ws.column_dimensions[column_letter].width = min(max_length + 2, 50)
  266. # Freeze header row
  267. ws.freeze_panes = "A2"
  268. output = io.BytesIO()
  269. wb.save(output)
  270. return output.getvalue()
  271. def _generate_xlsx_simple(self, rows: list[list]) -> bytes:
  272. """Generate Excel file content from simple rows."""
  273. try:
  274. from openpyxl import Workbook
  275. from openpyxl.styles import Font
  276. except ImportError:
  277. raise ImportError("openpyxl is required for Excel export. Install with: pip install openpyxl")
  278. wb = Workbook()
  279. ws = wb.active
  280. ws.title = "Statistics"
  281. bold_font = Font(bold=True)
  282. for row_idx, row in enumerate(rows, 1):
  283. for col_idx, value in enumerate(row, 1):
  284. cell = ws.cell(row=row_idx, column=col_idx, value=value)
  285. # Bold section headers
  286. if col_idx == 1 and value and isinstance(value, str) and value.endswith(":"):
  287. cell.font = bold_font
  288. output = io.BytesIO()
  289. wb.save(output)
  290. return output.getvalue()