Browse Source

test(mqtt): add tray_now disambiguation regression tests (28 tests)

Replay realistic MQTT message sequences through _process_message() to
cover all tray_now disambiguation paths: single-nozzle passthrough
(X1E/P2S), H2D dual-nozzle snow field, pending target, ams_extruder_map
fallback, active extruder switching, and full multi-color lifecycles.
maziggy 3 months ago
parent
commit
c19de47ce5
2 changed files with 573 additions and 0 deletions
  1. 1 0
      CHANGELOG.md
  2. 572 0
      backend/tests/unit/services/test_bambu_mqtt.py

+ 1 - 0
CHANGELOG.md

@@ -19,6 +19,7 @@ All notable changes to Bambuddy will be documented in this file.
 
 
 ### Improved
 ### Improved
 - **AMS Mapping Test Coverage** — Added 63 backend tests for scheduler AMS mapping (nozzle filtering, external spool extruder assignment, fallback behavior) and 43 frontend tests for `useFilamentMapping` hook (nozzle-aware matching, AMS-HT handling, external spool extruder logic).
 - **AMS Mapping Test Coverage** — Added 63 backend tests for scheduler AMS mapping (nozzle filtering, external spool extruder assignment, fallback behavior) and 43 frontend tests for `useFilamentMapping` hook (nozzle-aware matching, AMS-HT handling, external spool extruder logic).
+- **Tray Now Disambiguation Test Coverage** — Added 28 MQTT message replay tests covering all `tray_now` disambiguation paths: single-nozzle passthrough (X1E/P2S), H2D dual-nozzle snow field, pending target, `ams_extruder_map` fallback, active extruder switching, and full multi-color print lifecycles.
 
 
 
 
 ## [0.2.0] - 2026-02-17
 ## [0.2.0] - 2026-02-17

+ 572 - 0
backend/tests/unit/services/test_bambu_mqtt.py

@@ -1205,3 +1205,575 @@ class TestRequestTopicAmsMapping:
         assert complete_data["status"] == "completed"
         assert complete_data["status"] == "completed"
         # Mapping cleared after completion
         # Mapping cleared after completion
         assert mqtt_client._captured_ams_mapping is None
         assert mqtt_client._captured_ams_mapping is None
+
+
+# ---------------------------------------------------------------------------
+# tray_now disambiguation helpers
+# ---------------------------------------------------------------------------
+
+
+def _ams_payload(tray_now, ams_units=None, tray_exist_bits=None):
+    """Build minimal print.ams payload for tray_now disambiguation tests."""
+    ams = {"tray_now": str(tray_now)}
+    if ams_units is not None:
+        ams["ams"] = ams_units
+    if tray_exist_bits is not None:
+        ams["tray_exist_bits"] = tray_exist_bits
+    return {"print": {"ams": ams}}
+
+
+def _extruder_info_payload(extruders):
+    """Build device.extruder.info payload (dual-nozzle detection + snow).
+
+    Each entry in *extruders* is a dict with at least ``id`` and ``snow``.
+    """
+    return {
+        "print": {
+            "device": {
+                "extruder": {
+                    "info": extruders,
+                }
+            }
+        }
+    }
+
+
+def _extruder_state_payload(state_val):
+    """Build device.extruder.state payload (active extruder via bit 8)."""
+    return {
+        "print": {
+            "device": {
+                "extruder": {
+                    "state": state_val,
+                }
+            }
+        }
+    }
+
+
+# ---------------------------------------------------------------------------
+# 1. Single-nozzle X1E — direct passthrough
+# ---------------------------------------------------------------------------
+
+
+class TestTrayNowSingleNozzleX1E:
+    """Single-nozzle, 1 AMS — tray_now is a direct passthrough."""
+
+    @pytest.fixture
+    def mqtt_client(self):
+        from backend.app.services.bambu_mqtt import BambuMQTTClient
+
+        return BambuMQTTClient(
+            ip_address="192.168.1.100",
+            serial_number="TEST_X1E",
+            access_code="12345678",
+        )
+
+    def test_tray_now_direct_passthrough_slot_0_to_3(self, mqtt_client):
+        """Each tray_now 0-3 maps 1:1 on single-nozzle printers."""
+        for slot in range(4):
+            mqtt_client._process_message(_ams_payload(slot))
+            assert mqtt_client.state.tray_now == slot
+
+    def test_tray_now_255_means_unloaded(self, mqtt_client):
+        """tray_now=255 means no filament loaded."""
+        mqtt_client._process_message(_ams_payload(255))
+        assert mqtt_client.state.tray_now == 255
+
+    def test_single_extruder_does_not_trigger_dual_nozzle(self, mqtt_client):
+        """device.extruder.info with 1 entry must NOT set _is_dual_nozzle."""
+        mqtt_client._process_message(_extruder_info_payload([{"id": 0, "snow": 0xFF00FF}]))
+        assert mqtt_client._is_dual_nozzle is False
+
+    def test_last_loaded_tray_survives_unload(self, mqtt_client):
+        """Load tray 2, unload → last_loaded_tray stays 2."""
+        mqtt_client._process_message(_ams_payload(2))
+        assert mqtt_client.state.last_loaded_tray == 2
+
+        mqtt_client._process_message(_ams_payload(255))
+        assert mqtt_client.state.tray_now == 255
+        assert mqtt_client.state.last_loaded_tray == 2
+
+
+# ---------------------------------------------------------------------------
+# 2. Single-nozzle P2S — multiple AMS, global IDs pass through
+# ---------------------------------------------------------------------------
+
+
+class TestTrayNowSingleNozzleP2S:
+    """Single-nozzle, 2 AMS — global IDs 4-7 for AMS 1 pass through directly."""
+
+    @pytest.fixture
+    def mqtt_client(self):
+        from backend.app.services.bambu_mqtt import BambuMQTTClient
+
+        return BambuMQTTClient(
+            ip_address="192.168.1.100",
+            serial_number="TEST_P2S",
+            access_code="12345678",
+        )
+
+    def test_tray_now_ams1_global_ids_4_to_7(self, mqtt_client):
+        """tray_now 4-7 are global IDs for AMS 1 on single-nozzle printers."""
+        for global_id in range(4, 8):
+            mqtt_client._process_message(_ams_payload(global_id))
+            assert mqtt_client.state.tray_now == global_id
+
+    def test_tray_change_across_ams_units(self, mqtt_client):
+        """Switch from AMS 0 slot 1 → AMS 1 slot 2 (global 6)."""
+        mqtt_client._process_message(_ams_payload(1))
+        assert mqtt_client.state.tray_now == 1
+
+        mqtt_client._process_message(_ams_payload(6))
+        assert mqtt_client.state.tray_now == 6
+
+
+# ---------------------------------------------------------------------------
+# 3. H2D Pro — initial state detection
+# ---------------------------------------------------------------------------
+
+
+class TestTrayNowDualNozzleH2DSetup:
+    """H2D Pro initial state detection."""
+
+    @pytest.fixture
+    def mqtt_client(self):
+        from backend.app.services.bambu_mqtt import BambuMQTTClient
+
+        return BambuMQTTClient(
+            ip_address="192.168.1.100",
+            serial_number="TEST_H2D",
+            access_code="12345678",
+        )
+
+    def test_dual_nozzle_detected_from_extruder_info(self, mqtt_client):
+        """2 entries in device.extruder.info → _is_dual_nozzle=True."""
+        mqtt_client._process_message(
+            _extruder_info_payload(
+                [
+                    {"id": 0, "snow": 0xFF00FF},
+                    {"id": 1, "snow": 0xFF00FF},
+                ]
+            )
+        )
+        assert mqtt_client._is_dual_nozzle is True
+
+    def test_ams_extruder_map_parsed_from_info_field(self, mqtt_client):
+        """AMS 0 info=2003 → right (ext 0), AMS 128 info=2104 → left (ext 1)."""
+        ams_units = [
+            {"id": 0, "info": 2003, "tray": [{"id": i} for i in range(4)]},
+            {"id": 128, "info": 2104, "tray": [{"id": 0}]},
+        ]
+        payload = {
+            "print": {
+                "ams": {
+                    "ams": ams_units,
+                    "tray_now": "255",
+                    "tray_exist_bits": "1000f",
+                },
+            }
+        }
+        mqtt_client._process_message(payload)
+
+        # info=2003: bit8 = (2003>>8)&1 = 7&1 = 1 → extruder = 1-1 = 0 (right)
+        # info=2104: bit8 = (2104>>8)&1 = 8&1 = 0 → extruder = 1-0 = 1 (left)
+        assert mqtt_client.state.ams_extruder_map == {"0": 0, "128": 1}
+
+    def test_dual_nozzle_detection_before_ams_in_same_message(self, mqtt_client):
+        """Dual-nozzle detection at line 538 happens before _handle_ams_data() at line 549.
+
+        If both arrive in the same message, tray_now disambiguation already uses dual-nozzle logic.
+        """
+        payload = {
+            "print": {
+                "device": {
+                    "extruder": {
+                        "info": [
+                            {"id": 0, "snow": 0xFF00FF},
+                            {"id": 1, "snow": 0xFF00FF},
+                        ],
+                        "state": 0x0001,
+                    }
+                },
+                "ams": {
+                    "ams": [
+                        {"id": 0, "info": 2003, "tray": [{"id": i} for i in range(4)]},
+                    ],
+                    "tray_now": "2",
+                    "tray_exist_bits": "f",
+                },
+            }
+        }
+        mqtt_client._process_message(payload)
+
+        # Dual-nozzle was detected; AMS 0 on right extruder (active by default);
+        # snow is 0xFF00FF (unloaded), so falls through to ams_extruder_map fallback.
+        # Single AMS on extruder 0 → global_id = 0*4+2 = 2
+        assert mqtt_client._is_dual_nozzle is True
+        assert mqtt_client.state.tray_now == 2
+
+
+# ---------------------------------------------------------------------------
+# Shared H2D fixture for classes 4-8
+# ---------------------------------------------------------------------------
+
+
+class _H2DFixtureMixin:
+    """Mixin providing a pre-configured H2D Pro client."""
+
+    @pytest.fixture
+    def mqtt_client(self):
+        from backend.app.services.bambu_mqtt import BambuMQTTClient
+
+        return BambuMQTTClient(
+            ip_address="192.168.1.100",
+            serial_number="TEST_H2D",
+            access_code="12345678",
+        )
+
+    @pytest.fixture
+    def h2d_client(self, mqtt_client):
+        """Pre-configure as H2D Pro: dual-nozzle + ams_extruder_map."""
+        mqtt_client._process_message(
+            {
+                "print": {
+                    "device": {
+                        "extruder": {
+                            "info": [
+                                {"id": 0, "snow": 0xFF00FF},
+                                {"id": 1, "snow": 0xFF00FF},
+                            ],
+                            "state": 0x0001,  # right extruder active
+                        }
+                    },
+                    "ams": {
+                        "ams": [
+                            {"id": 0, "info": 2003, "tray": [{"id": i} for i in range(4)]},
+                            {"id": 128, "info": 2104, "tray": [{"id": 0}]},
+                        ],
+                        "tray_now": "255",
+                        "tray_exist_bits": "1000f",
+                    },
+                }
+            }
+        )
+        assert mqtt_client._is_dual_nozzle is True
+        assert mqtt_client.state.ams_extruder_map == {"0": 0, "128": 1}
+        return mqtt_client
+
+
+# ---------------------------------------------------------------------------
+# 4. H2D Snow field disambiguation
+# ---------------------------------------------------------------------------
+
+
+class TestTrayNowDualNozzleH2DSnow(_H2DFixtureMixin):
+    """Snow field disambiguation (primary path)."""
+
+    def test_snow_disambiguates_ams0_slot(self, h2d_client):
+        """snow ext[0]=AMS 0 slot 2, tray_now='2' → global 2."""
+        # Send snow update FIRST (snow is parsed AFTER tray_now in the same message,
+        # so we need it in a prior message).
+        snow_val = 0 << 8 | 2  # AMS 0 slot 2 = raw 2
+        h2d_client._process_message(
+            _extruder_info_payload(
+                [
+                    {"id": 0, "snow": snow_val},
+                    {"id": 1, "snow": 0xFF00FF},
+                ]
+            )
+        )
+        assert h2d_client.state.h2d_extruder_snow.get(0) == 2
+
+        # Now send tray_now=2
+        h2d_client._process_message(_ams_payload(2))
+        assert h2d_client.state.tray_now == 2
+
+    def test_snow_disambiguates_ams_ht_to_128(self, h2d_client):
+        """snow ext[1]=AMS HT (128), left active, tray_now='0' → global 128."""
+        # Snow: extruder 1 → AMS 128 slot 0
+        snow_val = 128 << 8 | 0  # = 32768
+        h2d_client._process_message(
+            _extruder_info_payload(
+                [
+                    {"id": 0, "snow": 0xFF00FF},
+                    {"id": 1, "snow": snow_val},
+                ]
+            )
+        )
+        assert h2d_client.state.h2d_extruder_snow.get(1) == 128
+
+        # Switch to left extruder
+        h2d_client._process_message(_extruder_state_payload(0x0100))
+        assert h2d_client.state.active_extruder == 1
+
+        # tray_now="0" with left extruder active, snow says AMS HT (128)
+        # AMS HT snow_slot = 0 (single slot), parsed_tray_now = 0 → match
+        h2d_client._process_message(_ams_payload(0))
+        assert h2d_client.state.tray_now == 128
+
+    def test_snow_updates_h2d_extruder_snow_state(self, h2d_client):
+        """Verify state.h2d_extruder_snow dict is populated correctly."""
+        snow_ext0 = 1 << 8 | 3  # AMS 1 slot 3 → global 7
+        snow_ext1 = 0 << 8 | 0  # AMS 0 slot 0 → global 0
+        h2d_client._process_message(
+            _extruder_info_payload(
+                [
+                    {"id": 0, "snow": snow_ext0},
+                    {"id": 1, "snow": snow_ext1},
+                ]
+            )
+        )
+        assert h2d_client.state.h2d_extruder_snow[0] == 7
+        assert h2d_client.state.h2d_extruder_snow[1] == 0
+
+    def test_snow_unloaded_value(self, h2d_client):
+        """snow=0xFFFF (ams_id=255, slot=255) → 255 (unloaded)."""
+        h2d_client._process_message(
+            _extruder_info_payload(
+                [
+                    {"id": 0, "snow": 0xFFFF},
+                    {"id": 1, "snow": 0xFFFF},
+                ]
+            )
+        )
+        assert h2d_client.state.h2d_extruder_snow[0] == 255
+        assert h2d_client.state.h2d_extruder_snow[1] == 255
+
+    def test_snow_initial_sentinel_not_stored(self, h2d_client):
+        """snow=0xFF00FF (firmware initial sentinel) is not parsed into h2d_extruder_snow."""
+        # 0xFF00FF has ams_id=0xFF00=65280 which doesn't match any branch
+        h2d_client._process_message(
+            _extruder_info_payload(
+                [
+                    {"id": 0, "snow": 0xFF00FF},
+                    {"id": 1, "snow": 0xFF00FF},
+                ]
+            )
+        )
+        # Snow dict should remain empty (no matching branch)
+        assert h2d_client.state.h2d_extruder_snow == {}
+
+
+# ---------------------------------------------------------------------------
+# 5. H2D Pending target disambiguation
+# ---------------------------------------------------------------------------
+
+
+class TestTrayNowDualNozzleH2DPendingTarget(_H2DFixtureMixin):
+    """Pending target disambiguation (when Bambuddy initiates load)."""
+
+    def test_pending_target_matches_slot(self, h2d_client):
+        """pending=5, tray_now='1' (5%4=1 matches) → tray_now=5."""
+        h2d_client.state.pending_tray_target = 5
+        h2d_client._process_message(_ams_payload(1))
+        assert h2d_client.state.tray_now == 5
+        assert h2d_client.state.pending_tray_target is None  # cleared
+
+    def test_pending_target_slot_mismatch(self, h2d_client):
+        """pending=5, tray_now='2' → uses raw slot, clears pending."""
+        h2d_client.state.pending_tray_target = 5
+        h2d_client._process_message(_ams_payload(2))
+        # Slot 2 != 5%4=1 → mismatch, uses raw slot 2
+        assert h2d_client.state.tray_now == 2
+        assert h2d_client.state.pending_tray_target is None
+
+    def test_pending_target_takes_priority_over_snow(self, h2d_client):
+        """When both pending and snow are set, pending wins."""
+        # Set up snow for extruder 0 → AMS 0 slot 1 → global 1
+        snow_val = 0 << 8 | 1
+        h2d_client._process_message(
+            _extruder_info_payload(
+                [
+                    {"id": 0, "snow": snow_val},
+                    {"id": 1, "snow": 0xFF00FF},
+                ]
+            )
+        )
+        assert h2d_client.state.h2d_extruder_snow.get(0) == 1
+
+        # Set pending target to AMS 1 slot 1 (global 5)
+        h2d_client.state.pending_tray_target = 5
+        # tray_now="1" — matches pending (5%4=1), pending should win over snow
+        h2d_client._process_message(_ams_payload(1))
+        assert h2d_client.state.tray_now == 5
+
+
+# ---------------------------------------------------------------------------
+# 6. H2D ams_extruder_map fallback
+# ---------------------------------------------------------------------------
+
+
+class TestTrayNowDualNozzleH2DFallback(_H2DFixtureMixin):
+    """ams_extruder_map fallback (no pending, no snow)."""
+
+    def test_single_ams_on_extruder_computes_global_id(self, h2d_client):
+        """AMS 0 on right extruder, tray_now='2' → 0*4+2=2."""
+        # h2d_client has snow=0xFF00FF (unloaded) by default, so snow path skips
+        h2d_client._process_message(_ams_payload(2))
+        # AMS 0 is the only AMS on extruder 0 (right, active by default)
+        # Fallback: single AMS → global = 0*4+2 = 2
+        assert h2d_client.state.tray_now == 2
+
+    def test_multiple_ams_keeps_current_if_valid(self, h2d_client):
+        """Current tray matches slot → keeps it (multi-AMS on same extruder)."""
+        # Set up: two AMS units on the same extruder (right, ext 0)
+        h2d_client.state.ams_extruder_map = {"0": 0, "1": 0}
+        # Pre-set tray_now=5 (AMS 1 slot 1) — current_ams=1 which is in ams_on_extruder
+        h2d_client.state.tray_now = 5
+        # tray_now="1" → 5%4=1 matches → keep current=5
+        h2d_client._process_message(_ams_payload(1))
+        assert h2d_client.state.tray_now == 5
+
+    def test_no_ams_on_extruder_uses_raw_slot(self, h2d_client):
+        """No AMS mapped to the active extruder → raw slot as global ID."""
+        # All AMS on left extruder, but right is active
+        h2d_client.state.ams_extruder_map = {"0": 1, "128": 1}
+        h2d_client._process_message(_ams_payload(2))
+        assert h2d_client.state.tray_now == 2
+
+
+# ---------------------------------------------------------------------------
+# 7. H2D Active extruder switching
+# ---------------------------------------------------------------------------
+
+
+class TestTrayNowDualNozzleH2DActiveExtruder(_H2DFixtureMixin):
+    """Active extruder switching via device.extruder.state bit 8."""
+
+    def test_active_extruder_right_by_default(self, h2d_client):
+        """Initial state.active_extruder == 0 (right)."""
+        assert h2d_client.state.active_extruder == 0
+
+    def test_extruder_state_bit8_switches_to_left(self, h2d_client):
+        """state=0x100 → active_extruder=1 (left)."""
+        h2d_client._process_message(_extruder_state_payload(0x0100))
+        assert h2d_client.state.active_extruder == 1
+
+    def test_extruder_state_bit8_switches_back_to_right(self, h2d_client):
+        """Cycle 0 → 1 → 0."""
+        h2d_client._process_message(_extruder_state_payload(0x0100))
+        assert h2d_client.state.active_extruder == 1
+
+        h2d_client._process_message(_extruder_state_payload(0x0001))
+        assert h2d_client.state.active_extruder == 0
+
+    def test_extruder_switch_changes_tray_disambiguation(self, h2d_client):
+        """Snow on both extruders; switching active changes which snow is used."""
+        # Snow: ext 0 → AMS 0 slot 1 (global 1), ext 1 → AMS 128 slot 0 (global 128)
+        h2d_client._process_message(
+            _extruder_info_payload(
+                [
+                    {"id": 0, "snow": 0 << 8 | 1},  # AMS 0 slot 1 → global 1
+                    {"id": 1, "snow": 128 << 8 | 0},  # AMS HT → global 128
+                ]
+            )
+        )
+
+        # Right active (default) — tray_now="1" → snow ext[0] says global 1
+        h2d_client._process_message(_ams_payload(1))
+        assert h2d_client.state.tray_now == 1
+
+        # Switch to left
+        h2d_client._process_message(_extruder_state_payload(0x0100))
+
+        # Left active — tray_now="0" → snow ext[1] says AMS HT (128), slot 0 matches
+        h2d_client._process_message(_ams_payload(0))
+        assert h2d_client.state.tray_now == 128
+
+
+# ---------------------------------------------------------------------------
+# 8. H2D Full multi-message sequences
+# ---------------------------------------------------------------------------
+
+
+class TestTrayNowDualNozzleH2DFullSequence(_H2DFixtureMixin):
+    """Multi-message sequences simulating real H2D Pro prints."""
+
+    def test_h2d_right_nozzle_ams0_lifecycle(self, h2d_client):
+        """Setup → load AMS 0 slot 1 → verify tray_now=1."""
+        # Snow update: extruder 0 loading AMS 0 slot 1
+        h2d_client._process_message(
+            _extruder_info_payload(
+                [
+                    {"id": 0, "snow": 0 << 8 | 1},
+                    {"id": 1, "snow": 0xFF00FF},
+                ]
+            )
+        )
+        # Printer reports tray_now="1"
+        h2d_client._process_message(_ams_payload(1))
+        assert h2d_client.state.tray_now == 1
+        assert h2d_client.state.last_loaded_tray == 1
+
+    def test_h2d_left_nozzle_ams_ht_lifecycle(self, h2d_client):
+        """Setup → switch left → load AMS HT → verify tray_now=128."""
+        # Switch to left extruder
+        h2d_client._process_message(_extruder_state_payload(0x0100))
+
+        # Snow: ext 1 → AMS HT slot 0
+        h2d_client._process_message(
+            _extruder_info_payload(
+                [
+                    {"id": 0, "snow": 0xFF00FF},
+                    {"id": 1, "snow": 128 << 8 | 0},
+                ]
+            )
+        )
+
+        # Printer reports tray_now="0" (AMS HT single slot)
+        h2d_client._process_message(_ams_payload(0))
+        assert h2d_client.state.tray_now == 128
+        assert h2d_client.state.last_loaded_tray == 128
+
+    def test_h2d_multi_color_alternating_nozzles(self, h2d_client):
+        """Multi-color print alternating between right and left nozzles.
+
+        Sequence:
+        1. Right loads AMS 0 slot 0 (tray=0)
+        2. Switch left, load AMS HT (tray=128)
+        3. Switch right, snow updates, load AMS 0 slot 2 (tray=2)
+        4. Unload (255)
+        """
+        # Step 1: Right extruder loads AMS 0 slot 0
+        h2d_client._process_message(
+            _extruder_info_payload(
+                [
+                    {"id": 0, "snow": 0 << 8 | 0},
+                    {"id": 1, "snow": 0xFF00FF},
+                ]
+            )
+        )
+        h2d_client._process_message(_ams_payload(0))
+        assert h2d_client.state.tray_now == 0
+
+        # Step 2: Switch to left, load AMS HT
+        h2d_client._process_message(_extruder_state_payload(0x0100))
+        h2d_client._process_message(
+            _extruder_info_payload(
+                [
+                    {"id": 0, "snow": 0 << 8 | 0},
+                    {"id": 1, "snow": 128 << 8 | 0},
+                ]
+            )
+        )
+        h2d_client._process_message(_ams_payload(0))
+        assert h2d_client.state.tray_now == 128
+
+        # Step 3: Switch back to right, load AMS 0 slot 2
+        h2d_client._process_message(_extruder_state_payload(0x0001))
+        h2d_client._process_message(
+            _extruder_info_payload(
+                [
+                    {"id": 0, "snow": 0 << 8 | 2},
+                    {"id": 1, "snow": 128 << 8 | 0},
+                ]
+            )
+        )
+        h2d_client._process_message(_ams_payload(2))
+        assert h2d_client.state.tray_now == 2
+
+        # Step 4: Unload
+        h2d_client._process_message(_ams_payload(255))
+        assert h2d_client.state.tray_now == 255
+        assert h2d_client.state.last_loaded_tray == 2