threemf_tools.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442
  1. """3MF file parsing utilities for filament tracking.
  2. This module provides functions to parse Bambu Lab 3MF files and extract
  3. per-layer filament usage data from the embedded G-code. This enables
  4. accurate partial usage reporting for multi-material prints.
  5. """
  6. import json
  7. import math
  8. import re
  9. import zipfile
  10. from pathlib import Path
  11. import defusedxml.ElementTree as ET
  12. # Default filament properties
  13. DEFAULT_FILAMENT_DIAMETER = 1.75 # mm
  14. DEFAULT_FILAMENT_DENSITY = 1.24 # g/cm³ (PLA)
  15. def parse_gcode_layer_filament_usage(gcode_content: str) -> dict[int, dict[int, float]]:
  16. """Parse G-code to extract per-layer, per-filament cumulative extrusion in mm.
  17. This function tracks filament extrusion across layers and tool changes,
  18. building a cumulative usage map that can be used to calculate partial
  19. usage at any layer.
  20. Args:
  21. gcode_content: The raw G-code content as a string
  22. Returns:
  23. A nested dictionary mapping layer numbers to filament usage:
  24. {layer: {filament_id: cumulative_mm}, ...}
  25. Example:
  26. {0: {0: 125.5}, 1: {0: 250.0, 1: 50.0}, 2: {0: 375.0, 1: 150.0}}
  27. This shows:
  28. - Layer 0: filament 0 used 125.5mm cumulative
  29. - Layer 1: filament 0 used 250mm cumulative, filament 1 used 50mm
  30. - Layer 2: filament 0 used 375mm cumulative, filament 1 used 150mm
  31. G-code commands parsed:
  32. - M73 L<layer>: Layer change marker
  33. - M620 S<filament>: Filament/tool change (S255 = unload)
  34. - G0/G1/G2/G3 E<amount>: Extrusion moves
  35. """
  36. layer_filaments: dict[int, dict[int, float]] = {}
  37. current_layer = 0
  38. active_filament: int | None = None
  39. cumulative_extrusion: dict[int, float] = {} # filament_id -> total mm
  40. for line in gcode_content.splitlines():
  41. line = line.strip()
  42. if not line:
  43. continue
  44. # Handle comments - skip but check for layer markers
  45. if line.startswith(";"):
  46. # Some slicers use comment-based layer markers
  47. # e.g., "; CHANGE_LAYER" or ";LAYER_CHANGE"
  48. continue
  49. # Split line into command and inline comment
  50. if ";" in line:
  51. line = line.split(";")[0].strip()
  52. # Extract command and parameters
  53. parts = line.split()
  54. if not parts:
  55. continue
  56. cmd = parts[0].upper()
  57. # Layer change: M73 L<layer>
  58. # Bambu printers use M73 with L parameter for layer indication
  59. if cmd == "M73":
  60. for part in parts[1:]:
  61. part_upper = part.upper()
  62. if part_upper.startswith("L"):
  63. try:
  64. new_layer = int(part[1:])
  65. # Save current state before layer change
  66. if cumulative_extrusion:
  67. layer_filaments[current_layer] = cumulative_extrusion.copy()
  68. current_layer = new_layer
  69. except ValueError:
  70. pass # Skip G-code lines with unparseable layer numbers
  71. # Filament change: M620 S<filament>
  72. # Bambu uses M620 for AMS filament switching
  73. # S255 means full unload (no active filament)
  74. elif cmd == "M620":
  75. for part in parts[1:]:
  76. part_upper = part.upper()
  77. if part_upper.startswith("S"):
  78. filament_str = part[1:]
  79. if filament_str == "255":
  80. # Full unload - no active filament
  81. active_filament = None
  82. else:
  83. try:
  84. # Extract digits (e.g., "0A" -> 0, "1" -> 1)
  85. match = re.match(r"(\d+)", filament_str)
  86. if match:
  87. active_filament = int(match.group(1))
  88. except (ValueError, AttributeError):
  89. pass # Skip unparseable filament switch commands
  90. # Extrusion moves: G0/G1/G2/G3 with E parameter
  91. # Only G1 typically has extrusion, but check all for safety
  92. elif cmd in ("G0", "G1", "G2", "G3"):
  93. if active_filament is None:
  94. continue
  95. for part in parts[1:]:
  96. part_upper = part.upper()
  97. if part_upper.startswith("E"):
  98. try:
  99. extrusion = float(part[1:])
  100. # Only count positive extrusion (not retractions)
  101. if extrusion > 0:
  102. current = cumulative_extrusion.get(active_filament, 0)
  103. cumulative_extrusion[active_filament] = current + extrusion
  104. except ValueError:
  105. pass # Skip G-code lines with unparseable extrusion values
  106. # Save final layer state
  107. if cumulative_extrusion:
  108. layer_filaments[current_layer] = cumulative_extrusion.copy()
  109. return layer_filaments
  110. def mm_to_grams(
  111. length_mm: float,
  112. diameter_mm: float = DEFAULT_FILAMENT_DIAMETER,
  113. density_g_cm3: float = DEFAULT_FILAMENT_DENSITY,
  114. ) -> float:
  115. """Convert filament length in mm to weight in grams.
  116. Uses the formula: mass = volume × density
  117. where volume = π × r² × length
  118. Args:
  119. length_mm: Length of filament in millimeters
  120. diameter_mm: Filament diameter in millimeters (default: 1.75)
  121. density_g_cm3: Material density in g/cm³ (default: 1.24 for PLA)
  122. Returns:
  123. Weight in grams
  124. """
  125. radius_cm = (diameter_mm / 2) / 10 # Convert mm to cm
  126. length_cm = length_mm / 10 # Convert mm to cm
  127. volume_cm3 = math.pi * radius_cm * radius_cm * length_cm
  128. return volume_cm3 * density_g_cm3
  129. def extract_layer_filament_usage_from_3mf(file_path: Path) -> dict[int, dict[int, float]] | None:
  130. """Extract per-layer filament usage from a 3MF file's embedded G-code.
  131. Args:
  132. file_path: Path to the 3MF file
  133. Returns:
  134. Dictionary mapping layers to filament usage, or None if parsing fails.
  135. Format: {layer: {filament_id: cumulative_mm}, ...}
  136. """
  137. try:
  138. with zipfile.ZipFile(file_path, "r") as zf:
  139. # Find G-code file(s) - usually plate_1.gcode or Metadata/plate_1.gcode
  140. gcode_files = [f for f in zf.namelist() if f.endswith(".gcode")]
  141. if not gcode_files:
  142. return None
  143. # Use the first G-code file (typically only one per 3MF export)
  144. gcode_path = gcode_files[0]
  145. gcode_content = zf.read(gcode_path).decode("utf-8", errors="ignore")
  146. return parse_gcode_layer_filament_usage(gcode_content)
  147. except Exception:
  148. return None
  149. def get_cumulative_usage_at_layer(
  150. layer_usage: dict[int, dict[int, float]],
  151. target_layer: int,
  152. ) -> dict[int, float]:
  153. """Get cumulative filament usage (in mm) up to and including target_layer.
  154. Args:
  155. layer_usage: The output from parse_gcode_layer_filament_usage()
  156. target_layer: The layer number to get usage for
  157. Returns:
  158. Dictionary of {filament_id: cumulative_mm} for each filament used
  159. up to target_layer. Returns empty dict if no data available.
  160. """
  161. if not layer_usage:
  162. return {}
  163. # Find the highest recorded layer <= target_layer
  164. # (we store snapshots at layer changes, so we need the closest one)
  165. relevant_layers = [layer for layer in layer_usage if layer <= target_layer]
  166. if not relevant_layers:
  167. return {}
  168. max_layer = max(relevant_layers)
  169. return layer_usage.get(max_layer, {})
  170. def extract_filament_properties_from_3mf(file_path: Path) -> dict[int, dict]:
  171. """Extract filament properties (density, diameter, type) from 3MF metadata.
  172. Args:
  173. file_path: Path to the 3MF file
  174. Returns:
  175. Dictionary mapping filament IDs to their properties:
  176. {filament_id: {"diameter": 1.75, "density": 1.24, "type": "PLA"}, ...}
  177. Note: filament_id is 1-based (matches slot_id in slice_info.config)
  178. """
  179. properties: dict[int, dict] = {}
  180. try:
  181. with zipfile.ZipFile(file_path, "r") as zf:
  182. # Try slice_info.config first for filament types
  183. if "Metadata/slice_info.config" in zf.namelist():
  184. content = zf.read("Metadata/slice_info.config").decode()
  185. root = ET.fromstring(content)
  186. for f in root.findall(".//filament"):
  187. try:
  188. # id is 1-based in slice_info.config
  189. fid = int(f.get("id", 0))
  190. properties[fid] = {
  191. "type": f.get("type", "PLA"),
  192. "diameter": DEFAULT_FILAMENT_DIAMETER,
  193. "density": DEFAULT_FILAMENT_DENSITY,
  194. }
  195. except ValueError:
  196. pass # Skip filament entries with unparseable IDs
  197. # Try project_settings.config for density values
  198. if "Metadata/project_settings.config" in zf.namelist():
  199. content = zf.read("Metadata/project_settings.config").decode()
  200. try:
  201. data = json.loads(content)
  202. densities = data.get("filament_density", [])
  203. for i, density in enumerate(densities):
  204. # project_settings uses 0-based indexing, convert to 1-based
  205. fid = i + 1
  206. if fid not in properties:
  207. properties[fid] = {
  208. "type": "",
  209. "diameter": DEFAULT_FILAMENT_DIAMETER,
  210. }
  211. try:
  212. properties[fid]["density"] = float(density)
  213. except (ValueError, TypeError):
  214. properties[fid]["density"] = DEFAULT_FILAMENT_DENSITY
  215. except json.JSONDecodeError:
  216. pass # Skip malformed project_settings.config JSON
  217. except Exception:
  218. pass # Return whatever properties were collected before the error
  219. return properties
  220. def extract_nozzle_mapping_from_3mf(zf: zipfile.ZipFile) -> dict[int, int] | None:
  221. """Extract per-slot nozzle/extruder mapping from a 3MF file.
  222. On dual-nozzle printers (H2D, H2D Pro), each filament slot is assigned to a
  223. specific nozzle. The slicer may override user preferences when using "Auto For
  224. Flush" mode, so the actual assignment comes from slice_info.config group_id
  225. attributes, not from the user's filament_nozzle_map preference.
  226. Priority:
  227. 1. group_id on <filament> elements in slice_info.config (actual assignment)
  228. 2. filament_nozzle_map in project_settings.config (user preference fallback)
  229. Both are mapped through physical_extruder_map to get MQTT extruder IDs (0=right, 1=left).
  230. Args:
  231. zf: An open ZipFile of the 3MF archive
  232. Returns:
  233. Dictionary mapping {slot_id: extruder_id} for dual-nozzle files,
  234. or None if single-nozzle, missing data, or parse error.
  235. """
  236. try:
  237. if "Metadata/project_settings.config" not in zf.namelist():
  238. return None
  239. content = zf.read("Metadata/project_settings.config").decode()
  240. data = json.loads(content)
  241. physical_extruder_map = data.get("physical_extruder_map")
  242. if not physical_extruder_map or len(physical_extruder_map) <= 1:
  243. return None # Single-nozzle printer
  244. # Check if only one extruder is active.
  245. # If so, we can skip the mapping and just assign all slots to that extruder.
  246. # extruder_nozzle_stats format: ["Standard#0|High Flow#0", "Standard#1"]
  247. # Each entry = one extruder. Format: <NozzleVolumeType>#<count>[|...]
  248. # #N is the count of physical nozzles of that type (0 = none installed).
  249. # Types: Standard, High Flow, Hybrid, TPU High Flow
  250. active_extruders = []
  251. for stats_str in (data.get("extruder_nozzle_stats") or []):
  252. nozzle_counts = [n.partition("#")[2] for n in stats_str.split("|")]
  253. active_extruders.append(1 if any(c not in ("0", "") for c in nozzle_counts) else 0)
  254. if sum(active_extruders) == 1:
  255. nozzle_mapping: dict[int, int] = {}
  256. active_idx = active_extruders.index(1)
  257. target_extruder = int(physical_extruder_map[active_idx])
  258. if "Metadata/slice_info.config" in zf.namelist():
  259. si_content = zf.read("Metadata/slice_info.config").decode()
  260. si_root = ET.fromstring(si_content)
  261. for filament_elem in si_root.findall(".//filament"):
  262. try:
  263. nozzle_mapping[int(filament_elem.get("id"))] = target_extruder
  264. except (ValueError, TypeError):
  265. pass
  266. return nozzle_mapping or None
  267. # Priority 1: Use group_id from slice_info filament elements.
  268. # This reflects the actual slicer assignment (respects "Auto For Flush").
  269. nozzle_mapping: dict[int, int] = {}
  270. if "Metadata/slice_info.config" in zf.namelist():
  271. si_content = zf.read("Metadata/slice_info.config").decode()
  272. si_root = ET.fromstring(si_content)
  273. for filament_elem in si_root.findall(".//filament"):
  274. group_id_str = filament_elem.get("group_id")
  275. filament_id_str = filament_elem.get("id")
  276. if group_id_str is not None and filament_id_str:
  277. try:
  278. group_id = int(group_id_str)
  279. slot_id = int(filament_id_str)
  280. if group_id < len(physical_extruder_map):
  281. nozzle_mapping[slot_id] = int(physical_extruder_map[group_id])
  282. except (ValueError, TypeError, IndexError):
  283. pass
  284. if nozzle_mapping:
  285. return nozzle_mapping
  286. # Priority 2: Fall back to filament_nozzle_map (user preference).
  287. # This is correct when the user manually assigned nozzles, but may be
  288. # wrong when the slicer overrides via "Auto For Flush".
  289. filament_nozzle_map = data.get("filament_nozzle_map")
  290. if not filament_nozzle_map:
  291. return None
  292. for i, slicer_ext_str in enumerate(filament_nozzle_map):
  293. slot_id = i + 1
  294. try:
  295. slicer_ext = int(slicer_ext_str)
  296. if slicer_ext < len(physical_extruder_map):
  297. nozzle_mapping[slot_id] = int(physical_extruder_map[slicer_ext])
  298. except (ValueError, TypeError, IndexError):
  299. pass
  300. return nozzle_mapping if nozzle_mapping else None
  301. except Exception:
  302. return None
  303. def extract_filament_usage_from_3mf(file_path: Path, plate_id: int | None = None) -> list[dict]:
  304. """Extract per-filament total usage from 3MF slice_info.config.
  305. This extracts the slicer-estimated total usage per filament slot,
  306. not the per-layer breakdown.
  307. Args:
  308. file_path: Path to the 3MF file
  309. plate_id: Optional plate index to filter for (for multi-plate files)
  310. Returns:
  311. List of filament usage dictionaries:
  312. [{"slot_id": 1, "used_g": 50.5, "type": "PLA", "color": "#FF0000"}, ...]
  313. """
  314. filament_usage = []
  315. try:
  316. with zipfile.ZipFile(file_path, "r") as zf:
  317. if "Metadata/slice_info.config" not in zf.namelist():
  318. return []
  319. content = zf.read("Metadata/slice_info.config").decode()
  320. root = ET.fromstring(content)
  321. if plate_id is not None:
  322. # Find the plate element with matching index
  323. for plate_elem in root.findall(".//plate"):
  324. plate_index = None
  325. for meta in plate_elem.findall("metadata"):
  326. if meta.get("key") == "index":
  327. try:
  328. plate_index = int(meta.get("value", "0"))
  329. except ValueError:
  330. pass
  331. break
  332. if plate_index == plate_id:
  333. for f in plate_elem.findall("filament"):
  334. filament_id = f.get("id")
  335. used_g = f.get("used_g", "0")
  336. try:
  337. used_amount = float(used_g)
  338. if filament_id:
  339. filament_usage.append(
  340. {
  341. "slot_id": int(filament_id),
  342. "used_g": used_amount,
  343. "type": f.get("type", ""),
  344. "color": f.get("color", ""),
  345. }
  346. )
  347. except (ValueError, TypeError):
  348. pass
  349. break
  350. else:
  351. # No plate_id specified - extract all filaments
  352. for f in root.findall(".//filament"):
  353. filament_id = f.get("id")
  354. used_g = f.get("used_g", "0")
  355. try:
  356. used_amount = float(used_g)
  357. if filament_id:
  358. filament_usage.append(
  359. {
  360. "slot_id": int(filament_id),
  361. "used_g": used_amount,
  362. "type": f.get("type", ""),
  363. "color": f.get("color", ""),
  364. }
  365. )
  366. except (ValueError, TypeError):
  367. pass # Skip filament entries with unparseable usage values
  368. except Exception:
  369. pass # Return whatever usage data was collected before the error
  370. return filament_usage