threemf_tools.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609
  1. """3MF file parsing utilities for filament tracking.
  2. This module provides functions to parse Bambu Lab 3MF files and extract
  3. per-layer filament usage data from the embedded G-code. This enables
  4. accurate partial usage reporting for multi-material prints.
  5. """
  6. import json
  7. import logging
  8. import math
  9. import re
  10. import zipfile
  11. from pathlib import Path
  12. import defusedxml.ElementTree as ET
  13. logger = logging.getLogger(__name__)
  14. # Default filament properties
  15. DEFAULT_FILAMENT_DIAMETER = 1.75 # mm
  16. DEFAULT_FILAMENT_DENSITY = 1.24 # g/cm³ (PLA)
  17. def parse_gcode_layer_filament_usage(gcode_content: str) -> dict[int, dict[int, float]]:
  18. """Parse G-code to extract per-layer, per-filament cumulative extrusion in mm.
  19. This function tracks filament extrusion across layers and tool changes,
  20. building a cumulative usage map that can be used to calculate partial
  21. usage at any layer.
  22. Args:
  23. gcode_content: The raw G-code content as a string
  24. Returns:
  25. A nested dictionary mapping layer numbers to filament usage:
  26. {layer: {filament_id: cumulative_mm}, ...}
  27. Example:
  28. {0: {0: 125.5}, 1: {0: 250.0, 1: 50.0}, 2: {0: 375.0, 1: 150.0}}
  29. This shows:
  30. - Layer 0: filament 0 used 125.5mm cumulative
  31. - Layer 1: filament 0 used 250mm cumulative, filament 1 used 50mm
  32. - Layer 2: filament 0 used 375mm cumulative, filament 1 used 150mm
  33. G-code commands parsed:
  34. - M73 L<layer>: Layer change marker
  35. - M620 S<filament>: Filament/tool change (S255 = unload)
  36. - G0/G1/G2/G3 E<amount>: Extrusion moves
  37. """
  38. layer_filaments: dict[int, dict[int, float]] = {}
  39. current_layer = 0
  40. active_filament: int | None = None
  41. cumulative_extrusion: dict[int, float] = {} # filament_id -> total mm
  42. for line in gcode_content.splitlines():
  43. line = line.strip()
  44. if not line:
  45. continue
  46. # Handle comments - skip but check for layer markers
  47. if line.startswith(";"):
  48. # Some slicers use comment-based layer markers
  49. # e.g., "; CHANGE_LAYER" or ";LAYER_CHANGE"
  50. continue
  51. # Split line into command and inline comment
  52. if ";" in line:
  53. line = line.split(";")[0].strip()
  54. # Extract command and parameters
  55. parts = line.split()
  56. if not parts:
  57. continue
  58. cmd = parts[0].upper()
  59. # Layer change: M73 L<layer>
  60. # Bambu printers use M73 with L parameter for layer indication
  61. if cmd == "M73":
  62. for part in parts[1:]:
  63. part_upper = part.upper()
  64. if part_upper.startswith("L"):
  65. try:
  66. new_layer = int(part[1:])
  67. # Save current state before layer change
  68. if cumulative_extrusion:
  69. layer_filaments[current_layer] = cumulative_extrusion.copy()
  70. current_layer = new_layer
  71. except ValueError:
  72. pass # Skip G-code lines with unparseable layer numbers
  73. # Filament change: M620 S<filament>
  74. # Bambu uses M620 for AMS filament switching
  75. # S255 means full unload (no active filament)
  76. elif cmd == "M620":
  77. for part in parts[1:]:
  78. part_upper = part.upper()
  79. if part_upper.startswith("S"):
  80. filament_str = part[1:]
  81. if filament_str == "255":
  82. # Full unload - no active filament
  83. active_filament = None
  84. else:
  85. try:
  86. # Extract digits (e.g., "0A" -> 0, "1" -> 1)
  87. match = re.match(r"(\d+)", filament_str)
  88. if match:
  89. active_filament = int(match.group(1))
  90. except (ValueError, AttributeError):
  91. pass # Skip unparseable filament switch commands
  92. # Extrusion moves: G0/G1/G2/G3 with E parameter
  93. # Only G1 typically has extrusion, but check all for safety
  94. elif cmd in ("G0", "G1", "G2", "G3"):
  95. if active_filament is None:
  96. continue
  97. for part in parts[1:]:
  98. part_upper = part.upper()
  99. if part_upper.startswith("E"):
  100. try:
  101. extrusion = float(part[1:])
  102. # Only count positive extrusion (not retractions)
  103. if extrusion > 0:
  104. current = cumulative_extrusion.get(active_filament, 0)
  105. cumulative_extrusion[active_filament] = current + extrusion
  106. except ValueError:
  107. pass # Skip G-code lines with unparseable extrusion values
  108. # Save final layer state
  109. if cumulative_extrusion:
  110. layer_filaments[current_layer] = cumulative_extrusion.copy()
  111. return layer_filaments
  112. def mm_to_grams(
  113. length_mm: float,
  114. diameter_mm: float = DEFAULT_FILAMENT_DIAMETER,
  115. density_g_cm3: float = DEFAULT_FILAMENT_DENSITY,
  116. ) -> float:
  117. """Convert filament length in mm to weight in grams.
  118. Uses the formula: mass = volume × density
  119. where volume = π × r² × length
  120. Args:
  121. length_mm: Length of filament in millimeters
  122. diameter_mm: Filament diameter in millimeters (default: 1.75)
  123. density_g_cm3: Material density in g/cm³ (default: 1.24 for PLA)
  124. Returns:
  125. Weight in grams
  126. """
  127. radius_cm = (diameter_mm / 2) / 10 # Convert mm to cm
  128. length_cm = length_mm / 10 # Convert mm to cm
  129. volume_cm3 = math.pi * radius_cm * radius_cm * length_cm
  130. return volume_cm3 * density_g_cm3
  131. def extract_layer_filament_usage_from_3mf(file_path: Path) -> dict[int, dict[int, float]] | None:
  132. """Extract per-layer filament usage from a 3MF file's embedded G-code.
  133. Args:
  134. file_path: Path to the 3MF file
  135. Returns:
  136. Dictionary mapping layers to filament usage, or None if parsing fails.
  137. Format: {layer: {filament_id: cumulative_mm}, ...}
  138. """
  139. try:
  140. with zipfile.ZipFile(file_path, "r") as zf:
  141. # Find G-code file(s) - usually plate_1.gcode or Metadata/plate_1.gcode
  142. gcode_files = [f for f in zf.namelist() if f.endswith(".gcode")]
  143. if not gcode_files:
  144. return None
  145. # Use the first G-code file (typically only one per 3MF export)
  146. gcode_path = gcode_files[0]
  147. gcode_content = zf.read(gcode_path).decode("utf-8", errors="ignore")
  148. return parse_gcode_layer_filament_usage(gcode_content)
  149. except Exception:
  150. return None
  151. def get_cumulative_usage_at_layer(
  152. layer_usage: dict[int, dict[int, float]],
  153. target_layer: int,
  154. ) -> dict[int, float]:
  155. """Get cumulative filament usage (in mm) up to and including target_layer.
  156. Args:
  157. layer_usage: The output from parse_gcode_layer_filament_usage()
  158. target_layer: The layer number to get usage for
  159. Returns:
  160. Dictionary of {filament_id: cumulative_mm} for each filament used
  161. up to target_layer. Returns empty dict if no data available.
  162. """
  163. if not layer_usage:
  164. return {}
  165. # Find the highest recorded layer <= target_layer
  166. # (we store snapshots at layer changes, so we need the closest one)
  167. relevant_layers = [layer for layer in layer_usage if layer <= target_layer]
  168. if not relevant_layers:
  169. return {}
  170. max_layer = max(relevant_layers)
  171. return layer_usage.get(max_layer, {})
  172. def extract_filament_properties_from_3mf(file_path: Path) -> dict[int, dict]:
  173. """Extract filament properties (density, diameter, type) from 3MF metadata.
  174. Args:
  175. file_path: Path to the 3MF file
  176. Returns:
  177. Dictionary mapping filament IDs to their properties:
  178. {filament_id: {"diameter": 1.75, "density": 1.24, "type": "PLA"}, ...}
  179. Note: filament_id is 1-based (matches slot_id in slice_info.config)
  180. """
  181. properties: dict[int, dict] = {}
  182. try:
  183. with zipfile.ZipFile(file_path, "r") as zf:
  184. # Try slice_info.config first for filament types
  185. if "Metadata/slice_info.config" in zf.namelist():
  186. content = zf.read("Metadata/slice_info.config").decode()
  187. root = ET.fromstring(content)
  188. for f in root.findall(".//filament"):
  189. try:
  190. # id is 1-based in slice_info.config
  191. fid = int(f.get("id", 0))
  192. properties[fid] = {
  193. "type": f.get("type", "PLA"),
  194. "diameter": DEFAULT_FILAMENT_DIAMETER,
  195. "density": DEFAULT_FILAMENT_DENSITY,
  196. }
  197. except ValueError:
  198. pass # Skip filament entries with unparseable IDs
  199. # Try project_settings.config for density values
  200. if "Metadata/project_settings.config" in zf.namelist():
  201. content = zf.read("Metadata/project_settings.config").decode()
  202. try:
  203. data = json.loads(content)
  204. densities = data.get("filament_density", [])
  205. for i, density in enumerate(densities):
  206. # project_settings uses 0-based indexing, convert to 1-based
  207. fid = i + 1
  208. if fid not in properties:
  209. properties[fid] = {
  210. "type": "",
  211. "diameter": DEFAULT_FILAMENT_DIAMETER,
  212. }
  213. try:
  214. properties[fid]["density"] = float(density)
  215. except (ValueError, TypeError):
  216. properties[fid]["density"] = DEFAULT_FILAMENT_DENSITY
  217. except json.JSONDecodeError:
  218. pass # Skip malformed project_settings.config JSON
  219. except Exception:
  220. pass # Return whatever properties were collected before the error
  221. return properties
  222. def extract_nozzle_mapping_from_3mf(zf: zipfile.ZipFile) -> dict[int, int] | None:
  223. """Extract per-slot nozzle/extruder mapping from a 3MF file.
  224. On dual-nozzle printers (H2D, H2D Pro), each filament slot is assigned to a
  225. specific nozzle. The slicer may override user preferences when using "Auto For
  226. Flush" mode, so the actual assignment comes from slice_info.config group_id
  227. attributes, not from the user's filament_nozzle_map preference.
  228. Priority:
  229. 1. group_id on <filament> elements in slice_info.config (actual assignment)
  230. 2. filament_nozzle_map in project_settings.config (user preference fallback)
  231. Both are mapped through physical_extruder_map to get MQTT extruder IDs (0=right, 1=left).
  232. Args:
  233. zf: An open ZipFile of the 3MF archive
  234. Returns:
  235. Dictionary mapping {slot_id: extruder_id} for dual-nozzle files,
  236. or None if single-nozzle, missing data, or parse error.
  237. """
  238. try:
  239. if "Metadata/project_settings.config" not in zf.namelist():
  240. return None
  241. content = zf.read("Metadata/project_settings.config").decode()
  242. data = json.loads(content)
  243. physical_extruder_map = data.get("physical_extruder_map")
  244. if not physical_extruder_map or len(physical_extruder_map) <= 1:
  245. return None # Single-nozzle printer
  246. # Check if only one extruder is active.
  247. # If so, we can skip the mapping and just assign all slots to that extruder.
  248. # extruder_nozzle_stats format: ["Standard#0|High Flow#0", "Standard#1"]
  249. # Each entry = one extruder. Format: <NozzleVolumeType>#<count>[|...]
  250. # #N is the count of physical nozzles of that type (0 = none installed).
  251. # Types: Standard, High Flow, Hybrid, TPU High Flow
  252. active_extruders = []
  253. for stats_str in data.get("extruder_nozzle_stats") or []:
  254. nozzle_counts = [n.partition("#")[2] for n in stats_str.split("|")]
  255. active_extruders.append(1 if any(c not in ("0", "") for c in nozzle_counts) else 0)
  256. if sum(active_extruders) == 1:
  257. nozzle_mapping: dict[int, int] = {}
  258. active_idx = active_extruders.index(1)
  259. target_extruder = int(physical_extruder_map[active_idx])
  260. if "Metadata/slice_info.config" in zf.namelist():
  261. si_content = zf.read("Metadata/slice_info.config").decode()
  262. si_root = ET.fromstring(si_content)
  263. for filament_elem in si_root.findall(".//filament"):
  264. try:
  265. nozzle_mapping[int(filament_elem.get("id"))] = target_extruder
  266. except (ValueError, TypeError):
  267. pass
  268. return nozzle_mapping or None
  269. # Priority 1: Use group_id from slice_info filament elements.
  270. # This reflects the actual slicer assignment (respects "Auto For Flush").
  271. nozzle_mapping: dict[int, int] = {}
  272. if "Metadata/slice_info.config" in zf.namelist():
  273. si_content = zf.read("Metadata/slice_info.config").decode()
  274. si_root = ET.fromstring(si_content)
  275. for filament_elem in si_root.findall(".//filament"):
  276. group_id_str = filament_elem.get("group_id")
  277. filament_id_str = filament_elem.get("id")
  278. if group_id_str is not None and filament_id_str:
  279. try:
  280. group_id = int(group_id_str)
  281. slot_id = int(filament_id_str)
  282. if group_id < len(physical_extruder_map):
  283. nozzle_mapping[slot_id] = int(physical_extruder_map[group_id])
  284. except (ValueError, TypeError, IndexError):
  285. pass
  286. if nozzle_mapping:
  287. return nozzle_mapping
  288. # Priority 2: Fall back to filament_nozzle_map (user preference).
  289. # This is correct when the user manually assigned nozzles, but may be
  290. # wrong when the slicer overrides via "Auto For Flush".
  291. filament_nozzle_map = data.get("filament_nozzle_map")
  292. if not filament_nozzle_map:
  293. return None
  294. for i, slicer_ext_str in enumerate(filament_nozzle_map):
  295. slot_id = i + 1
  296. try:
  297. slicer_ext = int(slicer_ext_str)
  298. if slicer_ext < len(physical_extruder_map):
  299. nozzle_mapping[slot_id] = int(physical_extruder_map[slicer_ext])
  300. except (ValueError, TypeError, IndexError):
  301. pass
  302. return nozzle_mapping if nozzle_mapping else None
  303. except Exception:
  304. return None
  305. def extract_filament_usage_from_3mf(file_path: Path, plate_id: int | None = None) -> list[dict]:
  306. """Extract per-filament total usage from 3MF slice_info.config.
  307. This extracts the slicer-estimated total usage per filament slot,
  308. not the per-layer breakdown.
  309. Args:
  310. file_path: Path to the 3MF file
  311. plate_id: Optional plate index to filter for (for multi-plate files)
  312. Returns:
  313. List of filament usage dictionaries:
  314. [{"slot_id": 1, "used_g": 50.5, "type": "PLA", "color": "#FF0000"}, ...]
  315. """
  316. filament_usage = []
  317. try:
  318. with zipfile.ZipFile(file_path, "r") as zf:
  319. if "Metadata/slice_info.config" not in zf.namelist():
  320. return []
  321. content = zf.read("Metadata/slice_info.config").decode()
  322. root = ET.fromstring(content)
  323. if plate_id is not None:
  324. # Find the plate element with matching index
  325. for plate_elem in root.findall(".//plate"):
  326. plate_index = None
  327. for meta in plate_elem.findall("metadata"):
  328. if meta.get("key") == "index":
  329. try:
  330. plate_index = int(meta.get("value", "0"))
  331. except ValueError:
  332. pass
  333. break
  334. if plate_index == plate_id:
  335. for f in plate_elem.findall("filament"):
  336. filament_id = f.get("id")
  337. used_g = f.get("used_g", "0")
  338. try:
  339. used_amount = float(used_g)
  340. if filament_id:
  341. filament_usage.append(
  342. {
  343. "slot_id": int(filament_id),
  344. "used_g": used_amount,
  345. "type": f.get("type", ""),
  346. "color": f.get("color", ""),
  347. }
  348. )
  349. except (ValueError, TypeError):
  350. pass
  351. break
  352. else:
  353. # No plate_id specified - extract all filaments
  354. for f in root.findall(".//filament"):
  355. filament_id = f.get("id")
  356. used_g = f.get("used_g", "0")
  357. try:
  358. used_amount = float(used_g)
  359. if filament_id:
  360. filament_usage.append(
  361. {
  362. "slot_id": int(filament_id),
  363. "used_g": used_amount,
  364. "type": f.get("type", ""),
  365. "color": f.get("color", ""),
  366. }
  367. )
  368. except (ValueError, TypeError):
  369. pass # Skip filament entries with unparseable usage values
  370. except Exception:
  371. pass # Return whatever usage data was collected before the error
  372. return filament_usage
  373. # Header values exposed as `{placeholder}` substitutions inside snippets.
  374. # Aliases let users write Prusa-style names (`{max_layer_z}`) that map onto
  375. # Bambu/Orca header keys (`max_z_height`).
  376. _HEADER_PLACEHOLDER_ALIASES = {
  377. "max_layer_z": "max_z_height",
  378. "max_print_height": "max_z_height",
  379. "total_layers": "total_layer_number",
  380. }
  381. _HEADER_KEY_RE = re.compile(r"^;\s*([^:]+?)\s*:\s*(.+?)\s*$")
  382. _PLACEHOLDER_RE = re.compile(r"\{([a-zA-Z_][a-zA-Z0-9_]*)\}")
  383. _START_GCODE_END_MARKER = "; MACHINE_START_GCODE_END"
  384. def _parse_3mf_gcode_header(content: str) -> dict[str, str]:
  385. """Parse the `; HEADER_BLOCK_START..END` block into a normalised dict.
  386. Keys are lowercased, ` [units]` suffixes stripped, and spaces converted
  387. to underscores so callers can look up `total_layer_number` regardless of
  388. whether the source line is `; total layer number: 80` or
  389. `; total filament length [mm] : 12155.34`.
  390. """
  391. header: dict[str, str] = {}
  392. in_header = False
  393. for raw_line in content.splitlines():
  394. line = raw_line.strip()
  395. if line == "; HEADER_BLOCK_START":
  396. in_header = True
  397. continue
  398. if line == "; HEADER_BLOCK_END":
  399. break
  400. if not in_header:
  401. continue
  402. m = _HEADER_KEY_RE.match(line)
  403. if not m:
  404. continue
  405. key, value = m.group(1), m.group(2)
  406. key = re.sub(r"\s*\[[^\]]*\]\s*$", "", key)
  407. key = key.strip().lower().replace(" ", "_")
  408. header[key] = value
  409. return header
  410. def _substitute_placeholders(snippet: str, header: dict[str, str]) -> str:
  411. """Replace `{var}` placeholders with header values, leaving unknowns intact."""
  412. def repl(m: re.Match) -> str:
  413. name = m.group(1)
  414. value = header.get(name)
  415. if value is None:
  416. alias = _HEADER_PLACEHOLDER_ALIASES.get(name)
  417. if alias is not None:
  418. value = header.get(alias)
  419. if value is None:
  420. logger.warning(
  421. "G-code injection: placeholder {%s} not found in 3MF header; leaving as-is",
  422. name,
  423. )
  424. return m.group(0)
  425. return value
  426. return _PLACEHOLDER_RE.sub(repl, snippet)
  427. def _inject_start_at_marker(content: str, snippet: str) -> str:
  428. """Insert snippet immediately before `; MACHINE_START_GCODE_END`.
  429. The marker sits at the bottom of the printer's startup block — bed heat,
  430. homing, and nozzle prime are already done, so injected snippets land in
  431. the same place a slicer-side custom-start-gcode would. Falls back to
  432. prepending if the marker isn't present (older files / non-Bambu slicers).
  433. """
  434. marker_idx = content.find(_START_GCODE_END_MARKER)
  435. if marker_idx == -1:
  436. logger.warning(
  437. "G-code injection: '%s' not found, prepending start snippet to whole file",
  438. _START_GCODE_END_MARKER,
  439. )
  440. return snippet.rstrip("\n") + "\n" + content
  441. line_start = content.rfind("\n", 0, marker_idx)
  442. line_start = 0 if line_start == -1 else line_start + 1
  443. return content[:line_start] + snippet.rstrip("\n") + "\n" + content[line_start:]
  444. def inject_gcode_into_3mf(
  445. source_path: Path,
  446. plate_id: int,
  447. start_gcode: str | None,
  448. end_gcode: str | None,
  449. ):
  450. """Create a temp copy of a 3MF with G-code injected at start/end.
  451. Snippets support `{placeholder}` substitution against values parsed from
  452. the 3MF G-code header block (e.g. `{max_layer_z}` → `16.00`). Start
  453. snippets are anchored to the `; MACHINE_START_GCODE_END` marker so they
  454. run after the printer's own startup (#422). End snippets are appended
  455. after the last line of the print.
  456. Args:
  457. source_path: Path to the original 3MF file.
  458. plate_id: Plate number (1-indexed) to inject into.
  459. start_gcode: G-code to insert after printer startup, or None.
  460. end_gcode: G-code to append, or None.
  461. Returns:
  462. Path to temp file with injected G-code, or None if injection failed.
  463. Caller is responsible for cleaning up the temp file.
  464. """
  465. import tempfile
  466. if not start_gcode and not end_gcode:
  467. return None
  468. try:
  469. # Find the target gcode file inside the 3MF
  470. with zipfile.ZipFile(source_path, "r") as zf:
  471. all_gcode = [f for f in zf.namelist() if f.endswith(".gcode")]
  472. if not all_gcode:
  473. return None
  474. # Try plate-specific gcode file first
  475. target_gcode = None
  476. plate_pattern = f"plate_{plate_id}.gcode"
  477. for f in all_gcode:
  478. if f.endswith(plate_pattern):
  479. target_gcode = f
  480. break
  481. # Fall back to first gcode file
  482. if target_gcode is None:
  483. target_gcode = all_gcode[0]
  484. # Read and modify gcode content
  485. gcode_content = zf.read(target_gcode).decode("utf-8", errors="ignore")
  486. header = _parse_3mf_gcode_header(gcode_content)
  487. if start_gcode:
  488. resolved = _substitute_placeholders(start_gcode, header)
  489. gcode_content = _inject_start_at_marker(gcode_content, resolved)
  490. if end_gcode:
  491. resolved = _substitute_placeholders(end_gcode, header)
  492. gcode_content = gcode_content.rstrip("\n") + "\n" + resolved + "\n"
  493. # Write modified 3MF to temp file
  494. with tempfile.NamedTemporaryFile(delete=False, suffix=".3mf") as tmp:
  495. tmp_path = Path(tmp.name)
  496. with zipfile.ZipFile(tmp_path, "w", zipfile.ZIP_DEFLATED) as zf_write:
  497. for item in zf.namelist():
  498. info = zf.getinfo(item)
  499. if item == target_gcode:
  500. zf_write.writestr(info, gcode_content.encode("utf-8"))
  501. else:
  502. zf_write.writestr(info, zf.read(item))
  503. return tmp_path
  504. except Exception:
  505. # Clean up temp file on error
  506. if "tmp_path" in locals() and tmp_path.exists():
  507. tmp_path.unlink(missing_ok=True)
  508. return None