| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229 |
- """Automatic filament consumption tracking.
- Captures AMS tray remain% at print start, then computes consumption
- deltas at print complete to update spool weight_used and last_used.
- Primary tracking uses 3MF slicer estimates (precise per-filament data).
- AMS remain% delta is the fallback for trays not covered by 3MF data.
- """
- import json
- import logging
- from dataclasses import dataclass, field
- from datetime import datetime, timezone
- from sqlalchemy import select
- from sqlalchemy.ext.asyncio import AsyncSession
- from backend.app.models.spool import Spool
- from backend.app.models.spool_assignment import SpoolAssignment
- from backend.app.models.spool_usage_history import SpoolUsageHistory
- logger = logging.getLogger(__name__)
- def _decode_mqtt_mapping(mapping_raw: list | None) -> list[int] | None:
- """Decode MQTT mapping field (snow-encoded) to bambuddy global tray IDs.
- The printer's MQTT mapping field is an array indexed by slicer filament slot
- (0-based). Each value uses snow encoding: ams_hw_id * 256 + local_slot.
- 65535 means unmapped.
- Returns a list of bambuddy global tray IDs (or -1 for unmapped), or None if
- no valid mappings found.
- """
- if not isinstance(mapping_raw, list) or not mapping_raw:
- return None
- result = []
- for value in mapping_raw:
- if not isinstance(value, int) or value >= 65535:
- result.append(-1)
- continue
- ams_hw_id = value >> 8
- slot = value & 0xFF
- if 0 <= ams_hw_id <= 3:
- # Regular AMS: sequential global ID
- result.append(ams_hw_id * 4 + (slot & 0x03))
- elif 128 <= ams_hw_id <= 135:
- # AMS-HT: global ID is the hardware ID (one slot per unit)
- result.append(ams_hw_id)
- elif ams_hw_id in (254, 255):
- # External spool
- result.append(254 if slot != 255 else 255)
- else:
- result.append(-1)
- # Only return if at least one valid mapping exists
- if all(v < 0 for v in result):
- return None
- return result
- def _match_slots_by_color(
- filament_usage: list[dict],
- ams_raw: dict | list | None,
- ) -> list[int] | None:
- """Match 3MF filament slots to AMS trays by color.
- Fallback mapping for printers that don't provide the MQTT mapping field
- or request topic subscription (e.g. A1, A1 Mini, P1S, P2S).
- Compares the 3MF slicer filament color (per slot) against each AMS tray's
- color to find a unique match. Only returns a mapping if every used slot
- matches exactly one tray (no ambiguity).
- Args:
- filament_usage: List of 3MF slot dicts with 'slot_id', 'color', 'type'
- ams_raw: raw_data["ams"] dict or list from printer state
- Returns:
- List of global tray IDs indexed by slicer slot (0-based), or None.
- """
- if not filament_usage or not ams_raw:
- return None
- ams_data = ams_raw.get("ams", []) if isinstance(ams_raw, dict) else ams_raw if isinstance(ams_raw, list) else []
- if not ams_data:
- return None
- # Build map of normalized color → list of global tray IDs
- color_to_trays: dict[str, list[int]] = {}
- for ams_unit in ams_data:
- ams_id = int(ams_unit.get("id", 0))
- for tray in ams_unit.get("tray", []):
- tray_id = int(tray.get("id", 0))
- tray_color = tray.get("tray_color", "")
- tray_type = tray.get("tray_type", "")
- if not tray_color or not tray_type:
- continue
- # Normalize AMS color: strip alpha (last 2 chars), lowercase
- norm = tray_color[:6].lower() if len(tray_color) >= 6 else tray_color.lower()
- if ams_id >= 128:
- global_id = ams_id # AMS-HT
- else:
- global_id = ams_id * 4 + tray_id
- color_to_trays.setdefault(norm, []).append(global_id)
- if not color_to_trays:
- return None
- # Find max slot_id to size the result array
- max_slot = max(u.get("slot_id", 0) for u in filament_usage)
- if max_slot <= 0:
- return None
- result = [-1] * max_slot
- used_trays: set[int] = set()
- for usage in filament_usage:
- slot_id = usage.get("slot_id", 0)
- if slot_id <= 0:
- continue
- slot_color = usage.get("color", "").lstrip("#").lower()
- if len(slot_color) < 6:
- return None # Can't match without a valid color
- slot_color = slot_color[:6] # Strip alpha if present
- candidates = color_to_trays.get(slot_color, [])
- # Filter out trays already claimed by another slot
- available = [t for t in candidates if t not in used_trays]
- if len(available) != 1:
- # Ambiguous (multiple trays with same color) or no match
- return None
- result[slot_id - 1] = available[0]
- used_trays.add(available[0])
- # Only return if at least one valid mapping exists
- if all(v < 0 for v in result):
- return None
- logger.info("[UsageTracker] Color-matched slot_to_tray: %s", result)
- return result
- @dataclass
- class PrintSession:
- printer_id: int
- print_name: str
- started_at: datetime
- tray_remain_start: dict[tuple[int, int], int] = field(default_factory=dict)
- # tray_now at print start (correct value, unlike at completion where it's 255)
- tray_now_at_start: int = -1
- # Snapshot of spool assignments at print start: {(ams_id, tray_id): spool_id}
- # Prevents usage loss when on_ams_change unlinks a spool mid-print
- spool_assignments: dict[tuple[int, int], int] = field(default_factory=dict)
- # AMS mapping from print command (captured at start, needed when auto-archive is off)
- ams_mapping: list[int] | None = None
- # Module-level storage, keyed by printer_id
- _active_sessions: dict[int, PrintSession] = {}
- def _to_epoch_seconds(value: datetime | None) -> float | None:
- """Convert datetime to epoch seconds, assuming UTC for naive values."""
- if value is None:
- return None
- dt = value
- if dt.tzinfo is None:
- dt = dt.replace(tzinfo=timezone.utc)
- return dt.timestamp()
- async def _resolve_spool_id_for_tray(
- printer_id: int,
- ams_id: int,
- tray_id: int,
- db: AsyncSession,
- spool_assignments_snapshot: dict[tuple[int, int], int] | None = None,
- print_started_at: datetime | None = None,
- ) -> int | None:
- """Resolve spool ID for a tray with safe support for mid-print reassignment.
- Resolution order:
- 1. If snapshot exists and live assignment changed *during this print*, use live spool.
- 2. Otherwise use snapshot spool when available.
- 3. Fall back to live assignment.
- """
- key = (ams_id, tray_id)
- snapshot_spool_id = spool_assignments_snapshot.get(key) if spool_assignments_snapshot else None
- # Backward-compatible fast path: if we have a snapshot but no print-start
- # timestamp, preserve legacy behavior and avoid extra DB lookups.
- if snapshot_spool_id is not None and print_started_at is None:
- return snapshot_spool_id
- result = await db.execute(
- select(SpoolAssignment).where(
- SpoolAssignment.printer_id == printer_id,
- SpoolAssignment.ams_id == ams_id,
- SpoolAssignment.tray_id == tray_id,
- )
- )
- live_assignment = result.scalar_one_or_none()
- if snapshot_spool_id is not None:
- if live_assignment and live_assignment.spool_id != snapshot_spool_id:
- live_created_ts = _to_epoch_seconds(getattr(live_assignment, "created_at", None))
- started_ts = _to_epoch_seconds(print_started_at)
- if live_created_ts is not None and started_ts is not None and live_created_ts >= started_ts:
- logger.info(
- "[UsageTracker] Assignment changed during print for printer %d AMS%d-T%d: snapshot spool %d -> live spool %d",
- printer_id,
- ams_id,
- tray_id,
- snapshot_spool_id,
- live_assignment.spool_id,
- )
- return live_assignment.spool_id
- return snapshot_spool_id
- if live_assignment:
- return live_assignment.spool_id
- return None
- async def on_print_start(printer_id: int, data: dict, printer_manager, db: AsyncSession | None = None) -> None:
- """Capture AMS tray remain% and spool assignments at print start."""
- state = printer_manager.get_status(printer_id)
- if not state or not state.raw_data:
- logger.debug("[UsageTracker] No state for printer %d, skipping", printer_id)
- return
- ams_raw = state.raw_data.get("ams", [])
- ams_data = ams_raw.get("ams", []) if isinstance(ams_raw, dict) else ams_raw if isinstance(ams_raw, list) else []
- tray_remain_start: dict[tuple[int, int], int] = {}
- skipped_invalid: list[str] = []
- for ams_unit in ams_data:
- ams_id = int(ams_unit.get("id", 0))
- for tray in ams_unit.get("tray", []):
- tray_id = int(tray.get("id", 0))
- remain = tray.get("remain", -1)
- if isinstance(remain, int) and 0 <= remain <= 100:
- tray_remain_start[(ams_id, tray_id)] = remain
- else:
- skipped_invalid.append(f"AMS{ams_id}-T{tray_id}(remain={remain})")
- # Also capture VT (external) tray remain% — these are separate from AMS units
- vt_tray_raw = state.raw_data.get("vt_tray") or []
- if isinstance(vt_tray_raw, dict):
- vt_tray_raw = [vt_tray_raw]
- for vt in vt_tray_raw:
- if not isinstance(vt, dict):
- continue
- vt_id = int(vt.get("id", 254))
- # VT tray id 254 → (ams_id=255, tray_id=0), id 255 → (ams_id=255, tray_id=1)
- vt_tray_id = vt_id - 254
- remain = vt.get("remain", -1)
- if isinstance(remain, int) and 0 <= remain <= 100:
- tray_remain_start[(255, vt_tray_id)] = remain
- else:
- skipped_invalid.append(f"VT{vt_id}(remain={remain})")
- if skipped_invalid:
- logger.info(
- "[UsageTracker] Skipped trays with invalid remain%% for printer %d: %s",
- printer_id,
- ", ".join(skipped_invalid),
- )
- if not ams_data and not vt_tray_raw:
- logger.debug("[UsageTracker] No AMS or VT tray data for printer %d, skipping", printer_id)
- return
- print_name = data.get("subtask_name", "") or data.get("filename", "unknown")
- # Capture tray_now at print start (reliable, unlike at completion where it's 255)
- tray_now_at_start = state.tray_now if state else -1
- # --- Diagnostic logging: dump mapping-related MQTT fields at print start ---
- # This helps us understand what each printer model reports for slot-to-tray mapping.
- mapping_field = state.raw_data.get("mapping")
- logger.info(
- "[UsageTracker] PRINT START printer %d: mapping=%s, tray_now=%d, last_loaded_tray=%s",
- printer_id,
- mapping_field,
- tray_now_at_start,
- getattr(state, "last_loaded_tray", "N/A"),
- )
- # Log all raw_data keys containing "map" or "ams" for discovery
- map_keys = {k: state.raw_data[k] for k in state.raw_data if "map" in k.lower()}
- if map_keys:
- logger.info("[UsageTracker] PRINT START printer %d: mapping-related keys: %s", printer_id, map_keys)
- # Log per-tray summary: tray_now, tray_tar, tray_type, tray_color for each slot
- for ams_unit in ams_data:
- ams_id = int(ams_unit.get("id", 0))
- tray_summary = []
- for tray in ams_unit.get("tray", []):
- tray_summary.append(
- f"T{tray.get('id', '?')}(type={tray.get('tray_type', '')}, "
- f"color={tray.get('tray_color', '')}, "
- f"now={ams_raw.get('tray_now', '?') if isinstance(ams_raw, dict) else '?'}, "
- f"tar={ams_raw.get('tray_tar', '?') if isinstance(ams_raw, dict) else '?'})"
- )
- logger.info("[UsageTracker] PRINT START printer %d AMS %d: %s", printer_id, ams_id, ", ".join(tray_summary))
- # Snapshot spool assignments so usage isn't lost if on_ams_change unlinks mid-print
- spool_assignments: dict[tuple[int, int], int] = {}
- if db:
- assign_result = await db.execute(select(SpoolAssignment).where(SpoolAssignment.printer_id == printer_id))
- for assignment in assign_result.scalars().all():
- spool_assignments[(assignment.ams_id, assignment.tray_id)] = assignment.spool_id
- if spool_assignments:
- logger.info(
- "[UsageTracker] Snapshotted %d spool assignments for printer %d: %s",
- len(spool_assignments),
- printer_id,
- {f"{k[0]}-{k[1]}": v for k, v in spool_assignments.items()},
- )
- # Always create session (even without valid remain data) so print_name
- # is available at completion for 3MF-based tracking
- session = PrintSession(
- printer_id=printer_id,
- print_name=print_name,
- started_at=datetime.now(timezone.utc),
- tray_remain_start=tray_remain_start,
- tray_now_at_start=tray_now_at_start,
- spool_assignments=spool_assignments,
- ams_mapping=data.get("ams_mapping"),
- )
- _active_sessions[printer_id] = session
- if tray_remain_start:
- logger.info(
- "[UsageTracker] Captured start remain%% for printer %d (%d trays): %s",
- printer_id,
- len(tray_remain_start),
- {f"{k[0]}-{k[1]}": v for k, v in tray_remain_start.items()},
- )
- else:
- logger.debug("[UsageTracker] No valid remain%% for printer %d, 3MF fallback available", printer_id)
- async def on_print_complete(
- printer_id: int,
- data: dict,
- printer_manager,
- db: AsyncSession,
- archive_id: int | None = None,
- ams_mapping: list[int] | None = None,
- ) -> list[dict]:
- """Compute consumption deltas and update spool weight_used/last_used.
- Uses two tracking strategies in priority order:
- 1. 3MF per-filament estimates (primary) — precise slicer data for all spools
- 2. AMS remain% delta (fallback) — only for trays not already handled by 3MF
- Returns a list of dicts describing what was logged (for WebSocket broadcast).
- """
- from sqlalchemy import select
- from backend.app.api.routes.settings import get_setting
- from backend.app.models.spool_usage_history import SpoolUsageHistory
- session = _active_sessions.pop(printer_id, None)
- status = data.get("status", "completed")
- results = []
- handled_trays: set[tuple[int, int]] = set()
- # Fetch default filament cost from settings for fallback
- default_cost_str = await get_setting(db, "default_filament_cost")
- default_filament_cost = float(default_cost_str) if default_cost_str else 0.0
- # Fall back to ams_mapping captured at print start (needed when auto-archive is off
- # and the caller can't retrieve the mapping from _print_ams_mappings without archive_id)
- if not ams_mapping and session and session.ams_mapping:
- ams_mapping = session.ams_mapping
- logger.info(
- "[UsageTracker] on_print_complete: printer=%d, archive=%s, session=%s, ams_mapping=%s",
- printer_id,
- archive_id,
- "yes" if session else "no",
- ams_mapping,
- )
- # --- Diagnostic logging: dump mapping-related MQTT fields at print completion ---
- state = printer_manager.get_status(printer_id)
- if state and state.raw_data:
- logger.info(
- "[UsageTracker] PRINT COMPLETE printer %d: mapping=%s, tray_now=%s, last_loaded_tray=%s",
- printer_id,
- state.raw_data.get("mapping"),
- state.tray_now,
- getattr(state, "last_loaded_tray", "N/A"),
- )
- # --- Path 1 (PRIMARY): 3MF per-filament estimates ---
- print_name = (
- (session.print_name if session else None) or data.get("subtask_name", "") or data.get("filename", "unknown")
- )
- # When auto-archive is disabled (archive_id=None), try to find a 3MF by filename
- # from the library or previous archives so we can still track filament usage.
- threemf_path = None
- if not archive_id:
- from backend.app.core.config import settings as app_settings
- search_filename = data.get("filename") or data.get("subtask_name") or (session.print_name if session else "")
- if search_filename:
- threemf_path = await _find_3mf_by_filename(printer_id, search_filename, db, app_settings.base_dir)
- if archive_id or threemf_path:
- threemf_results = await _track_from_3mf(
- printer_id,
- archive_id,
- status,
- print_name,
- handled_trays,
- printer_manager,
- db,
- ams_mapping=ams_mapping,
- tray_now_at_start=session.tray_now_at_start if session else -1,
- last_progress=data.get("last_progress", 0.0),
- last_layer_num=data.get("last_layer_num", 0),
- default_filament_cost=default_filament_cost,
- spool_assignments=session.spool_assignments if session else None,
- print_started_at=session.started_at if session else None,
- threemf_path=threemf_path,
- )
- results.extend(threemf_results)
- # --- Path 2 (FALLBACK): AMS remain% delta (only for trays not handled by 3MF) ---
- if session and session.tray_remain_start:
- state = printer_manager.get_status(printer_id)
- if state and state.raw_data:
- ams_raw = state.raw_data.get("ams", [])
- ams_data = (
- ams_raw.get("ams", []) if isinstance(ams_raw, dict) else ams_raw if isinstance(ams_raw, list) else []
- )
- # Collect all trays to check: AMS trays + VT (external) trays
- # Each entry: (ams_id_for_assignment, tray_id_for_assignment, current_remain, label)
- trays_to_check: list[tuple[int, int, int, str]] = []
- for ams_unit in ams_data:
- ams_id = int(ams_unit.get("id", 0))
- for tray in ams_unit.get("tray", []):
- tray_id = int(tray.get("id", 0))
- remain = tray.get("remain", -1)
- trays_to_check.append((ams_id, tray_id, remain, f"AMS{ams_id}-T{tray_id}"))
- # VT (external) trays — same remain% delta logic
- vt_tray_raw = state.raw_data.get("vt_tray") or []
- if isinstance(vt_tray_raw, dict):
- vt_tray_raw = [vt_tray_raw]
- for vt in vt_tray_raw:
- if not isinstance(vt, dict):
- continue
- vt_id = int(vt.get("id", 254))
- vt_tray_id = vt_id - 254 # 254→0, 255→1
- remain = vt.get("remain", -1)
- trays_to_check.append((255, vt_tray_id, remain, f"VT{vt_id}"))
- for assign_ams_id, assign_tray_id, current_remain, tray_label in trays_to_check:
- key = (assign_ams_id, assign_tray_id)
- if key in handled_trays:
- continue # Already tracked via 3MF
- if key not in session.tray_remain_start:
- continue
- if not isinstance(current_remain, int) or current_remain < 0 or current_remain > 100:
- logger.info(
- "[UsageTracker] %s: invalid remain%% at completion (%s), skipping fallback for printer %d",
- tray_label,
- current_remain,
- printer_id,
- )
- continue
- start_remain = session.tray_remain_start[key]
- delta_pct = start_remain - current_remain
- if delta_pct <= 0:
- continue # No consumption or tray was refilled
- spool_id = await _resolve_spool_id_for_tray(
- printer_id=printer_id,
- ams_id=assign_ams_id,
- tray_id=assign_tray_id,
- db=db,
- spool_assignments_snapshot=session.spool_assignments,
- print_started_at=session.started_at,
- )
- if spool_id is None:
- logger.info(
- "[UsageTracker] %s: no spool assigned, skipping fallback for printer %d",
- tray_label,
- printer_id,
- )
- continue
- # Load spool
- spool_result = await db.execute(select(Spool).where(Spool.id == spool_id))
- spool = spool_result.scalar_one_or_none()
- if not spool:
- continue
- # Compute weight consumed
- weight_grams = (delta_pct / 100.0) * spool.label_weight
- # Update spool
- spool.weight_used = (spool.weight_used or 0) + weight_grams
- spool.last_used = datetime.now(timezone.utc)
- # Calculate cost for this usage
- cost = None
- cost_per_kg = spool.cost_per_kg if spool.cost_per_kg is not None else default_filament_cost
- if cost_per_kg > 0:
- cost = round((weight_grams / 1000.0) * cost_per_kg, 2)
- # Insert usage history record
- history = SpoolUsageHistory(
- spool_id=spool.id,
- printer_id=printer_id,
- print_name=session.print_name,
- weight_used=round(weight_grams, 1),
- percent_used=delta_pct,
- status=status,
- cost=cost,
- archive_id=archive_id,
- )
- db.add(history)
- handled_trays.add(key)
- results.append(
- {
- "spool_id": spool.id,
- "weight_used": round(weight_grams, 1),
- "percent_used": delta_pct,
- "ams_id": assign_ams_id,
- "tray_id": assign_tray_id,
- "material": spool.material,
- "cost": cost,
- }
- )
- logger.info(
- "[UsageTracker] Spool %d consumed %.1fg (%d%%) on printer %d %s (AMS fallback, %s)",
- spool.id,
- weight_grams,
- delta_pct,
- printer_id,
- tray_label,
- status,
- )
- if results:
- await db.commit()
- # --- Update PrintArchive.cost from THIS print session only ---
- if archive_id and results:
- from sqlalchemy import select
- from backend.app.models.archive import PrintArchive
- archive_result = await db.execute(select(PrintArchive).where(PrintArchive.id == archive_id))
- archive = archive_result.scalar_one_or_none()
- if archive:
- total_cost = sum(r.get("cost", 0) or 0 for r in results)
- if total_cost > 0:
- archive.cost = round(total_cost, 2)
- await db.commit()
- return results
- async def _resolve_3mf_fallback(archive, db: AsyncSession, base_dir):
- """Try to find a 3MF file from library or a previous archive when the current archive has none.
- This handles fallback archives (FTP download failed) where the 3MF may already exist
- locally from a library upload or a previous successful print of the same file.
- """
- from pathlib import Path
- from backend.app.models.archive import PrintArchive
- from backend.app.models.library import LibraryFile
- # Derive search name from archive filename (e.g. "benchy.3mf" or "benchy.gcode.3mf")
- search_name = archive.filename or archive.print_name
- if not search_name:
- return None
- # Normalize: strip path parts, get base name
- search_name = search_name.split("/")[-1]
- search_base = search_name.replace(".gcode.3mf", "").replace(".gcode", "").replace(".3mf", "")
- if not search_base:
- return None
- # 1. Try library files matching the name (match base name at file boundary)
- try:
- lib_result = await db.execute(
- LibraryFile.active()
- .where(LibraryFile.file_path.ilike(f"%/{search_base}.%") | LibraryFile.file_path.ilike(f"{search_base}.%"))
- .where(LibraryFile.file_path.ilike("%.3mf"))
- .order_by(LibraryFile.created_at.desc())
- .limit(3)
- )
- for lib_file in lib_result.scalars().all():
- lib_path = Path(lib_file.file_path)
- candidate = lib_path if lib_path.is_absolute() else base_dir / lib_file.file_path
- if candidate.exists() and candidate.suffix == ".3mf":
- logger.info("[UsageTracker] 3MF fallback: found library file %s for archive %s", candidate, archive.id)
- return candidate
- except Exception as e:
- logger.debug("[UsageTracker] 3MF fallback: library lookup failed: %s", e)
- # 2. Try previous archives with the same filename that have a valid file_path
- try:
- prev_result = await db.execute(
- select(PrintArchive)
- .where(PrintArchive.id != archive.id)
- .where(PrintArchive.printer_id == archive.printer_id)
- .where(PrintArchive.file_path != "")
- .where(PrintArchive.file_path.isnot(None))
- .where(
- PrintArchive.filename.ilike(f"%{search_base}.%") | PrintArchive.filename.ilike(f"{search_base}.%"),
- )
- .order_by(PrintArchive.created_at.desc())
- .limit(3)
- )
- for prev_archive in prev_result.scalars().all():
- candidate = base_dir / prev_archive.file_path
- if candidate.exists() and candidate.suffix == ".3mf":
- logger.info(
- "[UsageTracker] 3MF fallback: found previous archive %s file for archive %s",
- prev_archive.id,
- archive.id,
- )
- return candidate
- except Exception as e:
- logger.debug("[UsageTracker] 3MF fallback: previous archive lookup failed: %s", e)
- return None
- async def _find_3mf_by_filename(
- printer_id: int,
- filename: str,
- db: AsyncSession,
- base_dir,
- ):
- """Find a 3MF file by filename from library or previous archives.
- Used when auto-archive is disabled and there's no archive_id, but we still
- need the 3MF slicer data for filament usage tracking.
- """
- from pathlib import Path
- from backend.app.models.archive import PrintArchive
- from backend.app.models.library import LibraryFile
- search_name = filename.split("/")[-1] if "/" in filename else filename
- search_base = search_name.replace(".gcode.3mf", "").replace(".gcode", "").replace(".3mf", "")
- if not search_base:
- return None
- # 1. Try library files matching the name
- try:
- lib_result = await db.execute(
- LibraryFile.active()
- .where(LibraryFile.file_path.ilike(f"%/{search_base}.%") | LibraryFile.file_path.ilike(f"{search_base}.%"))
- .where(LibraryFile.file_path.ilike("%.3mf"))
- .order_by(LibraryFile.created_at.desc())
- .limit(3)
- )
- for lib_file in lib_result.scalars().all():
- lib_path = Path(lib_file.file_path)
- candidate = lib_path if lib_path.is_absolute() else base_dir / lib_file.file_path
- if candidate.exists() and candidate.suffix == ".3mf":
- logger.info("[UsageTracker] 3MF (no-archive): found library file %s for '%s'", candidate, filename)
- return candidate
- except Exception as e:
- logger.debug("[UsageTracker] 3MF (no-archive): library lookup failed: %s", e)
- # 2. Try previous archives with a valid 3MF file_path
- try:
- prev_result = await db.execute(
- select(PrintArchive)
- .where(PrintArchive.printer_id == printer_id)
- .where(PrintArchive.file_path != "")
- .where(PrintArchive.file_path.isnot(None))
- .where(
- PrintArchive.filename.ilike(f"%{search_base}.%") | PrintArchive.filename.ilike(f"{search_base}.%"),
- )
- .order_by(PrintArchive.created_at.desc())
- .limit(3)
- )
- for prev_archive in prev_result.scalars().all():
- candidate = base_dir / prev_archive.file_path
- if candidate.exists() and candidate.suffix == ".3mf":
- logger.info(
- "[UsageTracker] 3MF (no-archive): found previous archive %s file for '%s'",
- prev_archive.id,
- filename,
- )
- return candidate
- except Exception as e:
- logger.debug("[UsageTracker] 3MF (no-archive): previous archive lookup failed: %s", e)
- return None
- async def _track_from_3mf(
- printer_id: int,
- archive_id: int | None,
- status: str,
- print_name: str,
- handled_trays: set[tuple[int, int]],
- printer_manager,
- db: AsyncSession,
- ams_mapping: list[int] | None = None,
- tray_now_at_start: int = -1,
- last_progress: float = 0.0,
- last_layer_num: int = 0,
- default_filament_cost: float = 0.0,
- spool_assignments: dict[tuple[int, int], int] | None = None,
- print_started_at: datetime | None = None,
- threemf_path=None,
- ) -> list[dict]:
- """Track usage from 3MF per-filament slicer data (primary path).
- Uses slicer-estimated filament weight for all spools (BL and non-BL).
- For partial prints (failed/aborted), tries per-layer gcode data first,
- then falls back to linear scaling by progress.
- When archive_id is None (auto-archive disabled), a pre-resolved threemf_path
- can be provided to still track filament usage from slicer data.
- Slot-to-tray mapping priority:
- 1. Stored ams_mapping from print command (reprints/direct prints)
- 2. MQTT mapping field from printer state (universal, all print sources)
- 3. Queue item ams_mapping (for queue-initiated prints)
- 4. tray_now from printer state (for single-filament non-queue prints)
- 5. Position-based default using sorted available tray IDs (handles external spools)
- 6. Default mapping: slot_id - 1 = global_tray_id (last resort)
- """
- from pathlib import Path
- from backend.app.core.config import settings as app_settings
- from backend.app.models.archive import PrintArchive
- from backend.app.models.print_queue import PrintQueueItem
- from backend.app.utils.threemf_tools import extract_filament_usage_from_3mf
- file_path: Path | None = threemf_path
- if file_path is None and archive_id:
- result = await db.execute(select(PrintArchive).where(PrintArchive.id == archive_id))
- archive = result.scalar_one_or_none()
- if not archive:
- logger.info("[UsageTracker] 3MF: archive %s not found, skipping", archive_id)
- return []
- # Try archive's own file_path first
- if archive.file_path:
- candidate = app_settings.base_dir / archive.file_path
- if candidate.exists():
- file_path = candidate
- # Fallback: find 3MF from library or a previous archive with the same filename
- if file_path is None:
- file_path = await _resolve_3mf_fallback(archive, db, app_settings.base_dir)
- if file_path is None:
- logger.info("[UsageTracker] 3MF: no file available for archive %s, skipping", archive_id)
- return []
- filament_usage = extract_filament_usage_from_3mf(file_path)
- if not filament_usage:
- logger.info("[UsageTracker] 3MF: no filament usage data in %s", file_path)
- return []
- logger.info("[UsageTracker] 3MF: archive %s, filament_usage=%s", archive_id, filament_usage)
- # --- Resolve slot-to-tray mapping ---
- mapping_source = None
- # 1. Use stored ams_mapping from the print command (reprints/direct prints)
- slot_to_tray = ams_mapping
- if slot_to_tray:
- mapping_source = "print_cmd"
- # 2. Try MQTT mapping field from printer state (universal, all print sources)
- if not slot_to_tray:
- state = printer_manager.get_status(printer_id)
- raw_data = getattr(state, "raw_data", None) if state else None
- if raw_data:
- mqtt_mapping = raw_data.get("mapping")
- decoded = _decode_mqtt_mapping(mqtt_mapping)
- if decoded:
- slot_to_tray = decoded
- mapping_source = "mqtt"
- # 3. Try queue item ams_mapping (queue-initiated prints store the exact mapping)
- if not slot_to_tray and archive_id:
- queue_result = await db.execute(
- select(PrintQueueItem)
- .where(PrintQueueItem.archive_id == archive_id)
- .where(PrintQueueItem.status.in_(["printing", "completed", "failed"]))
- )
- queue_item = queue_result.scalar_one_or_none()
- if queue_item and queue_item.ams_mapping:
- try:
- slot_to_tray = json.loads(queue_item.ams_mapping)
- mapping_source = "queue"
- except (json.JSONDecodeError, TypeError):
- pass
- # 4. Color-match 3MF filament slots to AMS trays (for printers without mapping field)
- if not slot_to_tray:
- state = printer_manager.get_status(printer_id)
- raw_data = getattr(state, "raw_data", None) if state else None
- if raw_data:
- matched = _match_slots_by_color(filament_usage, raw_data.get("ams"))
- if matched:
- slot_to_tray = matched
- mapping_source = "color_match"
- logger.info(
- "[UsageTracker] 3MF: slot_to_tray=%s (source: %s)",
- slot_to_tray,
- mapping_source or "none",
- )
- # 5. For single-filament non-queue prints, use tray_now from printer state
- # Priority: tray_change_log (multi-tray split) > tray_now_at_start > current tray_now
- # > last_loaded_tray > vt_tray check
- #
- # tray_change_log evidence wins over slot_to_tray when present: if the
- # printer fed from multiple trays mid-print (AMS auto-fallback when one
- # spool runs out, #957), the slicer's mapping captured at print start
- # is stale and needs to be replaced with per-layer split attribution.
- nonzero_slots = [u for u in filament_usage if u.get("used_g", 0) > 0]
- tray_now_override: int | None = None
- tray_changes: list[tuple[int, int]] = [] # [(global_tray_id, layer_num), ...]
- state = printer_manager.get_status(printer_id) if len(nonzero_slots) == 1 else None
- if state is not None:
- tray_changes = getattr(state, "tray_change_log", []) or []
- if len(tray_changes) > 1:
- # Multi-tray usage detected — splitting takes over regardless of slot_to_tray.
- logger.info("[UsageTracker] 3MF: tray change log: %s (will split weight)", tray_changes)
- elif not slot_to_tray and len(nonzero_slots) == 1:
- if 0 <= tray_now_at_start <= 254:
- tray_now_override = tray_now_at_start
- logger.info("[UsageTracker] 3MF: using tray_now_at_start=%d (single-filament fallback)", tray_now_at_start)
- elif state and 0 <= state.tray_now <= 254:
- tray_now_override = state.tray_now
- logger.info("[UsageTracker] 3MF: using current tray_now=%d", state.tray_now)
- elif state and 0 <= state.last_loaded_tray <= 253:
- tray_now_override = state.last_loaded_tray
- logger.info("[UsageTracker] 3MF: using last_loaded_tray=%d (post-retract fallback)", state.last_loaded_tray)
- elif state and state.tray_now == 255:
- # 255 = "no filament" on legacy printers, but valid 2nd external spool on H2-series
- vt_tray = state.raw_data.get("vt_tray") or []
- if any(int(vt.get("id", 0)) == 255 for vt in vt_tray if isinstance(vt, dict)):
- tray_now_override = state.tray_now
- logger.info("[UsageTracker] 3MF: using tray_now=255 (H2-series external spool)")
- if tray_now_override is None:
- logger.info(
- "[UsageTracker] 3MF: no valid tray_now (at_start=%d, current=%s, last_loaded=%s)",
- tray_now_at_start,
- state.tray_now if state else "N/A",
- state.last_loaded_tray if state else "N/A",
- )
- # Scale factor for partial prints (failed/aborted)
- if status == "completed":
- scale = 1.0
- else:
- state = printer_manager.get_status(printer_id)
- progress = state.progress if state else 0
- # Firmware resets progress to 0 on cancel — use last valid progress captured during print
- if progress <= 0 and last_progress > 0:
- progress = last_progress
- logger.info("[UsageTracker] 3MF: using last_progress=%.1f (firmware reset current to 0)", last_progress)
- scale = max(0.0, min(progress / 100.0, 1.0))
- # Per-layer gcode accuracy for partial prints
- layer_grams: dict[int, float] | None = None
- if status != "completed":
- state = printer_manager.get_status(printer_id)
- current_layer = state.layer_num if state else 0
- # Firmware resets layer_num to 0 on cancel — use last valid layer captured during print
- if current_layer <= 0 and last_layer_num > 0:
- current_layer = last_layer_num
- logger.info("[UsageTracker] 3MF: using last_layer_num=%d (firmware reset current to 0)", last_layer_num)
- if current_layer > 0:
- try:
- from backend.app.utils.threemf_tools import (
- extract_filament_properties_from_3mf,
- extract_layer_filament_usage_from_3mf,
- get_cumulative_usage_at_layer,
- mm_to_grams,
- )
- layer_usage = extract_layer_filament_usage_from_3mf(file_path)
- if layer_usage:
- cumulative_mm = get_cumulative_usage_at_layer(layer_usage, current_layer)
- filament_props = extract_filament_properties_from_3mf(file_path)
- layer_grams = {}
- for filament_id, mm_used in cumulative_mm.items():
- slot_id = filament_id + 1 # 0-based to 1-based
- props = filament_props.get(slot_id, {})
- density = props.get("density", 1.24)
- diameter = props.get("diameter", 1.75)
- layer_grams[slot_id] = mm_to_grams(mm_used, diameter, density)
- except Exception:
- pass # Fall back to linear scaling
- results = []
- for usage in filament_usage:
- slot_id = usage.get("slot_id", 0)
- used_g = usage.get("used_g", 0)
- if used_g <= 0:
- continue
- # --- Mid-print tray switch: split weight across trays ---
- if len(tray_changes) > 1:
- # Compute total weight for this slot (same logic as normal path)
- if layer_grams and slot_id in layer_grams:
- total_weight = layer_grams[slot_id]
- else:
- total_weight = used_g * scale
- if total_weight <= 0:
- continue
- # Extract per-layer gcode for segment splitting
- split_layer_usage = None
- split_props: dict = {}
- try:
- from backend.app.utils.threemf_tools import (
- extract_filament_properties_from_3mf,
- extract_layer_filament_usage_from_3mf,
- get_cumulative_usage_at_layer,
- mm_to_grams,
- )
- split_layer_usage = extract_layer_filament_usage_from_3mf(file_path)
- filament_props = extract_filament_properties_from_3mf(file_path)
- split_props = filament_props.get(slot_id, {})
- except Exception:
- pass # Fall back to linear splitting
- density = split_props.get("density", 1.24)
- diameter = split_props.get("diameter", 1.75)
- filament_id = slot_id - 1 # 0-based for gcode
- sum_previous = 0.0
- for seg_idx, (tray_global, seg_start_layer) in enumerate(tray_changes):
- is_last = seg_idx + 1 >= len(tray_changes)
- if is_last:
- # Last segment: remainder to avoid rounding drift
- segment_grams = total_weight - sum_previous
- elif split_layer_usage:
- seg_end_layer = tray_changes[seg_idx + 1][1]
- mm_at_start = get_cumulative_usage_at_layer(split_layer_usage, seg_start_layer).get(filament_id, 0)
- mm_at_end = get_cumulative_usage_at_layer(split_layer_usage, seg_end_layer).get(filament_id, 0)
- segment_grams = mm_to_grams(mm_at_end - mm_at_start, diameter, density)
- else:
- # No per-layer data: linear fallback by layer ratio
- seg_end_layer = tray_changes[seg_idx + 1][1]
- total_layers = state.total_layers if state else 0
- if total_layers > 0:
- segment_grams = total_weight * (seg_end_layer - seg_start_layer) / total_layers
- else:
- # Can't compute ratio — assign all to last segment
- segment_grams = 0.0
- sum_previous += segment_grams
- if segment_grams <= 0:
- continue
- # Convert global tray ID to (ams_id, tray_id)
- if tray_global >= 254:
- seg_ams_id = 255
- seg_tray_id = tray_global - 254
- elif tray_global >= 128:
- seg_ams_id = tray_global
- seg_tray_id = 0
- else:
- seg_ams_id = tray_global // 4
- seg_tray_id = tray_global % 4
- seg_key = (seg_ams_id, seg_tray_id)
- if seg_key in handled_trays:
- continue
- logger.info(
- "[UsageTracker] 3MF split: segment %d tray=%d (AMS%d-T%d) layers %d-%s -> %.1fg",
- seg_idx,
- tray_global,
- seg_ams_id,
- seg_tray_id,
- seg_start_layer,
- tray_changes[seg_idx + 1][1] if not is_last else "end",
- segment_grams,
- )
- seg_spool_id = await _resolve_spool_id_for_tray(
- printer_id=printer_id,
- ams_id=seg_ams_id,
- tray_id=seg_tray_id,
- db=db,
- spool_assignments_snapshot=spool_assignments,
- print_started_at=print_started_at,
- )
- if seg_spool_id is None:
- logger.info(
- "[UsageTracker] 3MF split: no spool at printer %d AMS%d-T%d, skipping segment",
- printer_id,
- seg_ams_id,
- seg_tray_id,
- )
- continue
- spool_result = await db.execute(select(Spool).where(Spool.id == seg_spool_id))
- spool = spool_result.scalar_one_or_none()
- if not spool:
- continue
- spool.weight_used = (spool.weight_used or 0) + segment_grams
- spool.last_used = datetime.now(timezone.utc)
- percent = round(segment_grams / (spool.label_weight or 1000) * 100)
- cost = None
- cost_per_kg = spool.cost_per_kg if spool.cost_per_kg is not None else default_filament_cost
- if cost_per_kg > 0:
- cost = round((segment_grams / 1000.0) * cost_per_kg, 2)
- history = SpoolUsageHistory(
- spool_id=spool.id,
- printer_id=printer_id,
- print_name=print_name,
- weight_used=round(segment_grams, 1),
- percent_used=percent,
- status=status,
- cost=cost,
- archive_id=archive_id,
- )
- db.add(history)
- handled_trays.add(seg_key)
- results.append(
- {
- "spool_id": spool.id,
- "weight_used": round(segment_grams, 1),
- "percent_used": percent,
- "ams_id": seg_ams_id,
- "tray_id": seg_tray_id,
- "material": spool.material,
- "cost": cost,
- }
- )
- logger.info(
- "[UsageTracker] Spool %d consumed %.1fg (3MF split seg%d) on printer %d AMS%d-T%d (%s)",
- spool.id,
- segment_grams,
- seg_idx,
- printer_id,
- seg_ams_id,
- seg_tray_id,
- status,
- )
- continue # Skip normal single-tray processing for this slot
- # Map 3MF slot_id to physical (ams_id, tray_id) using resolved mapping
- if tray_now_override is not None:
- # Single-filament non-queue print: use actual tray from printer state
- global_tray_id = tray_now_override
- else:
- # Explicit mapping (print command, MQTT, queue, color match)
- global_tray_id = None
- if slot_to_tray and slot_id <= len(slot_to_tray):
- mapped = slot_to_tray[slot_id - 1]
- if isinstance(mapped, int) and mapped >= 0:
- global_tray_id = mapped
- # Position-based default: sort available tray IDs so external spools (254/255)
- # naturally follow standard AMS trays, matching slicer slot numbering
- if global_tray_id is None:
- _state = printer_manager.get_status(printer_id)
- _raw = getattr(_state, "raw_data", None) if _state else None
- if _raw:
- from backend.app.services.spoolman_tracking import build_ams_tray_lookup
- available_trays = sorted(build_ams_tray_lookup(_raw).keys())
- if slot_id <= len(available_trays):
- global_tray_id = available_trays[slot_id - 1]
- # Final fallback: slot_id - 1 (legacy, works for pure AMS without external spools)
- if global_tray_id is None:
- global_tray_id = slot_id - 1
- if global_tray_id >= 254:
- # External spool: ams_id=255 (sentinel), tray_id=slot index (0 or 1)
- ams_id = 255
- tray_id = global_tray_id - 254
- elif global_tray_id >= 128:
- ams_id = global_tray_id
- tray_id = 0
- else:
- ams_id = global_tray_id // 4
- tray_id = global_tray_id % 4
- logger.info(
- "[UsageTracker] 3MF: slot_id=%d -> global_tray=%d -> AMS%d-T%d (used_g=%.1f, tray_now_override=%s)",
- slot_id,
- global_tray_id,
- ams_id,
- tray_id,
- used_g,
- tray_now_override,
- )
- key = (ams_id, tray_id)
- if key in handled_trays:
- continue
- spool_id = await _resolve_spool_id_for_tray(
- printer_id=printer_id,
- ams_id=ams_id,
- tray_id=tray_id,
- db=db,
- spool_assignments_snapshot=spool_assignments,
- print_started_at=print_started_at,
- )
- if spool_id is None:
- logger.info("[UsageTracker] 3MF: no spool assignment at printer %d AMS%d-T%d", printer_id, ams_id, tray_id)
- continue
- # Load spool
- spool_result = await db.execute(select(Spool).where(Spool.id == spool_id))
- spool = spool_result.scalar_one_or_none()
- if not spool:
- continue
- # Use per-layer grams if available, otherwise linear scale
- if layer_grams and slot_id in layer_grams:
- weight_grams = layer_grams[slot_id]
- else:
- weight_grams = used_g * scale
- if weight_grams <= 0:
- continue
- # Update spool
- spool.weight_used = (spool.weight_used or 0) + weight_grams
- spool.last_used = datetime.now(timezone.utc)
- percent = round(weight_grams / (spool.label_weight or 1000) * 100)
- # Calculate cost for this usage
- cost = None
- cost_per_kg = spool.cost_per_kg if spool.cost_per_kg is not None else default_filament_cost
- if cost_per_kg > 0:
- cost = round((weight_grams / 1000.0) * cost_per_kg, 2)
- # Insert usage history record
- history = SpoolUsageHistory(
- spool_id=spool.id,
- printer_id=printer_id,
- print_name=print_name,
- weight_used=round(weight_grams, 1),
- percent_used=percent,
- status=status,
- cost=cost,
- archive_id=archive_id,
- )
- db.add(history)
- handled_trays.add(key)
- results.append(
- {
- "spool_id": spool.id,
- "weight_used": round(weight_grams, 1),
- "percent_used": percent,
- "ams_id": ams_id,
- "tray_id": tray_id,
- "material": spool.material,
- "cost": cost,
- }
- )
- # Determine mapping source for debug logging
- if tray_now_override is not None:
- map_src = ", tray_now"
- elif mapping_source:
- map_src = f", {mapping_source}_map"
- else:
- map_src = ""
- logger.info(
- "[UsageTracker] Spool %d consumed %.1fg (3MF%s%s) on printer %d AMS%d-T%d (%s)",
- spool.id,
- weight_grams,
- " per-layer" if (layer_grams and slot_id in layer_grams) else (f" scaled {scale:.0%}" if scale < 1 else ""),
- map_src,
- printer_id,
- ams_id,
- tray_id,
- status,
- )
- return results
|