Browse Source

[Fix]: Properly track filament usage data in Spoolman (#686)

Properly track filament usage data in Spoolman#686
Dakota G 2 months ago
parent
commit
fb29eb4cd7

+ 37 - 4
backend/app/main.py

@@ -325,6 +325,14 @@ def register_expected_print(printer_id: int, filename: str, archive_id: int, ams
     )
 
 
+def _get_start_ams_mapping(data: dict, archive_id: int | None) -> list[int] | None:
+    """Resolve AMS mapping for print start without consuming stored queue/reprint state."""
+    stored_ams_mapping = data.get("ams_mapping")
+    if not stored_ams_mapping and archive_id:
+        stored_ams_mapping = _print_ams_mappings.get(archive_id)
+    return stored_ams_mapping
+
+
 _last_status_broadcast: dict[int, str] = {}
 # Track printers where we've updated nozzle_count
 _nozzle_count_updated: set[int] = set()
@@ -1318,7 +1326,14 @@ async def on_print_start(printer_id: int, data: dict):
 
                 # Store Spoolman tracking data for per-filament usage reporting
                 try:
-                    await _store_spoolman_print_data(printer_id, archive.id, archive.file_path, db, printer_manager)
+                    await _store_spoolman_print_data(
+                        printer_id,
+                        archive.id,
+                        archive.file_path,
+                        db,
+                        printer_manager,
+                        ams_mapping=_get_start_ams_mapping(data, archive.id),
+                    )
                 except Exception as e:
                     logger.warning("[SPOOLMAN] Failed to store tracking data: %s", e)
 
@@ -1635,7 +1650,12 @@ async def on_print_start(printer_id: int, data: dict):
                 # Store Spoolman tracking data (may not work for fallback since no 3MF)
                 try:
                     await _store_spoolman_print_data(
-                        printer_id, fallback_archive.id, fallback_archive.file_path, db, printer_manager
+                        printer_id,
+                        fallback_archive.id,
+                        fallback_archive.file_path,
+                        db,
+                        printer_manager,
+                        ams_mapping=_get_start_ams_mapping(data, fallback_archive.id),
                     )
                 except Exception as e:
                     logger.debug("[SPOOLMAN] Could not store tracking for fallback archive: %s", e)
@@ -1755,7 +1775,14 @@ async def on_print_start(printer_id: int, data: dict):
 
                 # Store Spoolman tracking data for per-filament usage reporting
                 try:
-                    await _store_spoolman_print_data(printer_id, archive.id, archive.file_path, db, printer_manager)
+                    await _store_spoolman_print_data(
+                        printer_id,
+                        archive.id,
+                        archive.file_path,
+                        db,
+                        printer_manager,
+                        ams_mapping=_get_start_ams_mapping(data, archive.id),
+                    )
                 except Exception as e:
                     logger.warning("[SPOOLMAN] Failed to store tracking data: %s", e)
 
@@ -2542,7 +2569,13 @@ async def on_print_complete(printer_id: int, data: dict):
         # Report partial usage if tracking data exists (only stored when weight sync is disabled)
         try:
             async with async_session() as db:
-                await _cleanup_spoolman_tracking(printer_id, archive_id, db)
+                await _cleanup_spoolman_tracking(
+                    printer_id,
+                    archive_id,
+                    db,
+                    last_layer_num=data.get("last_layer_num"),
+                    last_progress=data.get("last_progress"),
+                )
         except Exception as e:
             logger.debug("[SPOOLMAN] Cleanup failed: %s", e)
 

+ 187 - 46
backend/app/services/spoolman_tracking.py

@@ -18,33 +18,103 @@ logger = logging.getLogger(__name__)
 
 # Zero UUID used by Bambu printers for empty/unset tray_uuid
 _ZERO_UUID = "00000000000000000000000000000000"
+_ZERO_TAG_UID = "0000000000000000"
 
 
-def _resolve_spool_tag(tray_info: dict) -> str:
+def _is_non_zero_identifier(value: str) -> bool:
+    """Return True when identifier is non-empty and not all zeros."""
+    if not value:
+        return False
+    return set(value) != {"0"}
+
+
+def _to_fixed_hex(value: int, width: int) -> str:
+    """Mirror frontend toFixedHex(): uppercase, zero-padded, fixed width."""
+    safe = max(0, int(value))
+    return format(safe, "X").zfill(width)[-width:]
+
+
+def _hash_serial_to_hex32(serial: str) -> str:
+    """Mirror frontend hashSerialToHex32() exactly (32-bit FNV-1a)."""
+    input_str = (serial or "").strip().upper()
+    hash_value = 0x811C9DC5
+    for char in input_str:
+        hash_value ^= ord(char)
+        hash_value = (hash_value * 0x01000193) & 0xFFFFFFFF
+    return format(hash_value, "X").zfill(8)
+
+
+def _global_tray_id_to_ams_slot(global_tray_id: int) -> tuple[int, int]:
+    """Convert global tray id to (ams_id, tray_id) tuple for fallback tag generation."""
+    # External spool slots use IDs 254/255 and map to ams_id=255 tray_id=0/1.
+    if global_tray_id >= 254:
+        return 255, max(0, global_tray_id - 254)
+    # AMS-HT units are addressed by ams_id directly and have a single tray.
+    if global_tray_id >= 128:
+        return global_tray_id, 0
+    # Standard AMS units: four trays each.
+    return global_tray_id // 4, global_tray_id % 4
+
+
+def _get_fallback_spool_tag(printer_serial: str, global_tray_id: int) -> str:
+    """Mirror frontend getFallbackSpoolTag(serial, amsId, trayId) exactly."""
+    if not printer_serial:
+        return ""
+    ams_id, tray_id = _global_tray_id_to_ams_slot(global_tray_id)
+    return f"{_hash_serial_to_hex32(printer_serial)}{_to_fixed_hex(ams_id, 4)}{_to_fixed_hex(tray_id, 4)}"
+
+
+def _resolve_spool_tag(tray_info: dict, printer_serial: str = "", global_tray_id: int | None = None) -> str:
     """Get the best spool identifier from tray info (prefer tray_uuid over tag_uid).
 
     Returns empty string if no usable identifier is found.
     """
-    tray_uuid = tray_info.get("tray_uuid", "")
-    tag_uid = tray_info.get("tag_uid", "")
-    if tray_uuid and tray_uuid != _ZERO_UUID:
+    tray_uuid = str(tray_info.get("tray_uuid", "") or "")
+    tag_uid = str(tray_info.get("tag_uid", "") or "")
+    if tray_uuid and tray_uuid != _ZERO_UUID and _is_non_zero_identifier(tray_uuid):
         return tray_uuid
-    return tag_uid
+    if tag_uid and tag_uid != _ZERO_TAG_UID and _is_non_zero_identifier(tag_uid):
+        return tag_uid
+    if global_tray_id is not None:
+        return _get_fallback_spool_tag(printer_serial, global_tray_id)
+    return ""
+
+
+async def _get_printer_serial(printer_id: int) -> str:
+    """Get printer serial for deterministic fallback tag generation."""
+    from backend.app.models.printer import Printer
+    from backend.app.services.printer_manager import printer_manager
+
+    printer_info = printer_manager.get_printer(printer_id)
+    if printer_info and printer_info.serial_number:
+        return printer_info.serial_number
+
+    async with async_session() as db:
+        result = await db.execute(select(Printer.serial_number).where(Printer.id == printer_id))
+        serial_number = result.scalar_one_or_none()
+        return serial_number or ""
 
 
-def _resolve_global_tray_id(slot_id: int, slot_to_tray: list | None) -> int:
+def _resolve_global_tray_id(slot_id: int, slot_to_tray: list | None, ams_trays: dict | None = None) -> int:
     """Map a 1-based slot_id to a global_tray_id using optional custom mapping.
 
-    Default mapping: slot 1 -> tray 0, slot 2 -> tray 1, etc.
-    Custom mapping (from print queue): slot_to_tray[slot_id - 1] overrides default.
-    A value of -1 in custom mapping means unmapped (uses default).
+    Custom mapping: slot_to_tray[slot_id - 1] is used when >= 0.
+    Position-based default: uses sorted ams_trays keys so external spools (ID 254/255)
+    naturally follow standard AMS trays, matching the slicer's slot numbering.
+    A value of -1 in the custom mapping means unmapped (uses position-based default).
+    Final fallback: slot_id - 1 (legacy, works for pure AMS without external spools).
     """
-    global_tray_id = slot_id - 1
     if slot_to_tray and slot_id <= len(slot_to_tray):
         mapped_tray = slot_to_tray[slot_id - 1]
         if mapped_tray >= 0:
-            global_tray_id = mapped_tray
-    return global_tray_id
+            return mapped_tray
+    # Position-based default: sort available tray IDs so external spools (254/255)
+    # come after standard AMS trays, matching the slicer's slot assignment order.
+    if ams_trays:
+        sorted_tray_ids = sorted(ams_trays.keys())
+        if slot_id <= len(sorted_tray_ids):
+            return sorted_tray_ids[slot_id - 1]
+    return slot_id - 1
 
 
 def build_ams_tray_lookup(raw_data: dict) -> dict[int, dict]:
@@ -55,9 +125,9 @@ def build_ams_tray_lookup(raw_data: dict) -> dict[int, dict]:
     lookup = {}
     ams_data = raw_data.get("ams", [])
     for ams_unit in ams_data:
-        ams_id = ams_unit.get("id", 0)
+        ams_id = int(ams_unit.get("id", 0))
         for tray in ams_unit.get("tray", []):
-            tray_id = tray.get("id", 0)
+            tray_id = int(tray.get("id", 0))
             # AMS-HT units have IDs starting at 128 with a single tray
             global_tray_id = ams_id if ams_id >= 128 else ams_id * 4 + tray_id
             lookup[global_tray_id] = {
@@ -79,7 +149,14 @@ def build_ams_tray_lookup(raw_data: dict) -> dict[int, dict]:
     return lookup
 
 
-async def store_print_data(printer_id: int, archive_id: int, file_path: str, db, printer_manager):
+async def store_print_data(
+    printer_id: int,
+    archive_id: int,
+    file_path: str,
+    db,
+    printer_manager,
+    ams_mapping: list[int] | None = None,
+):
     """Store Spoolman tracking data at print start (persisted to database).
 
     Only stores data when Spoolman is enabled and AMS weight sync is disabled
@@ -124,17 +201,21 @@ async def store_print_data(printer_id: int, archive_id: int, file_path: str, db,
     if state and state.raw_data:
         ams_trays = build_ams_tray_lookup(state.raw_data)
 
-    # Get custom slot-to-tray mapping from queue item (if this is a queued print)
-    slot_to_tray = None
-    queue_result = await db.execute(
-        select(PrintQueueItem).where(PrintQueueItem.archive_id == archive_id).where(PrintQueueItem.status == "printing")
-    )
-    queue_item = queue_result.scalar_one_or_none()
-    if queue_item and queue_item.ams_mapping:
-        try:
-            slot_to_tray = json.loads(queue_item.ams_mapping)
-        except json.JSONDecodeError:
-            pass  # Ignore malformed AMS mapping; fall back to default slot assignment
+    # Prefer the explicit mapping captured from the print command, then fall back
+    # to any queue mapping stored for scheduled/reprint jobs.
+    slot_to_tray = ams_mapping if ams_mapping is not None else None
+    if not slot_to_tray:
+        queue_result = await db.execute(
+            select(PrintQueueItem)
+            .where(PrintQueueItem.archive_id == archive_id)
+            .where(PrintQueueItem.status == "printing")
+        )
+        queue_item = queue_result.scalar_one_or_none()
+        if queue_item and queue_item.ams_mapping:
+            try:
+                slot_to_tray = json.loads(queue_item.ams_mapping)
+            except json.JSONDecodeError:
+                pass  # Ignore malformed AMS mapping; fall back to default slot assignment
 
     # Parse G-code for per-layer filament usage (for accurate partial usage tracking)
     layer_usage = extract_layer_filament_usage_from_3mf(full_path)
@@ -176,7 +257,13 @@ async def store_print_data(printer_id: int, archive_id: int, file_path: str, db,
         logger.debug("[SPOOLMAN] Layer usage data available for partial tracking")
 
 
-async def cleanup_tracking(printer_id: int, archive_id: int, db):
+async def cleanup_tracking(
+    printer_id: int,
+    archive_id: int,
+    db,
+    last_layer_num: int | None = None,
+    last_progress: int | None = None,
+):
     """Report partial usage and clean up Spoolman tracking data for failed/aborted prints."""
     from backend.app.models.active_print_spoolman import ActivePrintSpoolman
 
@@ -194,7 +281,12 @@ async def cleanup_tracking(printer_id: int, archive_id: int, db):
 
     # Try to report partial usage before cleanup
     try:
-        await _report_partial_usage(printer_id, tracking)
+        await _report_partial_usage(
+            printer_id,
+            tracking,
+            last_layer_num=last_layer_num,
+            last_progress=last_progress,
+        )
     except Exception as e:
         logger.warning("[SPOOLMAN] Partial usage report failed: %s", e)
 
@@ -234,6 +326,7 @@ async def _report_spool_usage_for_slots(
     ams_trays: dict[int, dict],
     slot_to_tray: list | None,
     method_label: str,
+    printer_serial: str = "",
 ) -> int:
     """Report usage to Spoolman for a list of (slot_id, grams) pairs.
 
@@ -244,13 +337,23 @@ async def _report_spool_usage_for_slots(
         if grams_used <= 0:
             continue
 
-        global_tray_id = _resolve_global_tray_id(slot_id, slot_to_tray)
+        global_tray_id = _resolve_global_tray_id(slot_id, slot_to_tray, ams_trays)
         tray_info = ams_trays.get(global_tray_id)
         if not tray_info:
             logger.debug("[SPOOLMAN] Slot %s: no tray at global_tray_id %s", slot_id, global_tray_id)
             continue
 
-        spool_tag = _resolve_spool_tag(tray_info)
+        is_external = global_tray_id >= 254
+        tray_type = tray_info.get("tray_type", "")
+        logger.debug(
+            "[SPOOLMAN] Slot %s resolved to global_tray_id %s (tray_type=%s, external=%s)",
+            slot_id,
+            global_tray_id,
+            tray_type or "unknown",
+            is_external,
+        )
+
+        spool_tag = _resolve_spool_tag(tray_info, printer_serial, global_tray_id)
         if not spool_tag:
             logger.debug("[SPOOLMAN] Slot %s: no identifier for tray %s", slot_id, global_tray_id)
             continue
@@ -268,7 +371,12 @@ async def _report_spool_usage_for_slots(
     return spools_updated
 
 
-async def _report_partial_usage(printer_id: int, tracking):
+async def _report_partial_usage(
+    printer_id: int,
+    tracking,
+    last_layer_num: int | None = None,
+    last_progress: int | None = None,
+):
     """Report partial filament usage based on actual G-code layer data.
 
     Uses per-layer cumulative extrusion from G-code parsing for accurate
@@ -292,17 +400,39 @@ async def _report_partial_usage(printer_id: int, tracking):
         if not spoolman_enabled or spoolman_enabled.lower() != "true":
             return
 
-    # Get current printer state for layer progress
+    # Get current printer state for layer progress.
+    # On failed/aborted prints the firmware may already reset to IDLE with layer=0,
+    # so we fall back to completion-time hints captured from MQTT.
     state = printer_manager.get_status(printer_id)
-    if not state:
-        logger.debug("[SPOOLMAN] No printer state available for partial usage")
-        return
+    current_layer = state.layer_num if state else None
+    total_layers = state.total_layers if state else None
 
-    current_layer = state.layer_num
-    total_layers = state.total_layers
+    if (not current_layer or current_layer <= 0) and last_layer_num and last_layer_num > 0:
+        current_layer = last_layer_num
+        logger.debug("[SPOOLMAN] Using captured last_layer_num=%s for partial usage", current_layer)
+
+    progress_ratio_from_event = None
+    if last_progress is not None:
+        try:
+            progress_ratio_from_event = min(max(float(last_progress), 0.0), 100.0) / 100.0
+        except (TypeError, ValueError):
+            progress_ratio_from_event = None
+
+    if (not current_layer or current_layer <= 0) and progress_ratio_from_event and total_layers and total_layers > 0:
+        current_layer = max(1, int(round(total_layers * progress_ratio_from_event)))
+        logger.debug(
+            "[SPOOLMAN] Estimated layer from last_progress=%s%% and total_layers=%s -> %s",
+            last_progress,
+            total_layers,
+            current_layer,
+        )
 
     if not current_layer or current_layer <= 0:
-        logger.debug("[SPOOLMAN] No progress to report (layer 0 or unknown)")
+        logger.debug(
+            "[SPOOLMAN] No progress to report (layer 0/unknown, last_layer_num=%s, last_progress=%s)",
+            last_layer_num,
+            last_progress,
+        )
         return
 
     logger.info("[SPOOLMAN] Reporting partial usage at layer %s/%s", current_layer, total_layers or "?")
@@ -313,6 +443,7 @@ async def _report_partial_usage(printer_id: int, tracking):
     filament_usage = tracking.filament_usage or []
     ams_trays = {int(k): v for k, v in (tracking.ams_trays or {}).items()}
     slot_to_tray = tracking.slot_to_tray
+    printer_serial = await _get_printer_serial(printer_id)
 
     client = await _get_spoolman_client_with_fallback()
     if not client:
@@ -335,13 +466,13 @@ async def _report_partial_usage(printer_id: int, tracking):
                 slot_id = filament_id + 1  # filament_id is 0-based, slot_id is 1-based
 
                 # Get density from Spoolman (most accurate), fall back to 3MF, then PLA default
-                global_tray_id = _resolve_global_tray_id(slot_id, slot_to_tray)
+                global_tray_id = _resolve_global_tray_id(slot_id, slot_to_tray, ams_trays)
                 tray_info = ams_trays.get(global_tray_id)
                 density = None
                 diameter = 1.75
 
                 if tray_info:
-                    spool_tag = _resolve_spool_tag(tray_info)
+                    spool_tag = _resolve_spool_tag(tray_info, printer_serial, global_tray_id)
                     if spool_tag:
                         spool = await client.find_spool_by_tag(spool_tag)
                         if spool:
@@ -358,18 +489,27 @@ async def _report_partial_usage(printer_id: int, tracking):
                 usage_items.append((slot_id, grams_used))
 
             spools_updated = await _report_spool_usage_for_slots(
-                client, usage_items, ams_trays, slot_to_tray, "Partial (G-code)"
+                client, usage_items, ams_trays, slot_to_tray, "Partial (G-code)", printer_serial
             )
             if spools_updated > 0:
                 logger.info("[SPOOLMAN] Reported partial usage to %s spool(s) using G-code data", spools_updated)
             return
 
     # Fallback: linear interpolation (if no G-code data available)
-    if not total_layers or total_layers <= 0:
-        logger.debug("[SPOOLMAN] Cannot use linear fallback: total_layers=%s", total_layers)
+    progress_ratio = None
+    if total_layers and total_layers > 0:
+        progress_ratio = min(current_layer / total_layers, 1.0)
+    elif progress_ratio_from_event is not None:
+        progress_ratio = progress_ratio_from_event
+
+    if progress_ratio is None:
+        logger.debug(
+            "[SPOOLMAN] Cannot use linear fallback: total_layers=%s, last_progress=%s",
+            total_layers,
+            last_progress,
+        )
         return
 
-    progress_ratio = min(current_layer / total_layers, 1.0)
     logger.info("[SPOOLMAN] Falling back to linear interpolation (%s)", progress_ratio)
 
     usage_items = []
@@ -381,7 +521,7 @@ async def _report_partial_usage(printer_id: int, tracking):
             usage_items.append((slot_id, partial_used_g))
 
     spools_updated = await _report_spool_usage_for_slots(
-        client, usage_items, ams_trays, slot_to_tray, "Partial (linear)"
+        client, usage_items, ams_trays, slot_to_tray, "Partial (linear)", printer_serial
     )
     if spools_updated > 0:
         logger.info("[SPOOLMAN] Reported partial usage to %s spool(s) using linear interpolation", spools_updated)
@@ -412,6 +552,7 @@ async def report_usage(printer_id: int, archive_id: int):
         filament_usage = tracking.filament_usage or []
         ams_trays = {int(k): v for k, v in (tracking.ams_trays or {}).items()}
         slot_to_tray = tracking.slot_to_tray
+        printer_serial = await _get_printer_serial(printer_id)
 
         # Delete tracking row (we're done with it)
         await db.delete(tracking)
@@ -435,7 +576,7 @@ async def report_usage(printer_id: int, archive_id: int):
 
         usage_items = [(u.get("slot_id", 0), u.get("used_g", 0)) for u in filament_usage]
         spools_updated = await _report_spool_usage_for_slots(
-            client, usage_items, ams_trays, slot_to_tray, f"Archive {archive_id}"
+            client, usage_items, ams_trays, slot_to_tray, f"Archive {archive_id}", printer_serial
         )
 
         if spools_updated == 0:

+ 99 - 0
backend/tests/unit/services/test_spoolman_tracking.py

@@ -1,9 +1,18 @@
 """Unit tests for Spoolman tracking service helpers."""
 
+from types import SimpleNamespace
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+
 from backend.app.services.spoolman_tracking import (
+    _get_fallback_spool_tag,
+    _global_tray_id_to_ams_slot,
+    _hash_serial_to_hex32,
     _resolve_global_tray_id,
     _resolve_spool_tag,
     build_ams_tray_lookup,
+    store_print_data,
 )
 
 
@@ -22,6 +31,20 @@ class TestResolveSpoolTag:
         tray = {"tray_uuid": "00000000000000000000000000000000", "tag_uid": "DEADBEEF"}
         assert _resolve_spool_tag(tray) == "DEADBEEF"
 
+    def test_rejects_zero_tag_uid(self):
+        tray = {"tray_uuid": "", "tag_uid": "0000000000000000"}
+        assert _resolve_spool_tag(tray) == ""
+
+    def test_uses_fallback_tag_when_ids_missing(self):
+        tray = {"tray_uuid": "", "tag_uid": ""}
+        # global_tray_id 0 -> ams_id 0, tray_id 0
+        assert _resolve_spool_tag(tray, "01P00A000000000", 0) == "ABA7845700000000"
+
+    def test_uses_fallback_tag_when_ids_zero(self):
+        tray = {"tray_uuid": "00000000000000000000000000000000", "tag_uid": "0000000000000000"}
+        # global_tray_id 5 -> ams_id 1, tray_id 1
+        assert _resolve_spool_tag(tray, "01P00A000000000", 5) == "ABA7845700010001"
+
     def test_empty_both(self):
         tray = {"tray_uuid": "", "tag_uid": ""}
         assert _resolve_spool_tag(tray) == ""
@@ -65,6 +88,36 @@ class TestResolveGlobalTrayId:
         assert _resolve_global_tray_id(1, mapping) == 0
 
 
+class TestFallbackTagHelpers:
+    """Tests for frontend-mirrored fallback tag helpers."""
+
+    def test_hash_serial_matches_frontend_algorithm(self):
+        assert _hash_serial_to_hex32("01P00A000000000") == "ABA78457"
+        # Frontend trims and uppercases before hashing
+        assert _hash_serial_to_hex32(" 01p00a000000000 ") == "ABA78457"
+
+    def test_global_tray_to_ams_slot_standard_ams(self):
+        assert _global_tray_id_to_ams_slot(0) == (0, 0)
+        assert _global_tray_id_to_ams_slot(7) == (1, 3)
+
+    def test_global_tray_to_ams_slot_ams_ht(self):
+        assert _global_tray_id_to_ams_slot(128) == (128, 0)
+        assert _global_tray_id_to_ams_slot(135) == (135, 0)
+
+    def test_global_tray_to_ams_slot_external(self):
+        assert _global_tray_id_to_ams_slot(254) == (255, 0)
+        assert _global_tray_id_to_ams_slot(255) == (255, 1)
+
+    def test_get_fallback_spool_tag_standard(self):
+        assert _get_fallback_spool_tag("01P00A000000000", 5) == "ABA7845700010001"
+
+    def test_get_fallback_spool_tag_ams_ht(self):
+        assert _get_fallback_spool_tag("01P00A000000000", 128) == "ABA7845700800000"
+
+    def test_get_fallback_spool_tag_external(self):
+        assert _get_fallback_spool_tag("01P00A000000000", 255) == "ABA7845700FF0001"
+
+
 class TestBuildAmsTrayLookup:
     """Tests for build_ams_tray_lookup()."""
 
@@ -118,3 +171,49 @@ class TestBuildAmsTrayLookup:
         raw = {"ams": [{"id": 0, "tray": [{"id": 0}]}]}
         lookup = build_ams_tray_lookup(raw)
         assert lookup[0] == {"tray_uuid": "", "tag_uid": "", "tray_type": ""}
+
+
+class TestStorePrintData:
+    """Tests for store_print_data()."""
+
+    @pytest.mark.asyncio
+    async def test_prefers_explicit_ams_mapping_over_queue_mapping(self):
+        db = AsyncMock()
+        delete_result = MagicMock()
+        db.execute = AsyncMock(side_effect=[delete_result])
+        db.add = MagicMock()
+        db.commit = AsyncMock()
+
+        printer_manager = MagicMock()
+        printer_manager.get_status.return_value = SimpleNamespace(
+            raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "tray_type": "PLA"}, {"id": 1, "tray_type": "PLA"}]}]}
+        )
+
+        mock_settings = MagicMock()
+        mock_path = MagicMock()
+        mock_path.exists.return_value = True
+        mock_settings.base_dir.__truediv__.return_value = mock_path
+
+        with (
+            patch("backend.app.services.spoolman_tracking.app_settings", mock_settings),
+            patch("backend.app.api.routes.settings.get_setting", AsyncMock(side_effect=["true", "true"])),
+            patch(
+                "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
+                return_value=[{"slot_id": 1, "used_g": 3.83, "type": "PLA", "color": "#FF0000"}],
+            ),
+            patch("backend.app.utils.threemf_tools.extract_layer_filament_usage_from_3mf", return_value=None),
+            patch("backend.app.utils.threemf_tools.extract_filament_properties_from_3mf", return_value={}),
+        ):
+            await store_print_data(
+                printer_id=1,
+                archive_id=15,
+                file_path="archives/test.3mf",
+                db=db,
+                printer_manager=printer_manager,
+                ams_mapping=[1, -1, -1, -1],
+            )
+
+        db.add.assert_called_once()
+        tracking = db.add.call_args.args[0]
+        assert tracking.slot_to_tray == [1, -1, -1, -1]
+        db.execute.assert_called_once()