Browse Source

Merge branch 'maziggy:0.2.0b' into 0.2.0b

Keybored 3 months ago
parent
commit
d7e1b480dc

+ 2 - 1
CHANGELOG.md

@@ -7,7 +7,8 @@ All notable changes to Bambuddy will be documented in this file.
 ### New Features
 - **Spool Inventory — AMS Slot Assignment** — Assign inventory spools to AMS slots for filament tracking. Hover over any non-Bambu-Lab AMS slot to assign or unassign spools. The assign modal filters out Bambu Lab spools (tracked via RFID) and spools already assigned to other slots. Bambu Lab spool slots automatically hide assign/unassign UI since they are managed by the AMS. When a Bambu Lab spool is inserted into a slot with a manual assignment, the assignment is automatically unlinked.
 - **Spool Inventory — Remaining Weight Editing** — Edit the remaining filament weight when adding or editing a spool. The new "Remaining Weight" field in the Additional section shows current weight (label weight minus consumed) with a max reference. Edits are stored as `weight_used` internally.
-- **Spool Inventory — 3MF-Based Usage Tracking for Non-BL Spools** — Non-Bambu-Lab spools (no RFID) cannot use AMS remain% for usage tracking. Now falls back to per-filament weight estimates from the archived 3MF file (`used_g` per filament slot). For completed prints, uses the full slicer estimate. For failed or aborted prints, scales by print progress percentage. Bambu Lab spools continue using AMS remain% delta tracking as before.
+- **Spool Inventory — Unified 3MF-Based Usage Tracking** ([#336](https://github.com/maziggy/bambuddy/issues/336)) — All spools (Bambu Lab and third-party) now use 3MF slicer estimates as the primary tracking source. Per-filament `used_g` data from the archived 3MF file provides precise per-spool consumption. For failed or aborted prints, per-layer G-code analysis provides accurate partial usage up to the exact failure layer, with linear progress scaling as fallback. AMS remain% delta is the final fallback for G-code-only prints without an archived 3MF. Slot-to-tray mapping uses queue `ams_mapping` for queue-initiated prints and the printer's `tray_now` state for single-filament non-queue prints, ensuring the correct physical spool is always tracked.
+- **Notification Templates — Filament Usage Variables** ([#336](https://github.com/maziggy/bambuddy/issues/336)) — `print_complete`, `print_failed`, and `print_stopped` notification events now expose `{filament_grams}` (total grams, scaled by progress for partial prints), `{filament_details}` (per-filament breakdown, e.g. "PLA: 10.0g | PETG: 5.0g"), and `{progress}` (completion percentage for failed/stopped prints). Webhook payloads include `filament_used`, `filament_details`, and `progress` fields. Per-slot filament data is stored in archive `extra_data` for downstream use.
 - **Printer Status Summary Bar — Next Available & Availability Count** ([#354](https://github.com/maziggy/bambuddy/issues/354)) — The status bar on the Printers page now shows an availability count ("X available") alongside the printing/offline counts, and a "Next available" indicator showing which printing printer will finish soonest — with printer name, mini progress bar, completion percentage, and remaining time. Useful for print farms to quickly identify the next free printer. Updates in real-time via WebSocket. Translated in all 4 locales (en, de, ja, it).
 - **Nozzle-Aware AMS Filament Mapping for Dual-Nozzle Printers** ([#318](https://github.com/maziggy/bambuddy/issues/318)) — On dual-nozzle printers (H2D, H2D Pro), each AMS unit is physically connected to either the left or right nozzle. Bambuddy now reads nozzle assignments from the 3MF file (`filament_nozzle_map` + `physical_extruder_map` in `project_settings.config`) and constrains filament matching to only AMS trays connected to the correct nozzle via `ams_extruder_map`. Applies to the print scheduler, reprint modal, queue modal, and multi-printer selection. Falls back gracefully to unfiltered matching when no trays exist on the target nozzle. The filament mapping UI shows L/R nozzle badges for dual-nozzle prints. Translated in all 4 locales (en, de, ja, it).
 

+ 4 - 2
README.md

@@ -140,15 +140,17 @@ Perfect for remote print farms, traveling makers, or accessing your home printer
 - Email, Pushover, ntfy
 - Custom webhooks
 - Quiet hours & daily digest
-- Customizable message templates
+- Customizable message templates with per-filament usage details
 - Print finish photo URL in notifications
+- Filament usage and progress in failed/cancelled print notifications
 - HMS error alerts (AMS, nozzle, etc.)
 - Build plate detection alerts
 - Queue events (waiting, skipped, failed)
 
 ### 🧵 Spool Inventory
 - Built-in spool inventory with AMS slot assignment, usage tracking, and remaining weight management
-- Automatic filament consumption tracking: AMS RFID for Bambu Lab spools, 3MF estimates for third-party spools
+- Automatic filament consumption tracking: 3MF slicer estimates for all spools (primary), AMS remain% delta as fallback
+- Per-layer gcode accuracy for partial prints (failed/cancelled), with linear scaling fallback
 - Spool catalog, color catalog, PA profile matching, and low-stock alerts
 
 ### 🔧 Integrations

+ 15 - 0
backend/app/main.py

@@ -2348,6 +2348,21 @@ async def on_print_complete(printer_id: int, data: dict):
                             "actual_filament_grams": archive.filament_used_grams,
                             "failure_reason": archive.failure_reason,
                         }
+
+                        # Scale filament usage for partial prints
+                        if print_status != "completed" and archive.filament_used_grams:
+                            progress = data.get("progress") or 0
+                            scale = max(0.0, min(progress / 100.0, 1.0))
+                            archive_data["actual_filament_grams"] = round(archive.filament_used_grams * scale, 1)
+                            archive_data["progress"] = progress
+
+                        # Pass per-slot data from archive.extra_data
+                        if archive.extra_data and archive.extra_data.get("filament_slots"):
+                            slots = archive.extra_data["filament_slots"]
+                            if print_status != "completed":
+                                scale = max(0.0, min((data.get("progress") or 0) / 100.0, 1.0))
+                                slots = [{**s, "used_g": round(s["used_g"] * scale, 1)} for s in slots]
+                            archive_data["filament_slots"] = slots
                         # Add finish photo URL and image bytes if available
                         if finish_photo_filename:
                             from backend.app.api.routes.settings import get_setting

+ 31 - 2
backend/app/schemas/notification_template.py

@@ -31,12 +31,34 @@ EVENT_VARIABLES: dict[str, list[str]] = {
         "filename",
         "duration",
         "filament_grams",
+        "filament_details",
+        "finish_photo_url",
+        "timestamp",
+        "app_name",
+    ],
+    "print_failed": [
+        "printer",
+        "filename",
+        "duration",
+        "filament_grams",
+        "filament_details",
+        "progress",
+        "reason",
+        "finish_photo_url",
+        "timestamp",
+        "app_name",
+    ],
+    "print_stopped": [
+        "printer",
+        "filename",
+        "duration",
+        "filament_grams",
+        "filament_details",
+        "progress",
         "finish_photo_url",
         "timestamp",
         "app_name",
     ],
-    "print_failed": ["printer", "filename", "duration", "reason", "finish_photo_url", "timestamp", "app_name"],
-    "print_stopped": ["printer", "filename", "duration", "finish_photo_url", "timestamp", "app_name"],
     "print_progress": ["printer", "filename", "progress", "remaining_time", "timestamp", "app_name"],
     "printer_offline": ["printer", "timestamp", "app_name"],
     "printer_error": ["printer", "error_type", "error_detail", "timestamp", "app_name"],
@@ -72,6 +94,7 @@ SAMPLE_DATA: dict[str, dict[str, str]] = {
         "filename": "Benchy.3mf",
         "duration": "1h 18m",
         "filament_grams": "15.2",
+        "filament_details": "PLA: 15.2g",
         "finish_photo_url": "/api/v1/archives/123/photos/finish_20240115_154800_abc12345.jpg",
         "timestamp": "2024-01-15 15:48",
         "app_name": "Bambuddy",
@@ -80,6 +103,9 @@ SAMPLE_DATA: dict[str, dict[str, str]] = {
         "printer": "Bambu X1C",
         "filename": "Benchy.3mf",
         "duration": "0h 45m",
+        "filament_grams": "7.6",
+        "filament_details": "PLA: 7.6g",
+        "progress": "50",
         "reason": "Filament runout",
         "finish_photo_url": "/api/v1/archives/123/photos/finish_20240115_151500_def67890.jpg",
         "timestamp": "2024-01-15 15:15",
@@ -89,6 +115,9 @@ SAMPLE_DATA: dict[str, dict[str, str]] = {
         "printer": "Bambu X1C",
         "filename": "Benchy.3mf",
         "duration": "0h 30m",
+        "filament_grams": "4.6",
+        "filament_details": "PLA: 4.6g",
+        "progress": "30",
         "finish_photo_url": "/api/v1/archives/123/photos/finish_20240115_150000_ghi11223.jpg",
         "timestamp": "2024-01-15 15:00",
         "app_name": "Bambuddy",

+ 21 - 0
backend/app/services/archive.py

@@ -151,6 +151,27 @@ class ThreeMFParser:
                         self.metadata["_slice_filament_type"] = ", ".join(types)
                     if colors:
                         self.metadata["_slice_filament_color"] = ",".join(colors)
+
+                    # Collect per-slot filament usage for tracking & notifications
+                    filament_slots = []
+                    for f in filaments:
+                        slot_id = f.get("id")
+                        used_g_str = f.get("used_g", "0")
+                        try:
+                            used_g = float(used_g_str)
+                        except (ValueError, TypeError):
+                            used_g = 0
+                        if used_g > 0 and slot_id:
+                            filament_slots.append(
+                                {
+                                    "slot_id": int(slot_id),
+                                    "used_g": round(used_g, 2),
+                                    "type": f.get("type", ""),
+                                    "color": f.get("color", ""),
+                                }
+                            )
+                    if filament_slots:
+                        self.metadata["filament_slots"] = filament_slots
         except Exception:
             pass  # Skip unparseable slice_info metadata
 

+ 13 - 0
backend/app/services/notification_service.py

@@ -728,6 +728,19 @@ class NotificationService:
             if archive_data.get("finish_photo_url"):
                 variables["finish_photo_url"] = archive_data["finish_photo_url"]
 
+            # Build per-slot breakdown string
+            if archive_data.get("filament_slots"):
+                parts = []
+                for slot in archive_data["filament_slots"]:
+                    ftype = slot.get("type", "Unknown") or "Unknown"
+                    used = slot.get("used_g", 0)
+                    parts.append(f"{ftype}: {used:.1f}g")
+                variables["filament_details"] = " | ".join(parts)
+
+            # Add progress for partial prints
+            if archive_data.get("progress") is not None:
+                variables["progress"] = str(archive_data["progress"])
+
         # Extract image data for providers that support attachments (e.g. Pushover)
         image_data = None
         if archive_data:

+ 111 - 30
backend/app/services/usage_tracker.py

@@ -3,10 +3,11 @@
 Captures AMS tray remain% at print start, then computes consumption
 deltas at print complete to update spool weight_used and last_used.
 
-For non-BL spools (no RFID, AMS reports remain=-1), falls back to
-per-filament usage estimates from the archived 3MF file.
+Primary tracking uses 3MF slicer estimates (precise per-filament data).
+AMS remain% delta is the fallback for trays not covered by 3MF data.
 """
 
+import json
 import logging
 from dataclasses import dataclass, field
 from datetime import datetime, timezone
@@ -87,9 +88,9 @@ async def on_print_complete(
 ) -> list[dict]:
     """Compute consumption deltas and update spool weight_used/last_used.
 
-    Uses two tracking strategies:
-    1. AMS remain% delta — for BL spools with valid RFID remain data
-    2. 3MF per-filament estimates — for non-BL spools without remain data
+    Uses two tracking strategies in priority order:
+    1. 3MF per-filament estimates (primary) — precise slicer data for all spools
+    2. AMS remain% delta (fallback) — only for trays not already handled by 3MF
 
     Returns a list of dicts describing what was logged (for WebSocket broadcast).
     """
@@ -98,7 +99,17 @@ async def on_print_complete(
     results = []
     handled_trays: set[tuple[int, int]] = set()
 
-    # --- Path 1: AMS remain% delta (for spools with valid RFID remain data) ---
+    # --- Path 1 (PRIMARY): 3MF per-filament estimates ---
+    if archive_id:
+        print_name = (
+            (session.print_name if session else None) or data.get("subtask_name", "") or data.get("filename", "unknown")
+        )
+        threemf_results = await _track_from_3mf(
+            printer_id, archive_id, status, print_name, handled_trays, printer_manager, db
+        )
+        results.extend(threemf_results)
+
+    # --- Path 2 (FALLBACK): AMS remain% delta (only for trays not handled by 3MF) ---
     if session and session.tray_remain_start:
         state = printer_manager.get_status(printer_id)
         if state and state.raw_data:
@@ -113,6 +124,9 @@ async def on_print_complete(
                     tray_id = int(tray.get("id", 0))
                     key = (ams_id, tray_id)
 
+                    if key in handled_trays:
+                        continue  # Already tracked via 3MF
+
                     if key not in session.tray_remain_start:
                         continue
 
@@ -174,7 +188,7 @@ async def on_print_complete(
                     )
 
                     logger.info(
-                        "[UsageTracker] Spool %d consumed %.1fg (%d%%) on printer %d AMS%d-T%d (%s)",
+                        "[UsageTracker] Spool %d consumed %.1fg (%d%%) on printer %d AMS%d-T%d (AMS fallback, %s)",
                         spool.id,
                         weight_grams,
                         delta_pct,
@@ -184,16 +198,6 @@ async def on_print_complete(
                         status,
                     )
 
-    # --- Path 2: 3MF per-filament estimates (for non-BL spools without remain data) ---
-    if archive_id:
-        print_name = (
-            (session.print_name if session else None) or data.get("subtask_name", "") or data.get("filename", "unknown")
-        )
-        threemf_results = await _track_from_3mf(
-            printer_id, archive_id, status, print_name, handled_trays, printer_manager, db
-        )
-        results.extend(threemf_results)
-
     if results:
         await db.commit()
 
@@ -209,14 +213,20 @@ async def _track_from_3mf(
     printer_manager,
     db: AsyncSession,
 ) -> list[dict]:
-    """Track usage from 3MF per-filament data for non-BL spools.
+    """Track usage from 3MF per-filament slicer data (primary path).
+
+    Uses slicer-estimated filament weight for all spools (BL and non-BL).
+    For partial prints (failed/aborted), tries per-layer gcode data first,
+    then falls back to linear scaling by progress.
 
-    Falls back to slicer-estimated filament weight when AMS remain% is
-    unavailable (non-RFID spools). For partial prints (failed/aborted),
-    scales the estimate by print progress.
+    Slot-to-tray mapping priority:
+    1. Queue item ams_mapping (for queue-initiated prints)
+    2. tray_now from printer state (for single-filament non-queue prints)
+    3. Default mapping: slot_id - 1 = global_tray_id (last resort)
     """
     from backend.app.core.config import settings as app_settings
     from backend.app.models.archive import PrintArchive
+    from backend.app.models.print_queue import PrintQueueItem
     from backend.app.utils.threemf_tools import extract_filament_usage_from_3mf
 
     result = await db.execute(select(PrintArchive).where(PrintArchive.id == archive_id))
@@ -232,6 +242,29 @@ async def _track_from_3mf(
     if not filament_usage:
         return []
 
+    # --- Resolve slot-to-tray mapping ---
+    # 1. Try queue item ams_mapping (queue-initiated prints store the exact mapping)
+    slot_to_tray = None
+    queue_result = await db.execute(
+        select(PrintQueueItem)
+        .where(PrintQueueItem.archive_id == archive_id)
+        .where(PrintQueueItem.status.in_(["printing", "completed", "failed"]))
+    )
+    queue_item = queue_result.scalar_one_or_none()
+    if queue_item and queue_item.ams_mapping:
+        try:
+            slot_to_tray = json.loads(queue_item.ams_mapping)
+        except (json.JSONDecodeError, TypeError):
+            pass
+
+    # 2. For single-filament non-queue prints, use tray_now from printer state
+    nonzero_slots = [u for u in filament_usage if u.get("used_g", 0) > 0]
+    tray_now_override: int | None = None
+    if not slot_to_tray and len(nonzero_slots) == 1:
+        state = printer_manager.get_status(printer_id)
+        if state and state.tray_now < 255:
+            tray_now_override = state.tray_now
+
     # Scale factor for partial prints (failed/aborted)
     if status == "completed":
         scale = 1.0
@@ -240,6 +273,34 @@ async def _track_from_3mf(
         progress = state.progress if state else 0
         scale = max(0.0, min(progress / 100.0, 1.0))
 
+    # Per-layer gcode accuracy for partial prints
+    layer_grams: dict[int, float] | None = None
+    if status != "completed":
+        state = printer_manager.get_status(printer_id)
+        current_layer = state.layer_num if state else 0
+        if current_layer > 0:
+            try:
+                from backend.app.utils.threemf_tools import (
+                    extract_filament_properties_from_3mf,
+                    extract_layer_filament_usage_from_3mf,
+                    get_cumulative_usage_at_layer,
+                    mm_to_grams,
+                )
+
+                layer_usage = extract_layer_filament_usage_from_3mf(file_path)
+                if layer_usage:
+                    cumulative_mm = get_cumulative_usage_at_layer(layer_usage, current_layer)
+                    filament_props = extract_filament_properties_from_3mf(file_path)
+                    layer_grams = {}
+                    for filament_id, mm_used in cumulative_mm.items():
+                        slot_id = filament_id + 1  # 0-based to 1-based
+                        props = filament_props.get(slot_id, {})
+                        density = props.get("density", 1.24)
+                        diameter = props.get("diameter", 1.75)
+                        layer_grams[slot_id] = mm_to_grams(mm_used, diameter, density)
+            except Exception:
+                pass  # Fall back to linear scaling
+
     results = []
 
     for usage in filament_usage:
@@ -248,8 +309,18 @@ async def _track_from_3mf(
         if used_g <= 0:
             continue
 
-        # Map 3MF slot_id (1-based) to (ams_id, tray_id)
-        global_tray_id = slot_id - 1
+        # Map 3MF slot_id to physical (ams_id, tray_id) using resolved mapping
+        if tray_now_override is not None:
+            # Single-filament non-queue print: use actual tray from printer state
+            global_tray_id = tray_now_override
+        else:
+            # Queue mapping or default: slot_id - 1, overridden by ams_mapping
+            global_tray_id = slot_id - 1
+            if slot_to_tray and slot_id <= len(slot_to_tray):
+                mapped = slot_to_tray[slot_id - 1]
+                if isinstance(mapped, int) and mapped >= 0:
+                    global_tray_id = mapped
+
         if global_tray_id >= 128:
             ams_id = global_tray_id
             tray_id = 0
@@ -259,7 +330,7 @@ async def _track_from_3mf(
 
         key = (ams_id, tray_id)
         if key in handled_trays:
-            continue  # Already tracked via AMS remain% delta
+            continue
 
         # Find spool assignment for this tray
         assign_result = await db.execute(
@@ -279,11 +350,12 @@ async def _track_from_3mf(
         if not spool:
             continue
 
-        # Only use 3MF tracking for non-BL spools (BL spools use AMS remain%)
-        if spool.tag_uid or spool.tray_uuid:
-            continue
+        # Use per-layer grams if available, otherwise linear scale
+        if layer_grams and slot_id in layer_grams:
+            weight_grams = layer_grams[slot_id]
+        else:
+            weight_grams = used_g * scale
 
-        weight_grams = used_g * scale
         if weight_grams <= 0:
             continue
 
@@ -304,6 +376,7 @@ async def _track_from_3mf(
         )
         db.add(history)
 
+        handled_trays.add(key)
         results.append(
             {
                 "spool_id": spool.id,
@@ -314,11 +387,19 @@ async def _track_from_3mf(
             }
         )
 
+        # Determine mapping source for debug logging
+        if tray_now_override is not None:
+            map_src = ", tray_now"
+        elif slot_to_tray:
+            map_src = ", queue_map"
+        else:
+            map_src = ""
         logger.info(
-            "[UsageTracker] Spool %d consumed %.1fg (3MF estimate%s) on printer %d AMS%d-T%d (%s)",
+            "[UsageTracker] Spool %d consumed %.1fg (3MF%s%s) on printer %d AMS%d-T%d (%s)",
             spool.id,
             weight_grams,
-            f" scaled to {scale:.0%}" if scale < 1 else "",
+            " per-layer" if (layer_grams and slot_id in layer_grams) else (f" scaled {scale:.0%}" if scale < 1 else ""),
+            map_src,
             printer_id,
             ams_id,
             tray_id,

+ 27 - 14
backend/tests/unit/services/test_usage_tracker.py

@@ -1,7 +1,7 @@
 """Unit tests for the filament usage tracker.
 
-Tests both AMS remain% delta tracking (Path 1) and 3MF per-filament
-fallback tracking (Path 2) for non-BL spools.
+Tests 3MF-primary tracking (Path 1) and AMS remain% delta fallback
+(Path 2) for spools not covered by 3MF data.
 """
 
 from datetime import datetime, timezone
@@ -40,11 +40,13 @@ def _make_assignment(*, spool_id=1, printer_id=1, ams_id=0, tray_id=0):
     return assignment
 
 
-def _make_printer_state(ams_data, progress=0):
+def _make_printer_state(ams_data, progress=0, layer_num=0, tray_now=255):
     """Create a mock printer state with AMS data."""
     state = MagicMock()
     state.raw_data = {"ams": ams_data}
     state.progress = progress
+    state.layer_num = layer_num
+    state.tray_now = tray_now
     return state
 
 
@@ -189,16 +191,17 @@ class TestTrackFrom3MF:
         archive.file_path = "archives/test.3mf"
 
         db = AsyncMock()
-        # First execute → archive, second → assignment, third → spool
+        # archive, queue_item(None), assignment, spool
         db.execute = AsyncMock(
             side_effect=[
                 MagicMock(scalar_one_or_none=MagicMock(return_value=archive)),
+                MagicMock(scalar_one_or_none=MagicMock(return_value=None)),
                 MagicMock(scalar_one_or_none=MagicMock(return_value=assignment)),
                 MagicMock(scalar_one_or_none=MagicMock(return_value=spool)),
             ]
         )
 
-        pm = _make_printer_manager()
+        pm = _make_printer_manager(_make_printer_state([], tray_now=0))
         filament_usage = [{"slot_id": 1, "used_g": 25.5, "type": "PLA", "color": "#FF0000"}]
 
         with (
@@ -234,16 +237,18 @@ class TestTrackFrom3MF:
         archive.file_path = "archives/test.3mf"
 
         db = AsyncMock()
+        # archive, queue_item(None), assignment, spool
         db.execute = AsyncMock(
             side_effect=[
                 MagicMock(scalar_one_or_none=MagicMock(return_value=archive)),
+                MagicMock(scalar_one_or_none=MagicMock(return_value=None)),
                 MagicMock(scalar_one_or_none=MagicMock(return_value=assignment)),
                 MagicMock(scalar_one_or_none=MagicMock(return_value=spool)),
             ]
         )
 
         # Print failed at 50% progress → 50g consumed from 100g estimate
-        pm = _make_printer_manager(_make_printer_state([], progress=50))
+        pm = _make_printer_manager(_make_printer_state([], progress=50, tray_now=0))
         filament_usage = [{"slot_id": 1, "used_g": 100.0, "type": "PLA", "color": ""}]
 
         with (
@@ -269,23 +274,25 @@ class TestTrackFrom3MF:
         assert spool.weight_used == 50.0
 
     @pytest.mark.asyncio
-    async def test_skips_bl_spools(self):
-        """BL spools (with tag_uid) are NOT tracked via 3MF — they use AMS remain%."""
+    async def test_tracks_bl_spools_via_3mf(self):
+        """BL spools (with tag_uid) ARE now tracked via 3MF (unified tracking)."""
         spool = _make_spool(tag_uid="ABCD1234", tray_uuid="A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4")
         assignment = _make_assignment()
         archive = MagicMock()
         archive.file_path = "archives/test.3mf"
 
         db = AsyncMock()
+        # archive, queue_item(None), assignment, spool
         db.execute = AsyncMock(
             side_effect=[
                 MagicMock(scalar_one_or_none=MagicMock(return_value=archive)),
+                MagicMock(scalar_one_or_none=MagicMock(return_value=None)),
                 MagicMock(scalar_one_or_none=MagicMock(return_value=assignment)),
                 MagicMock(scalar_one_or_none=MagicMock(return_value=spool)),
             ]
         )
 
-        pm = _make_printer_manager()
+        pm = _make_printer_manager(_make_printer_state([], tray_now=0))
         filament_usage = [{"slot_id": 1, "used_g": 50.0, "type": "PLA", "color": ""}]
 
         with (
@@ -306,7 +313,9 @@ class TestTrackFrom3MF:
                 db=db,
             )
 
-        assert results == []
+        assert len(results) == 1
+        assert results[0]["spool_id"] == 1
+        assert results[0]["weight_used"] == 50.0
 
     @pytest.mark.asyncio
     async def test_skips_already_handled_trays(self):
@@ -315,13 +324,15 @@ class TestTrackFrom3MF:
         archive.file_path = "archives/test.3mf"
 
         db = AsyncMock()
+        # archive, queue_item(None)
         db.execute = AsyncMock(
             side_effect=[
                 MagicMock(scalar_one_or_none=MagicMock(return_value=archive)),
+                MagicMock(scalar_one_or_none=MagicMock(return_value=None)),
             ]
         )
 
-        pm = _make_printer_manager()
+        pm = _make_printer_manager(_make_printer_state([], tray_now=0))
         filament_usage = [{"slot_id": 1, "used_g": 50.0, "type": "PLA", "color": ""}]
 
         with (
@@ -346,23 +357,25 @@ class TestTrackFrom3MF:
 
     @pytest.mark.asyncio
     async def test_slot_to_tray_mapping(self):
-        """3MF slot_id maps correctly to (ams_id, tray_id)."""
-        # slot 5 → global_tray_id 4 → ams_id=1, tray_id=0
+        """3MF slot_id maps correctly to (ams_id, tray_id) via tray_now."""
+        # tray_now=4 → ams_id=1, tray_id=0 (single filament uses tray_now)
         spool = _make_spool(id=9)
         assignment = _make_assignment(spool_id=9, ams_id=1, tray_id=0)
         archive = MagicMock()
         archive.file_path = "archives/test.3mf"
 
         db = AsyncMock()
+        # archive, queue_item(None), assignment, spool
         db.execute = AsyncMock(
             side_effect=[
                 MagicMock(scalar_one_or_none=MagicMock(return_value=archive)),
+                MagicMock(scalar_one_or_none=MagicMock(return_value=None)),
                 MagicMock(scalar_one_or_none=MagicMock(return_value=assignment)),
                 MagicMock(scalar_one_or_none=MagicMock(return_value=spool)),
             ]
         )
 
-        pm = _make_printer_manager()
+        pm = _make_printer_manager(_make_printer_state([], tray_now=4))
         filament_usage = [{"slot_id": 5, "used_g": 30.0, "type": "PETG", "color": ""}]
 
         with (

+ 726 - 0
backend/tests/unit/test_usage_tracker.py

@@ -0,0 +1,726 @@
+"""Unit tests for usage_tracker.py — 3MF-primary filament tracking.
+
+Tests the unified tracking logic: 3MF slicer estimates as primary path,
+AMS remain% delta as fallback, per-layer gcode for partial prints,
+slot-to-tray mapping resolution, and notification variable formatting.
+"""
+
+from datetime import datetime, timezone
+from types import SimpleNamespace
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+
+from backend.app.services.usage_tracker import (
+    PrintSession,
+    _active_sessions,
+    _track_from_3mf,
+    on_print_complete,
+    on_print_start,
+)
+
+
+def _make_spool(spool_id=1, label_weight=1000, weight_used=0, tag_uid=None, tray_uuid=None):
+    """Create a mock Spool object."""
+    spool = MagicMock()
+    spool.id = spool_id
+    spool.label_weight = label_weight
+    spool.weight_used = weight_used
+    spool.tag_uid = tag_uid
+    spool.tray_uuid = tray_uuid
+    spool.last_used = None
+    return spool
+
+
+def _make_assignment(spool_id=1, printer_id=1, ams_id=0, tray_id=0):
+    """Create a mock SpoolAssignment object."""
+    assignment = MagicMock()
+    assignment.spool_id = spool_id
+    assignment.printer_id = printer_id
+    assignment.ams_id = ams_id
+    assignment.tray_id = tray_id
+    return assignment
+
+
+def _make_archive(archive_id=1, file_path="archives/1/test.3mf", extra_data=None):
+    """Create a mock PrintArchive object."""
+    archive = MagicMock()
+    archive.id = archive_id
+    archive.file_path = file_path
+    archive.extra_data = extra_data
+    return archive
+
+
+def _make_queue_item(ams_mapping=None, status="printing"):
+    """Create a mock PrintQueueItem object."""
+    item = MagicMock()
+    item.ams_mapping = ams_mapping
+    item.status = status
+    return item
+
+
+def _mock_db_execute(*return_values):
+    """Create a mock db with execute() that returns values in sequence."""
+    db = AsyncMock()
+    results = []
+    for val in return_values:
+        result = MagicMock()
+        result.scalar_one_or_none.return_value = val
+        results.append(result)
+    db.execute = AsyncMock(side_effect=results)
+    return db
+
+
+def _mock_db_sequential(responses):
+    """Create mock db that returns responses in order."""
+    db = AsyncMock()
+    call_count = [0]
+
+    async def mock_execute(*args, **kwargs):
+        idx = call_count[0]
+        call_count[0] += 1
+        result = MagicMock()
+        if idx < len(responses):
+            result.scalar_one_or_none.return_value = responses[idx]
+        else:
+            result.scalar_one_or_none.return_value = None
+        return result
+
+    db.execute = mock_execute
+    return db
+
+
+class TestOnPrintStart:
+    """Tests for on_print_start()."""
+
+    @pytest.fixture(autouse=True)
+    def _clear_sessions(self):
+        _active_sessions.clear()
+        yield
+        _active_sessions.clear()
+
+    @pytest.mark.asyncio
+    async def test_captures_remain_data(self):
+        """Captures AMS remain% at print start."""
+        printer_manager = MagicMock()
+        printer_manager.get_status.return_value = SimpleNamespace(
+            raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 80}, {"id": 1, "remain": 50}]}]}
+        )
+
+        await on_print_start(1, {"subtask_name": "Benchy"}, printer_manager)
+
+        assert 1 in _active_sessions
+        session = _active_sessions[1]
+        assert session.print_name == "Benchy"
+        assert session.tray_remain_start == {(0, 0): 80, (0, 1): 50}
+
+    @pytest.mark.asyncio
+    async def test_creates_session_without_remain(self):
+        """Creates session even without valid remain data (for 3MF tracking)."""
+        printer_manager = MagicMock()
+        printer_manager.get_status.return_value = SimpleNamespace(
+            raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": -1}]}]}
+        )
+
+        await on_print_start(1, {"subtask_name": "Test"}, printer_manager)
+
+        assert 1 in _active_sessions
+        assert _active_sessions[1].tray_remain_start == {}
+
+
+class TestOnPrintComplete:
+    """Tests for on_print_complete() — path ordering and interaction."""
+
+    @pytest.fixture(autouse=True)
+    def _clear_sessions(self):
+        _active_sessions.clear()
+        yield
+        _active_sessions.clear()
+
+    @pytest.mark.asyncio
+    async def test_bl_spool_uses_3mf(self):
+        """BL spool (with tag_uid) is tracked via 3MF, not just AMS delta."""
+        spool = _make_spool(spool_id=1, tag_uid="AABB1122", label_weight=1000)
+        assignment = _make_assignment(spool_id=1, printer_id=1, ams_id=0, tray_id=0)
+        archive = _make_archive(archive_id=10)
+
+        # Setup: session with AMS remain data
+        _active_sessions[1] = PrintSession(
+            printer_id=1,
+            print_name="Benchy",
+            started_at=datetime.now(timezone.utc),
+            tray_remain_start={(0, 0): 80},
+        )
+
+        # Mock printer state: tray_now=0 (AMS0-T0), single filament
+        printer_manager = MagicMock()
+        printer_manager.get_status.return_value = SimpleNamespace(
+            raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
+            progress=100,
+            layer_num=50,
+            tray_now=0,
+        )
+
+        # db returns: archive, queue_item(None), assignment, spool
+        db = _mock_db_sequential([archive, None, assignment, spool])
+
+        filament_usage = [{"slot_id": 1, "used_g": 15.0, "type": "PLA", "color": "#FF0000"}]
+
+        with (
+            patch("backend.app.core.config.settings") as mock_settings,
+            patch(
+                "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
+                return_value=filament_usage,
+            ),
+        ):
+            mock_settings.base_dir = MagicMock()
+            mock_path = MagicMock()
+            mock_path.exists.return_value = True
+            mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
+
+            results = await on_print_complete(
+                printer_id=1,
+                data={"status": "completed"},
+                printer_manager=printer_manager,
+                db=db,
+                archive_id=10,
+            )
+
+        # 3MF path should handle it (BL guard removed)
+        assert len(results) >= 1
+        assert results[0]["spool_id"] == 1
+        assert results[0]["weight_used"] == 15.0
+
+    @pytest.mark.asyncio
+    async def test_ams_delta_fallback_no_archive(self):
+        """AMS delta tracks consumption when archive_id is None."""
+        spool = _make_spool(spool_id=2, label_weight=1000)
+        assignment = _make_assignment(spool_id=2)
+
+        _active_sessions[1] = PrintSession(
+            printer_id=1,
+            print_name="Test",
+            started_at=datetime.now(timezone.utc),
+            tray_remain_start={(0, 0): 80},
+        )
+
+        printer_manager = MagicMock()
+        printer_manager.get_status.return_value = SimpleNamespace(
+            raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
+        )
+
+        # db returns assignment then spool
+        db = _mock_db_sequential([assignment, spool])
+
+        results = await on_print_complete(
+            printer_id=1,
+            data={"status": "completed"},
+            printer_manager=printer_manager,
+            db=db,
+            archive_id=None,
+        )
+
+        assert len(results) == 1
+        assert results[0]["spool_id"] == 2
+        # 10% of 1000g = 100g
+        assert results[0]["weight_used"] == 100.0
+        assert results[0]["percent_used"] == 10
+
+    @pytest.mark.asyncio
+    async def test_no_double_tracking(self):
+        """When 3MF handles a tray, AMS delta skips it."""
+        spool = _make_spool(spool_id=1, label_weight=1000)
+        assignment = _make_assignment(spool_id=1)
+        archive = _make_archive(archive_id=10)
+
+        _active_sessions[1] = PrintSession(
+            printer_id=1,
+            print_name="Benchy",
+            started_at=datetime.now(timezone.utc),
+            tray_remain_start={(0, 0): 80},
+        )
+
+        # tray_now=0 matches the single filament slot
+        printer_manager = MagicMock()
+        printer_manager.get_status.return_value = SimpleNamespace(
+            raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
+            progress=100,
+            layer_num=50,
+            tray_now=0,
+        )
+
+        # db returns: archive, queue_item(None), assignment, spool
+        db = _mock_db_sequential([archive, None, assignment, spool])
+
+        filament_usage = [{"slot_id": 1, "used_g": 15.0, "type": "PLA", "color": "#FF0000"}]
+
+        with (
+            patch("backend.app.core.config.settings") as mock_settings,
+            patch(
+                "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
+                return_value=filament_usage,
+            ),
+        ):
+            mock_settings.base_dir = MagicMock()
+            mock_path = MagicMock()
+            mock_path.exists.return_value = True
+            mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
+
+            results = await on_print_complete(
+                printer_id=1,
+                data={"status": "completed"},
+                printer_manager=printer_manager,
+                db=db,
+                archive_id=10,
+            )
+
+        # Only 1 result (3MF), NOT 2 (3MF + AMS delta)
+        assert len(results) == 1
+        assert results[0]["weight_used"] == 15.0
+
+
+class TestTrackFrom3mf:
+    """Tests for _track_from_3mf() — per-layer, linear scaling, and slot mapping."""
+
+    @pytest.mark.asyncio
+    async def test_linear_fallback_for_partial_print(self):
+        """Falls back to linear scaling when gcode layer data unavailable."""
+        spool = _make_spool(spool_id=1, label_weight=1000)
+        assignment = _make_assignment(spool_id=1)
+        archive = _make_archive(archive_id=10)
+
+        # db: archive, queue_item(None), assignment, spool
+        db = _mock_db_sequential([archive, None, assignment, spool])
+
+        printer_manager = MagicMock()
+        printer_manager.get_status.return_value = SimpleNamespace(
+            progress=50,
+            layer_num=25,
+            tray_now=0,
+        )
+
+        filament_usage = [{"slot_id": 1, "used_g": 20.0, "type": "PLA", "color": ""}]
+        handled_trays: set[tuple[int, int]] = set()
+
+        with (
+            patch("backend.app.core.config.settings") as mock_settings,
+            patch(
+                "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
+                return_value=filament_usage,
+            ),
+            patch(
+                "backend.app.utils.threemf_tools.extract_layer_filament_usage_from_3mf",
+                return_value=None,  # No layer data available
+            ),
+        ):
+            mock_settings.base_dir = MagicMock()
+            mock_path = MagicMock()
+            mock_path.exists.return_value = True
+            mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
+
+            results = await _track_from_3mf(
+                printer_id=1,
+                archive_id=10,
+                status="failed",
+                print_name="Benchy",
+                handled_trays=handled_trays,
+                printer_manager=printer_manager,
+                db=db,
+            )
+
+        assert len(results) == 1
+        # 50% of 20g = 10g
+        assert results[0]["weight_used"] == 10.0
+        # Tray should be marked as handled
+        assert (0, 0) in handled_trays
+
+    @pytest.mark.asyncio
+    async def test_per_layer_partial_print(self):
+        """Failed print at layer N uses gcode cumulative data."""
+        spool = _make_spool(spool_id=1, label_weight=1000)
+        assignment = _make_assignment(spool_id=1)
+        archive = _make_archive(archive_id=10)
+
+        # db: archive, queue_item(None), assignment, spool
+        db = _mock_db_sequential([archive, None, assignment, spool])
+
+        printer_manager = MagicMock()
+        printer_manager.get_status.return_value = SimpleNamespace(
+            progress=50,
+            layer_num=25,
+            tray_now=0,
+        )
+
+        filament_usage = [{"slot_id": 1, "used_g": 20.0, "type": "PLA", "color": ""}]
+        # Per-layer data: at layer 25, filament 0 used 5000mm
+        layer_data = {10: {0: 2000.0}, 25: {0: 5000.0}, 50: {0: 10000.0}}
+        filament_props = {1: {"density": 1.24, "diameter": 1.75}}
+        handled_trays: set[tuple[int, int]] = set()
+
+        with (
+            patch("backend.app.core.config.settings") as mock_settings,
+            patch(
+                "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
+                return_value=filament_usage,
+            ),
+            patch(
+                "backend.app.utils.threemf_tools.extract_layer_filament_usage_from_3mf",
+                return_value=layer_data,
+            ),
+            patch(
+                "backend.app.utils.threemf_tools.get_cumulative_usage_at_layer",
+                return_value={0: 5000.0},
+            ),
+            patch(
+                "backend.app.utils.threemf_tools.extract_filament_properties_from_3mf",
+                return_value=filament_props,
+            ),
+            patch(
+                "backend.app.utils.threemf_tools.mm_to_grams",
+                return_value=12.0,  # 5000mm at 1.75mm/1.24g/cm3
+            ),
+        ):
+            mock_settings.base_dir = MagicMock()
+            mock_path = MagicMock()
+            mock_path.exists.return_value = True
+            mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
+
+            results = await _track_from_3mf(
+                printer_id=1,
+                archive_id=10,
+                status="failed",
+                print_name="Benchy",
+                handled_trays=handled_trays,
+                printer_manager=printer_manager,
+                db=db,
+            )
+
+        assert len(results) == 1
+        # Should use per-layer grams (12.0g), not linear scale (10.0g)
+        assert results[0]["weight_used"] == 12.0
+
+    @pytest.mark.asyncio
+    async def test_completed_print_uses_full_weight(self):
+        """Completed print uses full 3MF weight (scale=1.0)."""
+        spool = _make_spool(spool_id=1, label_weight=1000)
+        assignment = _make_assignment(spool_id=1)
+        archive = _make_archive(archive_id=10)
+
+        # db: archive, queue_item(None), assignment, spool
+        db = _mock_db_sequential([archive, None, assignment, spool])
+
+        printer_manager = MagicMock()
+        printer_manager.get_status.return_value = SimpleNamespace(
+            progress=100,
+            layer_num=50,
+            tray_now=0,
+        )
+
+        filament_usage = [{"slot_id": 1, "used_g": 20.0, "type": "PLA", "color": ""}]
+        handled_trays: set[tuple[int, int]] = set()
+
+        with (
+            patch("backend.app.core.config.settings") as mock_settings,
+            patch(
+                "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
+                return_value=filament_usage,
+            ),
+        ):
+            mock_settings.base_dir = MagicMock()
+            mock_path = MagicMock()
+            mock_path.exists.return_value = True
+            mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
+
+            results = await _track_from_3mf(
+                printer_id=1,
+                archive_id=10,
+                status="completed",
+                print_name="Benchy",
+                handled_trays=handled_trays,
+                printer_manager=printer_manager,
+                db=db,
+            )
+
+        assert len(results) == 1
+        assert results[0]["weight_used"] == 20.0
+
+    @pytest.mark.asyncio
+    async def test_tray_now_override_for_single_filament(self):
+        """Single-filament non-queue print uses tray_now instead of slot_id mapping."""
+        # Spool 2 is at AMS1-T3 (global_tray_id=7)
+        spool = _make_spool(spool_id=2, label_weight=1000)
+        assignment = _make_assignment(spool_id=2, ams_id=1, tray_id=3)
+        archive = _make_archive(archive_id=10)
+
+        # db: archive, queue_item(None), assignment, spool
+        db = _mock_db_sequential([archive, None, assignment, spool])
+
+        # tray_now=7 = (ams_id=1, tray_id=3), the ACTUAL tray used
+        printer_manager = MagicMock()
+        printer_manager.get_status.return_value = SimpleNamespace(
+            progress=100,
+            layer_num=50,
+            tray_now=7,
+        )
+
+        # 3MF has slot_id=12 (would default-map to ams_id=2, tray_id=3 — WRONG)
+        filament_usage = [{"slot_id": 12, "used_g": 10.6, "type": "PLA", "color": "#FF0000"}]
+        handled_trays: set[tuple[int, int]] = set()
+
+        with (
+            patch("backend.app.core.config.settings") as mock_settings,
+            patch(
+                "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
+                return_value=filament_usage,
+            ),
+        ):
+            mock_settings.base_dir = MagicMock()
+            mock_path = MagicMock()
+            mock_path.exists.return_value = True
+            mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
+
+            results = await _track_from_3mf(
+                printer_id=1,
+                archive_id=10,
+                status="completed",
+                print_name="Test",
+                handled_trays=handled_trays,
+                printer_manager=printer_manager,
+                db=db,
+            )
+
+        assert len(results) == 1
+        assert results[0]["spool_id"] == 2
+        assert results[0]["ams_id"] == 1
+        assert results[0]["tray_id"] == 3
+        assert results[0]["weight_used"] == 10.6
+        assert (1, 3) in handled_trays
+
+    @pytest.mark.asyncio
+    async def test_queue_ams_mapping_overrides_default(self):
+        """Queue item ams_mapping overrides default slot_id mapping."""
+        # Spool at AMS1-T3 (global_tray_id=7)
+        spool = _make_spool(spool_id=5, label_weight=1000)
+        assignment = _make_assignment(spool_id=5, ams_id=1, tray_id=3)
+        archive = _make_archive(archive_id=20)
+        # Queue item maps slot 1 → global tray 7 (ams_id=1, tray_id=3)
+        queue_item = _make_queue_item(ams_mapping="[7, -1, -1, -1]")
+
+        # db: archive, queue_item, assignment, spool
+        db = _mock_db_sequential([archive, queue_item, assignment, spool])
+
+        printer_manager = MagicMock()
+        printer_manager.get_status.return_value = SimpleNamespace(
+            progress=100,
+            layer_num=50,
+            tray_now=7,
+        )
+
+        filament_usage = [{"slot_id": 1, "used_g": 25.0, "type": "PETG", "color": ""}]
+        handled_trays: set[tuple[int, int]] = set()
+
+        with (
+            patch("backend.app.core.config.settings") as mock_settings,
+            patch(
+                "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
+                return_value=filament_usage,
+            ),
+        ):
+            mock_settings.base_dir = MagicMock()
+            mock_path = MagicMock()
+            mock_path.exists.return_value = True
+            mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
+
+            results = await _track_from_3mf(
+                printer_id=1,
+                archive_id=20,
+                status="completed",
+                print_name="Queue Print",
+                handled_trays=handled_trays,
+                printer_manager=printer_manager,
+                db=db,
+            )
+
+        assert len(results) == 1
+        assert results[0]["spool_id"] == 5
+        assert results[0]["ams_id"] == 1
+        assert results[0]["tray_id"] == 3
+        assert results[0]["weight_used"] == 25.0
+
+    @pytest.mark.asyncio
+    async def test_multi_filament_uses_queue_mapping(self):
+        """Multi-filament queue prints use ams_mapping for each slot."""
+        spool_a = _make_spool(spool_id=1, label_weight=1000)
+        spool_b = _make_spool(spool_id=2, label_weight=1000)
+        assign_a = _make_assignment(spool_id=1, ams_id=0, tray_id=0)
+        assign_b = _make_assignment(spool_id=2, ams_id=1, tray_id=2)
+        archive = _make_archive(archive_id=30)
+        # slot 1 → tray 0 (AMS0-T0), slot 2 → tray 6 (AMS1-T2)
+        queue_item = _make_queue_item(ams_mapping="[0, 6]")
+
+        # db: archive, queue_item, assign_a, spool_a, assign_b, spool_b
+        db = _mock_db_sequential([archive, queue_item, assign_a, spool_a, assign_b, spool_b])
+
+        printer_manager = MagicMock()
+        printer_manager.get_status.return_value = SimpleNamespace(
+            progress=100,
+            layer_num=50,
+            tray_now=6,
+        )
+
+        filament_usage = [
+            {"slot_id": 1, "used_g": 10.0, "type": "PLA", "color": ""},
+            {"slot_id": 2, "used_g": 5.0, "type": "PETG", "color": ""},
+        ]
+        handled_trays: set[tuple[int, int]] = set()
+
+        with (
+            patch("backend.app.core.config.settings") as mock_settings,
+            patch(
+                "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
+                return_value=filament_usage,
+            ),
+        ):
+            mock_settings.base_dir = MagicMock()
+            mock_path = MagicMock()
+            mock_path.exists.return_value = True
+            mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
+
+            results = await _track_from_3mf(
+                printer_id=1,
+                archive_id=30,
+                status="completed",
+                print_name="Multi",
+                handled_trays=handled_trays,
+                printer_manager=printer_manager,
+                db=db,
+            )
+
+        assert len(results) == 2
+        assert results[0]["spool_id"] == 1
+        assert results[0]["ams_id"] == 0
+        assert results[0]["tray_id"] == 0
+        assert results[0]["weight_used"] == 10.0
+        assert results[1]["spool_id"] == 2
+        assert results[1]["ams_id"] == 1
+        assert results[1]["tray_id"] == 2
+        assert results[1]["weight_used"] == 5.0
+
+    @pytest.mark.asyncio
+    async def test_no_tray_now_override_for_multi_filament(self):
+        """Multi-filament non-queue prints fall back to default mapping, not tray_now."""
+        spool = _make_spool(spool_id=1, label_weight=1000)
+        assignment = _make_assignment(spool_id=1, ams_id=0, tray_id=0)
+        archive = _make_archive(archive_id=10)
+
+        # db: archive, queue_item(None), assignment, spool (2nd slot has no assignment)
+        db = _mock_db_sequential([archive, None, assignment, spool, None])
+
+        printer_manager = MagicMock()
+        printer_manager.get_status.return_value = SimpleNamespace(
+            progress=100,
+            layer_num=50,
+            tray_now=4,  # tray_now won't be used
+        )
+
+        # Two filament slots with usage
+        filament_usage = [
+            {"slot_id": 1, "used_g": 10.0, "type": "PLA", "color": ""},
+            {"slot_id": 2, "used_g": 5.0, "type": "PETG", "color": ""},
+        ]
+        handled_trays: set[tuple[int, int]] = set()
+
+        with (
+            patch("backend.app.core.config.settings") as mock_settings,
+            patch(
+                "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
+                return_value=filament_usage,
+            ),
+        ):
+            mock_settings.base_dir = MagicMock()
+            mock_path = MagicMock()
+            mock_path.exists.return_value = True
+            mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
+
+            results = await _track_from_3mf(
+                printer_id=1,
+                archive_id=10,
+                status="completed",
+                print_name="Test",
+                handled_trays=handled_trays,
+                printer_manager=printer_manager,
+                db=db,
+            )
+
+        # Should use default mapping (slot 1 → tray 0, slot 2 → tray 1)
+        assert len(results) == 1  # Only slot 1 has assignment
+        assert results[0]["ams_id"] == 0
+        assert results[0]["tray_id"] == 0
+
+
+class TestNotificationVariables:
+    """Tests for filament_details formatting in notifications."""
+
+    def test_filament_details_single_slot(self):
+        """Single slot produces 'PLA: 15.2g' format."""
+        slots = [{"type": "PLA", "used_g": 15.2, "slot_id": 1, "color": "#FF0000"}]
+        parts = []
+        for slot in slots:
+            ftype = slot.get("type", "Unknown") or "Unknown"
+            used = slot.get("used_g", 0)
+            parts.append(f"{ftype}: {used:.1f}g")
+        result = " | ".join(parts)
+        assert result == "PLA: 15.2g"
+
+    def test_filament_details_multi_slot(self):
+        """Multiple slots produce 'PLA: 10.0g | PETG: 5.0g' format."""
+        slots = [
+            {"type": "PLA", "used_g": 10.0, "slot_id": 1, "color": ""},
+            {"type": "PETG", "used_g": 5.0, "slot_id": 2, "color": ""},
+        ]
+        parts = []
+        for slot in slots:
+            ftype = slot.get("type", "Unknown") or "Unknown"
+            used = slot.get("used_g", 0)
+            parts.append(f"{ftype}: {used:.1f}g")
+        result = " | ".join(parts)
+        assert result == "PLA: 10.0g | PETG: 5.0g"
+
+    def test_filament_details_empty_type(self):
+        """Empty type defaults to 'Unknown'."""
+        slots = [{"type": "", "used_g": 5.0, "slot_id": 1, "color": ""}]
+        parts = []
+        for slot in slots:
+            ftype = slot.get("type", "Unknown") or "Unknown"
+            used = slot.get("used_g", 0)
+            parts.append(f"{ftype}: {used:.1f}g")
+        result = " | ".join(parts)
+        assert result == "Unknown: 5.0g"
+
+    def test_filament_grams_scaled_for_partial(self):
+        """filament_grams is scaled by progress for partial prints."""
+        filament_used_grams = 20.0
+        progress = 50
+        scale = max(0.0, min(progress / 100.0, 1.0))
+        scaled = round(filament_used_grams * scale, 1)
+        assert scaled == 10.0
+
+    def test_filament_grams_zero_progress(self):
+        """Progress=0 at cancellation gives 0.0g."""
+        filament_used_grams = 20.0
+        progress = 0
+        scale = max(0.0, min(progress / 100.0, 1.0))
+        scaled = round(filament_used_grams * scale, 1)
+        assert scaled == 0.0
+
+    def test_slot_scaling_for_partial(self):
+        """Per-slot usage is scaled linearly for partial prints."""
+        slots = [
+            {"type": "PLA", "used_g": 20.0, "slot_id": 1, "color": ""},
+            {"type": "PETG", "used_g": 10.0, "slot_id": 2, "color": ""},
+        ]
+        progress = 30
+        scale = max(0.0, min(progress / 100.0, 1.0))
+        scaled_slots = [{**s, "used_g": round(s["used_g"] * scale, 1)} for s in slots]
+        assert scaled_slots[0]["used_g"] == 6.0
+        assert scaled_slots[1]["used_g"] == 3.0