"""archive.py - 3MF metadata parsing and print archive management."""
import hashlib
import json
import logging
import shutil
import zipfile
from datetime import datetime
from pathlib import Path
from xml.etree import ElementTree as ET

from sqlalchemy import select, and_, or_
from sqlalchemy.ext.asyncio import AsyncSession

from backend.app.core.config import settings
from backend.app.models.archive import PrintArchive
from backend.app.models.printer import Printer
  13. class ThreeMFParser:
  14. """Parser for Bambu Lab 3MF files."""
  15. def __init__(self, file_path: Path):
  16. self.file_path = file_path
  17. self.metadata: dict = {}
  18. def parse(self) -> dict:
  19. """Extract metadata from 3MF file."""
  20. try:
  21. with zipfile.ZipFile(self.file_path, "r") as zf:
  22. self._parse_slice_info(zf)
  23. self._parse_project_settings(zf)
  24. self._parse_3dmodel(zf)
  25. self._extract_thumbnail(zf)
  26. # Prefer slice_info for colors (shows ALL filaments actually used in print)
  27. # project_settings may filter out "support" filaments incorrectly
  28. if self.metadata.get("_slice_filament_color"):
  29. self.metadata["filament_color"] = self.metadata["_slice_filament_color"]
  30. if not self.metadata.get("filament_type") and self.metadata.get("_slice_filament_type"):
  31. self.metadata["filament_type"] = self.metadata["_slice_filament_type"]
  32. # Clean up internal keys
  33. self.metadata.pop("_slice_filament_type", None)
  34. self.metadata.pop("_slice_filament_color", None)
  35. except Exception:
  36. pass
  37. return self.metadata
  38. def _parse_slice_info(self, zf: zipfile.ZipFile):
  39. """Parse slice_info.config for print settings."""
  40. try:
  41. if "Metadata/slice_info.config" in zf.namelist():
  42. content = zf.read("Metadata/slice_info.config").decode()
  43. root = ET.fromstring(content)
  44. # Get first plate's metadata
  45. plate = root.find(".//plate")
  46. if plate is not None:
  47. # Get prediction and weight from metadata elements
  48. for meta in plate.findall("metadata"):
  49. key = meta.get("key")
  50. value = meta.get("value")
  51. if key == "prediction" and value:
  52. self.metadata["print_time_seconds"] = int(value)
  53. elif key == "weight" and value:
  54. self.metadata["filament_used_grams"] = float(value)
  55. # Get filament info from ALL filaments actually used in the print
  56. # slice_info has <filament id="1" type="PLA" color="#FFFFFF" used_g="100" />
  57. filaments = root.findall(".//filament")
  58. if filaments:
  59. # Collect all unique filament types and colors
  60. types = []
  61. colors = []
  62. for f in filaments:
  63. ftype = f.get("type")
  64. fcolor = f.get("color")
  65. if ftype and ftype not in types:
  66. types.append(ftype)
  67. if fcolor and fcolor not in colors:
  68. colors.append(fcolor)
  69. if types:
  70. self.metadata["_slice_filament_type"] = ", ".join(types)
  71. if colors:
  72. self.metadata["_slice_filament_color"] = ",".join(colors)
  73. except Exception:
  74. pass
  75. def _parse_project_settings(self, zf: zipfile.ZipFile):
  76. """Parse project settings for print configuration."""
  77. try:
  78. if "Metadata/project_settings.config" in zf.namelist():
  79. content = zf.read("Metadata/project_settings.config").decode()
  80. try:
  81. data = json.loads(content)
  82. self._extract_filament_info(data)
  83. self._extract_print_settings(data)
  84. except json.JSONDecodeError:
  85. pass
  86. except Exception:
  87. pass
  88. def _extract_filament_info(self, data: dict):
  89. """Extract filament info, preferring non-support filaments."""
  90. try:
  91. filament_types = data.get("filament_type", [])
  92. filament_colors = data.get("filament_colour", [])
  93. filament_is_support = data.get("filament_is_support", [])
  94. if not filament_types:
  95. return
  96. # Collect all non-support filaments
  97. non_support_types = []
  98. non_support_colors = []
  99. for i, ftype in enumerate(filament_types):
  100. is_support = filament_is_support[i] if i < len(filament_is_support) else '0'
  101. if is_support == '0':
  102. if ftype and ftype not in non_support_types:
  103. non_support_types.append(ftype)
  104. if i < len(filament_colors) and filament_colors[i]:
  105. color = filament_colors[i]
  106. if color not in non_support_colors:
  107. non_support_colors.append(color)
  108. # Fallback to first filament if all are support
  109. if not non_support_types and filament_types:
  110. non_support_types = [filament_types[0]]
  111. if not non_support_colors and filament_colors:
  112. non_support_colors = [filament_colors[0]]
  113. # Store filament type(s)
  114. if non_support_types:
  115. self.metadata["filament_type"] = ", ".join(non_support_types)
  116. # Store all colors as comma-separated (for multi-color display)
  117. if non_support_colors:
  118. self.metadata["filament_color"] = ",".join(non_support_colors)
  119. except Exception:
  120. pass
  121. def _extract_print_settings(self, data: dict):
  122. """Extract print settings from JSON config."""
  123. try:
  124. # Layer height - usually an array, get first value
  125. if "layer_height" in data:
  126. val = data["layer_height"]
  127. if isinstance(val, list) and val:
  128. self.metadata["layer_height"] = float(val[0])
  129. elif isinstance(val, (int, float, str)):
  130. self.metadata["layer_height"] = float(val)
  131. # Nozzle diameter
  132. if "nozzle_diameter" in data:
  133. val = data["nozzle_diameter"]
  134. if isinstance(val, list) and val:
  135. self.metadata["nozzle_diameter"] = float(val[0])
  136. elif isinstance(val, (int, float, str)):
  137. self.metadata["nozzle_diameter"] = float(val)
  138. # Bed temperature - first layer or regular
  139. for key in ["bed_temperature_initial_layer", "bed_temperature"]:
  140. if key in data:
  141. val = data[key]
  142. if isinstance(val, list) and val:
  143. self.metadata["bed_temperature"] = int(float(val[0]))
  144. elif isinstance(val, (int, float, str)):
  145. self.metadata["bed_temperature"] = int(float(val))
  146. break
  147. # Nozzle temperature
  148. for key in ["nozzle_temperature_initial_layer", "nozzle_temperature"]:
  149. if key in data:
  150. val = data[key]
  151. if isinstance(val, list) and val:
  152. self.metadata["nozzle_temperature"] = int(float(val[0]))
  153. elif isinstance(val, (int, float, str)):
  154. self.metadata["nozzle_temperature"] = int(float(val))
  155. break
  156. except Exception:
  157. pass
  158. def _extract_settings_from_content(self, content: str):
  159. """Extract print settings from config content."""
  160. settings_map = {
  161. "layer_height": ("layer_height", float),
  162. "nozzle_diameter": ("nozzle_diameter", float),
  163. "bed_temperature": ("bed_temperature", int),
  164. "nozzle_temperature": ("nozzle_temperature", int),
  165. }
  166. for key, (search_key, converter) in settings_map.items():
  167. if key not in self.metadata:
  168. try:
  169. # Try JSON format
  170. if f'"{search_key}"' in content:
  171. start = content.find(f'"{search_key}"')
  172. value_start = content.find(":", start) + 1
  173. value_end = content.find(",", value_start)
  174. if value_end == -1:
  175. value_end = content.find("}", value_start)
  176. value = content[value_start:value_end].strip().strip('"')
  177. self.metadata[key] = converter(value)
  178. except Exception:
  179. pass
  180. def _parse_3dmodel(self, zf: zipfile.ZipFile):
  181. """Parse 3D/3dmodel.model for MakerWorld metadata."""
  182. import re
  183. try:
  184. model_path = "3D/3dmodel.model"
  185. if model_path not in zf.namelist():
  186. return
  187. content = zf.read(model_path).decode("utf-8", errors="ignore")
  188. # Parse XML metadata elements
  189. # MakerWorld adds metadata like: <metadata name="Designer">username</metadata>
  190. metadata_pattern = r'<metadata\s+name="([^"]+)"[^>]*>([^<]*)</metadata>'
  191. matches = re.findall(metadata_pattern, content)
  192. makerworld_fields = {}
  193. for name, value in matches:
  194. makerworld_fields[name] = value.strip()
  195. # Check for direct MakerWorld URL in content
  196. url_pattern = r'https?://makerworld\.com/[^\s<>"\']+/models/(\d+)'
  197. url_match = re.search(url_pattern, content)
  198. if url_match:
  199. self.metadata["makerworld_url"] = url_match.group(0)
  200. self.metadata["makerworld_model_id"] = url_match.group(1)
  201. # Extract model ID from DSM reference in image URLs
  202. # Format: https://makerworld.bblmw.com/makerworld/model/DSM00000001275614/...
  203. # The numeric part (1275614) is the MakerWorld model ID
  204. if "makerworld_url" not in self.metadata:
  205. dsm_pattern = r'DSM0+(\d+)'
  206. dsm_match = re.search(dsm_pattern, content)
  207. if dsm_match:
  208. model_id = dsm_match.group(1)
  209. self.metadata["makerworld_url"] = f"https://makerworld.com/en/models/{model_id}"
  210. self.metadata["makerworld_model_id"] = model_id
  211. # Store designer info
  212. if "Designer" in makerworld_fields:
  213. self.metadata["designer"] = makerworld_fields["Designer"]
  214. if "Title" in makerworld_fields:
  215. self.metadata["print_name"] = makerworld_fields["Title"]
  216. except Exception:
  217. pass
  218. def _extract_thumbnail(self, zf: zipfile.ZipFile):
  219. """Extract thumbnail image from 3MF."""
  220. thumbnail_paths = [
  221. "Metadata/plate_1.png",
  222. "Metadata/thumbnail.png",
  223. "Metadata/model_thumbnail.png",
  224. ]
  225. for thumb_path in thumbnail_paths:
  226. if thumb_path in zf.namelist():
  227. self.metadata["_thumbnail_data"] = zf.read(thumb_path)
  228. self.metadata["_thumbnail_ext"] = ".png"
  229. break
  230. class ProjectPageParser:
  231. """Parser for extracting project page data from Bambu Lab 3MF files."""
  232. def __init__(self, file_path: Path):
  233. self.file_path = file_path
  234. def parse(self, archive_id: int) -> dict:
  235. """Extract project page metadata and images from 3MF file."""
  236. import html
  237. import re
  238. result = {
  239. "title": None,
  240. "description": None,
  241. "designer": None,
  242. "designer_user_id": None,
  243. "license": None,
  244. "copyright": None,
  245. "creation_date": None,
  246. "modification_date": None,
  247. "origin": None,
  248. "profile_title": None,
  249. "profile_description": None,
  250. "profile_cover": None,
  251. "profile_user_id": None,
  252. "profile_user_name": None,
  253. "design_model_id": None,
  254. "design_profile_id": None,
  255. "design_region": None,
  256. "model_pictures": [],
  257. "profile_pictures": [],
  258. "thumbnails": [],
  259. }
  260. try:
  261. with zipfile.ZipFile(self.file_path, "r") as zf:
  262. # Parse 3D/3dmodel.model for metadata
  263. model_path = "3D/3dmodel.model"
  264. if model_path in zf.namelist():
  265. content = zf.read(model_path).decode("utf-8", errors="ignore")
  266. # Extract metadata elements using regex
  267. # Format: <metadata name="Key">Value</metadata> or <metadata name="Key" />
  268. metadata_pattern = r'<metadata\s+name="([^"]+)"[^>]*>([^<]*)</metadata>'
  269. matches = re.findall(metadata_pattern, content)
  270. field_mapping = {
  271. "Title": "title",
  272. "Description": "description",
  273. "Designer": "designer",
  274. "DesignerUserId": "designer_user_id",
  275. "License": "license",
  276. "Copyright": "copyright",
  277. "CreationDate": "creation_date",
  278. "ModificationDate": "modification_date",
  279. "Origin": "origin",
  280. "ProfileTitle": "profile_title",
  281. "ProfileDescription": "profile_description",
  282. "ProfileCover": "profile_cover",
  283. "ProfileUserId": "profile_user_id",
  284. "ProfileUserName": "profile_user_name",
  285. "DesignModelId": "design_model_id",
  286. "DesignProfileId": "design_profile_id",
  287. "DesignRegion": "design_region",
  288. }
  289. for name, value in matches:
  290. if name in field_mapping:
  291. # Decode HTML entities multiple times (content is often triple-encoded)
  292. decoded = value.strip()
  293. prev = None
  294. while prev != decoded:
  295. prev = decoded
  296. decoded = html.unescape(decoded)
  297. # Normalize non-breaking spaces to regular spaces
  298. decoded = decoded.replace('\xa0', ' ')
  299. result[field_mapping[name]] = decoded if decoded else None
  300. # List images in Auxiliaries folder
  301. from urllib.parse import quote
  302. for name in zf.namelist():
  303. if name.startswith("Auxiliaries/Model Pictures/"):
  304. filename = name.split("/")[-1]
  305. if filename:
  306. result["model_pictures"].append({
  307. "name": filename,
  308. "path": name,
  309. "url": f"/api/v1/archives/{archive_id}/project-image/{quote(name, safe='')}",
  310. })
  311. elif name.startswith("Auxiliaries/Profile Pictures/"):
  312. filename = name.split("/")[-1]
  313. if filename:
  314. result["profile_pictures"].append({
  315. "name": filename,
  316. "path": name,
  317. "url": f"/api/v1/archives/{archive_id}/project-image/{quote(name, safe='')}",
  318. })
  319. elif name.startswith("Auxiliaries/.thumbnails/"):
  320. filename = name.split("/")[-1]
  321. if filename:
  322. result["thumbnails"].append({
  323. "name": filename,
  324. "path": name,
  325. "url": f"/api/v1/archives/{archive_id}/project-image/{quote(name, safe='')}",
  326. })
  327. except Exception as e:
  328. result["_error"] = str(e)
  329. return result
  330. def get_image(self, image_path: str) -> tuple[bytes, str] | None:
  331. """Extract an image from the 3MF file.
  332. Returns tuple of (image_data, content_type) or None if not found.
  333. """
  334. try:
  335. with zipfile.ZipFile(self.file_path, "r") as zf:
  336. if image_path in zf.namelist():
  337. data = zf.read(image_path)
  338. # Determine content type from extension
  339. ext = image_path.lower().split(".")[-1]
  340. content_types = {
  341. "png": "image/png",
  342. "jpg": "image/jpeg",
  343. "jpeg": "image/jpeg",
  344. "webp": "image/webp",
  345. "gif": "image/gif",
  346. }
  347. content_type = content_types.get(ext, "application/octet-stream")
  348. return (data, content_type)
  349. except Exception:
  350. pass
  351. return None
  352. def update_metadata(self, updates: dict) -> bool:
  353. """Update project page metadata in the 3MF file.
  354. Args:
  355. updates: Dict with fields to update (title, description, designer, etc.)
  356. Returns:
  357. True if successful, False otherwise.
  358. """
  359. import html
  360. import re
  361. import tempfile
  362. try:
  363. # Read the 3MF file
  364. with zipfile.ZipFile(self.file_path, "r") as zf_read:
  365. # Find and read the 3dmodel.model file
  366. model_path = "3D/3dmodel.model"
  367. if model_path not in zf_read.namelist():
  368. return False
  369. content = zf_read.read(model_path).decode("utf-8")
  370. # Update metadata fields
  371. field_mapping = {
  372. "title": "Title",
  373. "description": "Description",
  374. "designer": "Designer",
  375. "license": "License",
  376. "copyright": "Copyright",
  377. "profile_title": "ProfileTitle",
  378. "profile_description": "ProfileDescription",
  379. }
  380. for field, xml_name in field_mapping.items():
  381. if field in updates and updates[field] is not None:
  382. new_value = html.escape(updates[field])
  383. # Replace existing metadata or we'd need to add it
  384. pattern = rf'(<metadata\s+name="{xml_name}"[^>]*>)[^<]*(</metadata>)'
  385. replacement = rf'\g<1>{new_value}\g<2>'
  386. content = re.sub(pattern, replacement, content)
  387. # Write to a temporary file first
  388. with tempfile.NamedTemporaryFile(delete=False, suffix=".3mf") as tmp:
  389. tmp_path = Path(tmp.name)
  390. # Create new zip with updated content
  391. with zipfile.ZipFile(tmp_path, "w", zipfile.ZIP_DEFLATED) as zf_write:
  392. for item in zf_read.namelist():
  393. if item == model_path:
  394. zf_write.writestr(item, content.encode("utf-8"))
  395. else:
  396. zf_write.writestr(item, zf_read.read(item))
  397. # Replace original file with updated one
  398. shutil.move(tmp_path, self.file_path)
  399. return True
  400. except Exception:
  401. # Clean up temp file if it exists
  402. if "tmp_path" in locals() and tmp_path.exists():
  403. tmp_path.unlink()
  404. return False
  405. class ArchiveService:
  406. """Service for archiving print jobs."""
  407. def __init__(self, db: AsyncSession):
  408. self.db = db
  409. @staticmethod
  410. def compute_file_hash(file_path: Path) -> str:
  411. """Compute SHA256 hash of a file for duplicate detection."""
  412. sha256 = hashlib.sha256()
  413. with open(file_path, "rb") as f:
  414. # Read in chunks to handle large files
  415. for chunk in iter(lambda: f.read(8192), b""):
  416. sha256.update(chunk)
  417. return sha256.hexdigest()
  418. async def get_duplicate_hashes(self) -> set[str]:
  419. """Get all content hashes that appear more than once.
  420. Returns a set of hashes that have duplicates.
  421. """
  422. from sqlalchemy import func
  423. result = await self.db.execute(
  424. select(PrintArchive.content_hash)
  425. .where(PrintArchive.content_hash.isnot(None))
  426. .group_by(PrintArchive.content_hash)
  427. .having(func.count(PrintArchive.id) > 1)
  428. )
  429. return {row[0] for row in result.all()}
  430. async def find_duplicates(
  431. self,
  432. archive_id: int,
  433. content_hash: str | None = None,
  434. print_name: str | None = None,
  435. makerworld_model_id: str | None = None,
  436. ) -> list[dict]:
  437. """Find duplicate archives based on hash or name matching.
  438. Returns list of dicts with id, print_name, created_at, match_type.
  439. """
  440. duplicates = []
  441. # First, find exact matches by content hash
  442. if content_hash:
  443. result = await self.db.execute(
  444. select(PrintArchive)
  445. .where(
  446. and_(
  447. PrintArchive.content_hash == content_hash,
  448. PrintArchive.id != archive_id,
  449. )
  450. )
  451. .order_by(PrintArchive.created_at.desc())
  452. .limit(10)
  453. )
  454. for archive in result.scalars().all():
  455. duplicates.append({
  456. "id": archive.id,
  457. "print_name": archive.print_name,
  458. "created_at": archive.created_at,
  459. "match_type": "exact",
  460. })
  461. # Then, find similar matches by print name or MakerWorld ID
  462. if print_name or makerworld_model_id:
  463. conditions = [PrintArchive.id != archive_id]
  464. name_conditions = []
  465. if print_name:
  466. # Match if print names are similar (ignoring case)
  467. name_conditions.append(PrintArchive.print_name.ilike(print_name))
  468. if makerworld_model_id:
  469. # Match by MakerWorld model ID stored in extra_data
  470. name_conditions.append(
  471. PrintArchive.extra_data["makerworld_model_id"].astext == makerworld_model_id
  472. )
  473. if name_conditions:
  474. conditions.append(or_(*name_conditions))
  475. result = await self.db.execute(
  476. select(PrintArchive)
  477. .where(and_(*conditions))
  478. .order_by(PrintArchive.created_at.desc())
  479. .limit(10)
  480. )
  481. for archive in result.scalars().all():
  482. # Don't add if already in duplicates (exact match)
  483. if not any(d["id"] == archive.id for d in duplicates):
  484. duplicates.append({
  485. "id": archive.id,
  486. "print_name": archive.print_name,
  487. "created_at": archive.created_at,
  488. "match_type": "similar",
  489. })
  490. return duplicates
  491. async def archive_print(
  492. self,
  493. printer_id: int | None,
  494. source_file: Path,
  495. print_data: dict | None = None,
  496. ) -> PrintArchive | None:
  497. """Archive a 3MF file with metadata."""
  498. # Verify printer exists if specified
  499. if printer_id is not None:
  500. result = await self.db.execute(
  501. select(Printer).where(Printer.id == printer_id)
  502. )
  503. printer = result.scalar_one_or_none()
  504. if not printer:
  505. return None
  506. # Create archive directory structure
  507. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  508. archive_name = f"{timestamp}_{source_file.stem}"
  509. # Use "unassigned" folder for archives without a printer
  510. printer_folder = str(printer_id) if printer_id is not None else "unassigned"
  511. archive_dir = settings.archive_dir / printer_folder / archive_name
  512. archive_dir.mkdir(parents=True, exist_ok=True)
  513. # Copy 3MF file
  514. dest_file = archive_dir / source_file.name
  515. shutil.copy2(source_file, dest_file)
  516. # Compute content hash for duplicate detection
  517. content_hash = self.compute_file_hash(dest_file)
  518. # Parse 3MF metadata
  519. parser = ThreeMFParser(dest_file)
  520. metadata = parser.parse()
  521. # Save thumbnail if present
  522. thumbnail_path = None
  523. if "_thumbnail_data" in metadata:
  524. thumb_file = archive_dir / f"thumbnail{metadata['_thumbnail_ext']}"
  525. thumb_file.write_bytes(metadata["_thumbnail_data"])
  526. thumbnail_path = str(thumb_file.relative_to(settings.base_dir))
  527. del metadata["_thumbnail_data"]
  528. del metadata["_thumbnail_ext"]
  529. # Merge with print data from MQTT
  530. if print_data:
  531. metadata["_print_data"] = print_data
  532. # Determine status and timestamps
  533. status = print_data.get("status", "completed") if print_data else "archived"
  534. started_at = datetime.now() if status == "printing" else None
  535. completed_at = datetime.now() if status in ("completed", "failed", "archived") else None
  536. # Create archive record
  537. archive = PrintArchive(
  538. printer_id=printer_id,
  539. filename=source_file.name,
  540. file_path=str(dest_file.relative_to(settings.base_dir)),
  541. file_size=dest_file.stat().st_size,
  542. content_hash=content_hash,
  543. thumbnail_path=thumbnail_path,
  544. print_name=metadata.get("print_name") or source_file.stem,
  545. print_time_seconds=metadata.get("print_time_seconds"),
  546. filament_used_grams=metadata.get("filament_used_grams"),
  547. filament_type=metadata.get("filament_type"),
  548. filament_color=metadata.get("filament_color"),
  549. layer_height=metadata.get("layer_height"),
  550. nozzle_diameter=metadata.get("nozzle_diameter"),
  551. bed_temperature=metadata.get("bed_temperature"),
  552. nozzle_temperature=metadata.get("nozzle_temperature"),
  553. makerworld_url=metadata.get("makerworld_url"),
  554. designer=metadata.get("designer"),
  555. status=status,
  556. started_at=started_at,
  557. completed_at=completed_at,
  558. extra_data=metadata,
  559. )
  560. self.db.add(archive)
  561. await self.db.commit()
  562. await self.db.refresh(archive)
  563. return archive
  564. async def get_archive(self, archive_id: int) -> PrintArchive | None:
  565. """Get an archive by ID."""
  566. result = await self.db.execute(
  567. select(PrintArchive).where(PrintArchive.id == archive_id)
  568. )
  569. return result.scalar_one_or_none()
  570. async def update_archive_status(
  571. self,
  572. archive_id: int,
  573. status: str,
  574. completed_at: datetime | None = None,
  575. ) -> bool:
  576. """Update the status of an archive."""
  577. archive = await self.get_archive(archive_id)
  578. if not archive:
  579. return False
  580. archive.status = status
  581. if completed_at:
  582. archive.completed_at = completed_at
  583. await self.db.commit()
  584. return True
  585. async def list_archives(
  586. self,
  587. printer_id: int | None = None,
  588. limit: int = 50,
  589. offset: int = 0,
  590. ) -> list[PrintArchive]:
  591. """List archives with optional filtering."""
  592. query = select(PrintArchive).order_by(PrintArchive.created_at.desc())
  593. if printer_id:
  594. query = query.where(PrintArchive.printer_id == printer_id)
  595. query = query.limit(limit).offset(offset)
  596. result = await self.db.execute(query)
  597. return list(result.scalars().all())
  598. async def delete_archive(self, archive_id: int) -> bool:
  599. """Delete an archive and its files."""
  600. archive = await self.get_archive(archive_id)
  601. if not archive:
  602. return False
  603. # Delete files
  604. file_path = settings.base_dir / archive.file_path
  605. if file_path.exists():
  606. archive_dir = file_path.parent
  607. shutil.rmtree(archive_dir, ignore_errors=True)
  608. # Delete database record
  609. await self.db.delete(archive)
  610. await self.db.commit()
  611. return True
  612. async def attach_timelapse(
  613. self,
  614. archive_id: int,
  615. timelapse_data: bytes,
  616. filename: str = "timelapse.mp4",
  617. ) -> bool:
  618. """Attach a timelapse video to an archive."""
  619. archive = await self.get_archive(archive_id)
  620. if not archive:
  621. return False
  622. # Get archive directory
  623. file_path = settings.base_dir / archive.file_path
  624. archive_dir = file_path.parent
  625. # Save timelapse
  626. timelapse_file = archive_dir / filename
  627. timelapse_file.write_bytes(timelapse_data)
  628. # Update archive record
  629. archive.timelapse_path = str(timelapse_file.relative_to(settings.base_dir))
  630. await self.db.commit()
  631. return True