|
|
@@ -1205,3 +1205,1101 @@ class TestRequestTopicAmsMapping:
|
|
|
assert complete_data["status"] == "completed"
|
|
|
# Mapping cleared after completion
|
|
|
assert mqtt_client._captured_ams_mapping is None
|
|
|
+
|
|
|
+
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# tray_now disambiguation helpers
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+
|
|
|
+
|
|
|
+def _ams_payload(tray_now, ams_units=None, tray_exist_bits=None, ams_exist_bits=None):
|
|
|
+ """Build minimal print.ams payload for tray_now disambiguation tests."""
|
|
|
+ ams = {"tray_now": str(tray_now)}
|
|
|
+ if ams_units is not None:
|
|
|
+ ams["ams"] = ams_units
|
|
|
+ if tray_exist_bits is not None:
|
|
|
+ ams["tray_exist_bits"] = tray_exist_bits
|
|
|
+ if ams_exist_bits is not None:
|
|
|
+ ams["ams_exist_bits"] = ams_exist_bits
|
|
|
+ return {"print": {"ams": ams}}
|
|
|
+
|
|
|
+
|
|
|
+def _extruder_info_payload(extruders):
|
|
|
+ """Build device.extruder.info payload (dual-nozzle detection + snow).
|
|
|
+
|
|
|
+ Each entry in *extruders* is a dict with at least ``id`` and ``snow``.
|
|
|
+ """
|
|
|
+ return {
|
|
|
+ "print": {
|
|
|
+ "device": {
|
|
|
+ "extruder": {
|
|
|
+ "info": extruders,
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+def _extruder_state_payload(state_val):
|
|
|
+ """Build device.extruder.state payload (active extruder via bit 8)."""
|
|
|
+ return {
|
|
|
+ "print": {
|
|
|
+ "device": {
|
|
|
+ "extruder": {
|
|
|
+ "state": state_val,
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# 1. Single-nozzle X1E — direct passthrough
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+
|
|
|
+
|
|
|
+class TestTrayNowSingleNozzleX1E:
|
|
|
+ """Single-nozzle, 1 AMS — tray_now is a direct passthrough."""
|
|
|
+
|
|
|
+ @pytest.fixture
|
|
|
+ def mqtt_client(self):
|
|
|
+ from backend.app.services.bambu_mqtt import BambuMQTTClient
|
|
|
+
|
|
|
+ return BambuMQTTClient(
|
|
|
+ ip_address="192.168.1.100",
|
|
|
+ serial_number="TEST_X1E",
|
|
|
+ access_code="12345678",
|
|
|
+ )
|
|
|
+
|
|
|
+ def test_tray_now_direct_passthrough_slot_0_to_3(self, mqtt_client):
|
|
|
+ """Each tray_now 0-3 maps 1:1 on single-nozzle printers."""
|
|
|
+ for slot in range(4):
|
|
|
+ mqtt_client._process_message(_ams_payload(slot))
|
|
|
+ assert mqtt_client.state.tray_now == slot
|
|
|
+
|
|
|
+ def test_tray_now_255_means_unloaded(self, mqtt_client):
|
|
|
+ """tray_now=255 means no filament loaded."""
|
|
|
+ mqtt_client._process_message(_ams_payload(255))
|
|
|
+ assert mqtt_client.state.tray_now == 255
|
|
|
+
|
|
|
+ def test_single_extruder_does_not_trigger_dual_nozzle(self, mqtt_client):
|
|
|
+ """device.extruder.info with 1 entry must NOT set _is_dual_nozzle."""
|
|
|
+ mqtt_client._process_message(_extruder_info_payload([{"id": 0, "snow": 0xFF00FF}]))
|
|
|
+ assert mqtt_client._is_dual_nozzle is False
|
|
|
+
|
|
|
+ def test_last_loaded_tray_survives_unload(self, mqtt_client):
|
|
|
+ """Load tray 2, unload → last_loaded_tray stays 2."""
|
|
|
+ mqtt_client._process_message(_ams_payload(2))
|
|
|
+ assert mqtt_client.state.last_loaded_tray == 2
|
|
|
+
|
|
|
+ mqtt_client._process_message(_ams_payload(255))
|
|
|
+ assert mqtt_client.state.tray_now == 255
|
|
|
+ assert mqtt_client.state.last_loaded_tray == 2
|
|
|
+
|
|
|
+
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# 2. Single-nozzle P2S — multiple AMS, global IDs pass through
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+
|
|
|
+
|
|
|
+class TestTrayNowSingleNozzleP2S:
|
|
|
+ """Single-nozzle, 2 AMS — tray_now > 3 passes through as global ID."""
|
|
|
+
|
|
|
+ @pytest.fixture
|
|
|
+ def mqtt_client(self):
|
|
|
+ from backend.app.services.bambu_mqtt import BambuMQTTClient
|
|
|
+
|
|
|
+ return BambuMQTTClient(
|
|
|
+ ip_address="192.168.1.100",
|
|
|
+ serial_number="TEST_P2S",
|
|
|
+ access_code="12345678",
|
|
|
+ )
|
|
|
+
|
|
|
+ def test_tray_now_ams1_global_ids_4_to_7(self, mqtt_client):
|
|
|
+ """tray_now 4-7 are global IDs for AMS 1 on single-nozzle printers."""
|
|
|
+ for global_id in range(4, 8):
|
|
|
+ mqtt_client._process_message(_ams_payload(global_id))
|
|
|
+ assert mqtt_client.state.tray_now == global_id
|
|
|
+
|
|
|
+ def test_tray_change_across_ams_units(self, mqtt_client):
|
|
|
+ """Switch from AMS 0 slot 1 → AMS 1 slot 2 (global 6)."""
|
|
|
+ mqtt_client._process_message(_ams_payload(1))
|
|
|
+ assert mqtt_client.state.tray_now == 1
|
|
|
+
|
|
|
+ mqtt_client._process_message(_ams_payload(6))
|
|
|
+ assert mqtt_client.state.tray_now == 6
|
|
|
+
|
|
|
+
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# 2b. Single-nozzle P2S — multi-AMS local slot disambiguation (#420)
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+
|
|
|
+
|
|
|
+class TestTrayNowP2SMultiAmsDisambiguation:
|
|
|
+ """P2S firmware sends local slot IDs (0-3) in tray_now even with dual AMS.
|
|
|
+
|
|
|
+ When ams_exist_bits indicates >1 AMS unit and tray_now is 0-3, the backend
|
|
|
+ should use the MQTT mapping field (snow-encoded) to resolve the correct
|
|
|
+ global tray ID.
|
|
|
+ """
|
|
|
+
|
|
|
+ @pytest.fixture
|
|
|
+ def mqtt_client(self):
|
|
|
+ from backend.app.services.bambu_mqtt import BambuMQTTClient
|
|
|
+
|
|
|
+ client = BambuMQTTClient(
|
|
|
+ ip_address="192.168.1.100",
|
|
|
+ serial_number="TEST_P2S_DUAL",
|
|
|
+ access_code="12345678",
|
|
|
+ )
|
|
|
+ return client
|
|
|
+
|
|
|
+ def test_resolves_ams1_slot1_from_mapping(self, mqtt_client):
|
|
|
+ """tray_now=1 with mapping=[257] → global ID 5 (AMS1-T1).
|
|
|
+
|
|
|
+ 257 snow-decoded: ams_hw_id=1, slot=1 → global 1*4+1=5.
|
|
|
+ """
|
|
|
+ # Set mapping field in raw_data (as the MQTT handler would)
|
|
|
+ mqtt_client.state.raw_data["mapping"] = [257]
|
|
|
+ mqtt_client._process_message(
|
|
|
+ _ams_payload(1, ams_exist_bits="3") # '3' = 0b11 → AMS 0 and 1
|
|
|
+ )
|
|
|
+ assert mqtt_client.state.tray_now == 5
|
|
|
+
|
|
|
+ def test_resolves_ams1_slot0_from_mapping(self, mqtt_client):
|
|
|
+ """tray_now=0 with mapping=[256] → global ID 4 (AMS1-T0).
|
|
|
+
|
|
|
+ 256 snow-decoded: ams_hw_id=1, slot=0 → global 1*4+0=4.
|
|
|
+ """
|
|
|
+ mqtt_client.state.raw_data["mapping"] = [256]
|
|
|
+ mqtt_client._process_message(_ams_payload(0, ams_exist_bits="3"))
|
|
|
+ assert mqtt_client.state.tray_now == 4
|
|
|
+
|
|
|
+ def test_resolves_ams1_slot3_from_mapping(self, mqtt_client):
|
|
|
+ """tray_now=3 with mapping=[259] → global ID 7 (AMS1-T3).
|
|
|
+
|
|
|
+ 259 snow-decoded: ams_hw_id=1, slot=3 → global 1*4+3=7.
|
|
|
+ """
|
|
|
+ mqtt_client.state.raw_data["mapping"] = [259]
|
|
|
+ mqtt_client._process_message(_ams_payload(3, ams_exist_bits="3"))
|
|
|
+ assert mqtt_client.state.tray_now == 7
|
|
|
+
|
|
|
+ def test_ams0_slot_unchanged_when_mapping_confirms_ams0(self, mqtt_client):
|
|
|
+ """tray_now=1 with mapping=[1] → stays 1 (AMS0-T1).
|
|
|
+
|
|
|
+ 1 snow-decoded: ams_hw_id=0, slot=1 → global 0*4+1=1.
|
|
|
+ """
|
|
|
+ mqtt_client.state.raw_data["mapping"] = [1]
|
|
|
+ mqtt_client._process_message(_ams_payload(1, ams_exist_bits="3"))
|
|
|
+ assert mqtt_client.state.tray_now == 1
|
|
|
+
|
|
|
+ def test_multicolor_resolves_ams1_from_multi_entry_mapping(self, mqtt_client):
|
|
|
+ """Multi-color print: mapping=[0, 257] → tray_now=1 resolves to AMS1-T1 (5).
|
|
|
+
|
|
|
+ Entry 0: ams_hw_id=0, slot=0 (local 0) — doesn't match tray_now=1.
|
|
|
+ Entry 257: ams_hw_id=1, slot=1 (local 1) — matches tray_now=1 → global 5.
|
|
|
+ """
|
|
|
+ mqtt_client.state.raw_data["mapping"] = [0, 257]
|
|
|
+ mqtt_client._process_message(_ams_payload(1, ams_exist_bits="3"))
|
|
|
+ assert mqtt_client.state.tray_now == 5
|
|
|
+
|
|
|
+ def test_multicolor_four_slot_mapping(self, mqtt_client):
|
|
|
+ """mapping=[65535, 65535, 65535, 257] → tray_now=1 resolves to global 5.
|
|
|
+
|
|
|
+ Only entry 257 has local slot=1, other entries are unmapped (65535).
|
|
|
+ Reproduces exact data from issue #420 support package.
|
|
|
+ """
|
|
|
+ mqtt_client.state.raw_data["mapping"] = [65535, 65535, 65535, 257]
|
|
|
+ mqtt_client._process_message(_ams_payload(1, ams_exist_bits="3"))
|
|
|
+ assert mqtt_client.state.tray_now == 5
|
|
|
+
|
|
|
+ def test_ambiguous_mapping_falls_back_to_local_slot(self, mqtt_client):
|
|
|
+ """Two AMS units with same local slot in mapping → ambiguous, keep local slot.
|
|
|
+
|
|
|
+ mapping=[1, 257]: both have local slot 1 (AMS0-T1 and AMS1-T1).
|
|
|
+ Cannot disambiguate → fall back to tray_now=1.
|
|
|
+ """
|
|
|
+ mqtt_client.state.raw_data["mapping"] = [1, 257]
|
|
|
+ mqtt_client._process_message(_ams_payload(1, ams_exist_bits="3"))
|
|
|
+ assert mqtt_client.state.tray_now == 1
|
|
|
+
|
|
|
+ def test_no_mapping_falls_back_to_local_slot(self, mqtt_client):
|
|
|
+ """No mapping field available → fall back to raw tray_now."""
|
|
|
+ # No mapping in raw_data (e.g. manual filament load, not during print)
|
|
|
+ mqtt_client._process_message(_ams_payload(1, ams_exist_bits="3"))
|
|
|
+ assert mqtt_client.state.tray_now == 1
|
|
|
+
|
|
|
+ def test_empty_mapping_falls_back_to_local_slot(self, mqtt_client):
|
|
|
+ """Empty mapping list → fall back to raw tray_now."""
|
|
|
+ mqtt_client.state.raw_data["mapping"] = []
|
|
|
+ mqtt_client._process_message(_ams_payload(1, ams_exist_bits="3"))
|
|
|
+ assert mqtt_client.state.tray_now == 1
|
|
|
+
|
|
|
+ def test_single_ams_passthrough(self, mqtt_client):
|
|
|
+ """Single AMS (ams_exist_bits='1') → tray_now 0-3 is direct global ID."""
|
|
|
+ mqtt_client._process_message(_ams_payload(2, ams_exist_bits="1"))
|
|
|
+ assert mqtt_client.state.tray_now == 2
|
|
|
+
|
|
|
+ def test_no_ams_exist_bits_passthrough(self, mqtt_client):
|
|
|
+ """No ams_exist_bits in payload → fall back to raw tray_now."""
|
|
|
+ mqtt_client._process_message(_ams_payload(1))
|
|
|
+ assert mqtt_client.state.tray_now == 1
|
|
|
+
|
|
|
+ def test_tray_now_255_unaffected_by_multi_ams(self, mqtt_client):
|
|
|
+ """tray_now=255 (unloaded) passes through regardless of AMS count."""
|
|
|
+ mqtt_client.state.raw_data["mapping"] = [257]
|
|
|
+ mqtt_client._process_message(_ams_payload(255, ams_exist_bits="3"))
|
|
|
+ assert mqtt_client.state.tray_now == 255
|
|
|
+
|
|
|
+ def test_tray_now_above_3_unaffected(self, mqtt_client):
|
|
|
+ """tray_now > 3 is already a global ID and passes through directly."""
|
|
|
+ mqtt_client._process_message(_ams_payload(6, ams_exist_bits="3"))
|
|
|
+ assert mqtt_client.state.tray_now == 6
|
|
|
+
|
|
|
+ def test_last_loaded_tray_uses_resolved_global_id(self, mqtt_client):
|
|
|
+ """last_loaded_tray should reflect the resolved global ID, not local slot."""
|
|
|
+ mqtt_client.state.raw_data["mapping"] = [257]
|
|
|
+ mqtt_client.state.state = "RUNNING"
|
|
|
+ mqtt_client._process_message(_ams_payload(1, ams_exist_bits="3"))
|
|
|
+ assert mqtt_client.state.tray_now == 5
|
|
|
+ assert mqtt_client.state.last_loaded_tray == 5
|
|
|
+
|
|
|
+
|
|
|
+class TestResolveLocalSlotFromMapping:
|
|
|
+ """Unit tests for _resolve_local_slot_from_mapping static method."""
|
|
|
+
|
|
|
+ def test_single_match_ams0(self):
|
|
|
+ from backend.app.services.bambu_mqtt import BambuMQTTClient
|
|
|
+
|
|
|
+ assert BambuMQTTClient._resolve_local_slot_from_mapping(1, [1]) == 1
|
|
|
+
|
|
|
+ def test_single_match_ams1(self):
|
|
|
+ from backend.app.services.bambu_mqtt import BambuMQTTClient
|
|
|
+
|
|
|
+ # 257 = 1*256 + 1 → AMS1 slot1 → global 5
|
|
|
+ assert BambuMQTTClient._resolve_local_slot_from_mapping(1, [257]) == 5
|
|
|
+
|
|
|
+ def test_single_match_ams2(self):
|
|
|
+ from backend.app.services.bambu_mqtt import BambuMQTTClient
|
|
|
+
|
|
|
+ # 514 = 2*256 + 2 → AMS2 slot2 → global 10
|
|
|
+ assert BambuMQTTClient._resolve_local_slot_from_mapping(2, [514]) == 10
|
|
|
+
|
|
|
+ def test_unmapped_entries_skipped(self):
|
|
|
+ from backend.app.services.bambu_mqtt import BambuMQTTClient
|
|
|
+
|
|
|
+ assert BambuMQTTClient._resolve_local_slot_from_mapping(1, [65535, 65535, 65535, 257]) == 5
|
|
|
+
|
|
|
+ def test_no_match_returns_none(self):
|
|
|
+ from backend.app.services.bambu_mqtt import BambuMQTTClient
|
|
|
+
|
|
|
+ # mapping has slot 0 only, looking for slot 2
|
|
|
+ assert BambuMQTTClient._resolve_local_slot_from_mapping(2, [0]) is None
|
|
|
+
|
|
|
+ def test_ambiguous_returns_none(self):
|
|
|
+ from backend.app.services.bambu_mqtt import BambuMQTTClient
|
|
|
+
|
|
|
+ # Both AMS0 slot1 (1) and AMS1 slot1 (257) → ambiguous
|
|
|
+ assert BambuMQTTClient._resolve_local_slot_from_mapping(1, [1, 257]) is None
|
|
|
+
|
|
|
+ def test_none_mapping_returns_none(self):
|
|
|
+ from backend.app.services.bambu_mqtt import BambuMQTTClient
|
|
|
+
|
|
|
+ assert BambuMQTTClient._resolve_local_slot_from_mapping(1, None) is None
|
|
|
+
|
|
|
+ def test_empty_mapping_returns_none(self):
|
|
|
+ from backend.app.services.bambu_mqtt import BambuMQTTClient
|
|
|
+
|
|
|
+ assert BambuMQTTClient._resolve_local_slot_from_mapping(1, []) is None
|
|
|
+
|
|
|
+ def test_ams_ht_slot0_match(self):
|
|
|
+ from backend.app.services.bambu_mqtt import BambuMQTTClient
|
|
|
+
|
|
|
+ # AMS-HT id=128: snow = 128*256 + 0 = 32768
|
|
|
+ assert BambuMQTTClient._resolve_local_slot_from_mapping(0, [32768]) == 128
|
|
|
+
|
|
|
+
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# 3. H2D Pro — initial state detection
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+
|
|
|
+
|
|
|
+class TestTrayNowDualNozzleH2DSetup:
|
|
|
+ """H2D Pro initial state detection."""
|
|
|
+
|
|
|
+ @pytest.fixture
|
|
|
+ def mqtt_client(self):
|
|
|
+ from backend.app.services.bambu_mqtt import BambuMQTTClient
|
|
|
+
|
|
|
+ return BambuMQTTClient(
|
|
|
+ ip_address="192.168.1.100",
|
|
|
+ serial_number="TEST_H2D",
|
|
|
+ access_code="12345678",
|
|
|
+ )
|
|
|
+
|
|
|
+ def test_dual_nozzle_detected_from_extruder_info(self, mqtt_client):
|
|
|
+ """2 entries in device.extruder.info → _is_dual_nozzle=True."""
|
|
|
+ mqtt_client._process_message(
|
|
|
+ _extruder_info_payload(
|
|
|
+ [
|
|
|
+ {"id": 0, "snow": 0xFF00FF},
|
|
|
+ {"id": 1, "snow": 0xFF00FF},
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ )
|
|
|
+ assert mqtt_client._is_dual_nozzle is True
|
|
|
+
|
|
|
+ def test_ams_extruder_map_parsed_from_info_field(self, mqtt_client):
|
|
|
+ """AMS 0 info=2003 → right (ext 0), AMS 128 info=2104 → left (ext 1)."""
|
|
|
+ ams_units = [
|
|
|
+ {"id": 0, "info": 2003, "tray": [{"id": i} for i in range(4)]},
|
|
|
+ {"id": 128, "info": 2104, "tray": [{"id": 0}]},
|
|
|
+ ]
|
|
|
+ payload = {
|
|
|
+ "print": {
|
|
|
+ "ams": {
|
|
|
+ "ams": ams_units,
|
|
|
+ "tray_now": "255",
|
|
|
+ "tray_exist_bits": "1000f",
|
|
|
+ },
|
|
|
+ }
|
|
|
+ }
|
|
|
+ mqtt_client._process_message(payload)
|
|
|
+
|
|
|
+ # info=2003: bit8 = (2003>>8)&1 = 7&1 = 1 → extruder = 1-1 = 0 (right)
|
|
|
+ # info=2104: bit8 = (2104>>8)&1 = 8&1 = 0 → extruder = 1-0 = 1 (left)
|
|
|
+ assert mqtt_client.state.ams_extruder_map == {"0": 0, "128": 1}
|
|
|
+
|
|
|
+ def test_dual_nozzle_detection_before_ams_in_same_message(self, mqtt_client):
|
|
|
+ """Dual-nozzle detection at line 538 happens before _handle_ams_data() at line 549.
|
|
|
+
|
|
|
+ If both arrive in the same message, tray_now disambiguation already uses dual-nozzle logic.
|
|
|
+ """
|
|
|
+ payload = {
|
|
|
+ "print": {
|
|
|
+ "device": {
|
|
|
+ "extruder": {
|
|
|
+ "info": [
|
|
|
+ {"id": 0, "snow": 0xFF00FF},
|
|
|
+ {"id": 1, "snow": 0xFF00FF},
|
|
|
+ ],
|
|
|
+ "state": 0x0001,
|
|
|
+ }
|
|
|
+ },
|
|
|
+ "ams": {
|
|
|
+ "ams": [
|
|
|
+ {"id": 0, "info": 2003, "tray": [{"id": i} for i in range(4)]},
|
|
|
+ ],
|
|
|
+ "tray_now": "2",
|
|
|
+ "tray_exist_bits": "f",
|
|
|
+ },
|
|
|
+ }
|
|
|
+ }
|
|
|
+ mqtt_client._process_message(payload)
|
|
|
+
|
|
|
+ # Dual-nozzle was detected; AMS 0 on right extruder (active by default);
|
|
|
+ # snow is 0xFF00FF (unloaded), so falls through to ams_extruder_map fallback.
|
|
|
+ # Single AMS on extruder 0 → global_id = 0*4+2 = 2
|
|
|
+ assert mqtt_client._is_dual_nozzle is True
|
|
|
+ assert mqtt_client.state.tray_now == 2
|
|
|
+
|
|
|
+
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# Shared H2D fixture for classes 4-8
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+
|
|
|
+
|
|
|
+class _H2DFixtureMixin:
|
|
|
+ """Mixin providing a pre-configured H2D Pro client."""
|
|
|
+
|
|
|
+ @pytest.fixture
|
|
|
+ def mqtt_client(self):
|
|
|
+ from backend.app.services.bambu_mqtt import BambuMQTTClient
|
|
|
+
|
|
|
+ return BambuMQTTClient(
|
|
|
+ ip_address="192.168.1.100",
|
|
|
+ serial_number="TEST_H2D",
|
|
|
+ access_code="12345678",
|
|
|
+ )
|
|
|
+
|
|
|
+ @pytest.fixture
|
|
|
+ def h2d_client(self, mqtt_client):
|
|
|
+ """Pre-configure as H2D Pro: dual-nozzle + ams_extruder_map."""
|
|
|
+ mqtt_client._process_message(
|
|
|
+ {
|
|
|
+ "print": {
|
|
|
+ "device": {
|
|
|
+ "extruder": {
|
|
|
+ "info": [
|
|
|
+ {"id": 0, "snow": 0xFF00FF},
|
|
|
+ {"id": 1, "snow": 0xFF00FF},
|
|
|
+ ],
|
|
|
+ "state": 0x0001, # right extruder active
|
|
|
+ }
|
|
|
+ },
|
|
|
+ "ams": {
|
|
|
+ "ams": [
|
|
|
+ {"id": 0, "info": 2003, "tray": [{"id": i} for i in range(4)]},
|
|
|
+ {"id": 128, "info": 2104, "tray": [{"id": 0}]},
|
|
|
+ ],
|
|
|
+ "tray_now": "255",
|
|
|
+ "tray_exist_bits": "1000f",
|
|
|
+ },
|
|
|
+ }
|
|
|
+ }
|
|
|
+ )
|
|
|
+ assert mqtt_client._is_dual_nozzle is True
|
|
|
+ assert mqtt_client.state.ams_extruder_map == {"0": 0, "128": 1}
|
|
|
+ return mqtt_client
|
|
|
+
|
|
|
+
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# 4. H2D Snow field disambiguation
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+
|
|
|
+
|
|
|
+class TestTrayNowDualNozzleH2DSnow(_H2DFixtureMixin):
|
|
|
+ """Snow field disambiguation (primary path)."""
|
|
|
+
|
|
|
+ def test_snow_disambiguates_ams0_slot(self, h2d_client):
|
|
|
+ """snow ext[0]=AMS 0 slot 2, tray_now='2' → global 2."""
|
|
|
+ # Send snow update FIRST (snow is parsed AFTER tray_now in the same message,
|
|
|
+ # so we need it in a prior message).
|
|
|
+ snow_val = 0 << 8 | 2 # AMS 0 slot 2 = raw 2
|
|
|
+ h2d_client._process_message(
|
|
|
+ _extruder_info_payload(
|
|
|
+ [
|
|
|
+ {"id": 0, "snow": snow_val},
|
|
|
+ {"id": 1, "snow": 0xFF00FF},
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ )
|
|
|
+ assert h2d_client.state.h2d_extruder_snow.get(0) == 2
|
|
|
+
|
|
|
+ # Now send tray_now=2
|
|
|
+ h2d_client._process_message(_ams_payload(2))
|
|
|
+ assert h2d_client.state.tray_now == 2
|
|
|
+
|
|
|
+ def test_snow_disambiguates_ams_ht_to_128(self, h2d_client):
|
|
|
+ """snow ext[1]=AMS HT (128), left active, tray_now='0' → global 128."""
|
|
|
+ # Snow: extruder 1 → AMS 128 slot 0
|
|
|
+ snow_val = 128 << 8 | 0 # = 32768
|
|
|
+ h2d_client._process_message(
|
|
|
+ _extruder_info_payload(
|
|
|
+ [
|
|
|
+ {"id": 0, "snow": 0xFF00FF},
|
|
|
+ {"id": 1, "snow": snow_val},
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ )
|
|
|
+ assert h2d_client.state.h2d_extruder_snow.get(1) == 128
|
|
|
+
|
|
|
+ # Switch to left extruder
|
|
|
+ h2d_client._process_message(_extruder_state_payload(0x0100))
|
|
|
+ assert h2d_client.state.active_extruder == 1
|
|
|
+
|
|
|
+ # tray_now="0" with left extruder active, snow says AMS HT (128)
|
|
|
+ # AMS HT snow_slot = 0 (single slot), parsed_tray_now = 0 → match
|
|
|
+ h2d_client._process_message(_ams_payload(0))
|
|
|
+ assert h2d_client.state.tray_now == 128
|
|
|
+
|
|
|
+ def test_snow_updates_h2d_extruder_snow_state(self, h2d_client):
|
|
|
+ """Verify state.h2d_extruder_snow dict is populated correctly."""
|
|
|
+ snow_ext0 = 1 << 8 | 3 # AMS 1 slot 3 → global 7
|
|
|
+ snow_ext1 = 0 << 8 | 0 # AMS 0 slot 0 → global 0
|
|
|
+ h2d_client._process_message(
|
|
|
+ _extruder_info_payload(
|
|
|
+ [
|
|
|
+ {"id": 0, "snow": snow_ext0},
|
|
|
+ {"id": 1, "snow": snow_ext1},
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ )
|
|
|
+ assert h2d_client.state.h2d_extruder_snow[0] == 7
|
|
|
+ assert h2d_client.state.h2d_extruder_snow[1] == 0
|
|
|
+
|
|
|
+ def test_snow_unloaded_value(self, h2d_client):
|
|
|
+ """snow=0xFFFF (ams_id=255, slot=255) → 255 (unloaded)."""
|
|
|
+ h2d_client._process_message(
|
|
|
+ _extruder_info_payload(
|
|
|
+ [
|
|
|
+ {"id": 0, "snow": 0xFFFF},
|
|
|
+ {"id": 1, "snow": 0xFFFF},
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ )
|
|
|
+ assert h2d_client.state.h2d_extruder_snow[0] == 255
|
|
|
+ assert h2d_client.state.h2d_extruder_snow[1] == 255
|
|
|
+
|
|
|
+ def test_snow_initial_sentinel_not_stored(self, h2d_client):
|
|
|
+ """snow=0xFF00FF (firmware initial sentinel) is not parsed into h2d_extruder_snow."""
|
|
|
+ # 0xFF00FF has ams_id=0xFF00=65280 which doesn't match any branch
|
|
|
+ h2d_client._process_message(
|
|
|
+ _extruder_info_payload(
|
|
|
+ [
|
|
|
+ {"id": 0, "snow": 0xFF00FF},
|
|
|
+ {"id": 1, "snow": 0xFF00FF},
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ )
|
|
|
+ # Snow dict should remain empty (no matching branch)
|
|
|
+ assert h2d_client.state.h2d_extruder_snow == {}
|
|
|
+
|
|
|
+
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# 5. H2D Pending target disambiguation
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+
|
|
|
+
|
|
|
+class TestTrayNowDualNozzleH2DPendingTarget(_H2DFixtureMixin):
|
|
|
+ """Pending target disambiguation (when Bambuddy initiates load)."""
|
|
|
+
|
|
|
+ def test_pending_target_matches_slot(self, h2d_client):
|
|
|
+ """pending=5, tray_now='1' (5%4=1 matches) → tray_now=5."""
|
|
|
+ h2d_client.state.pending_tray_target = 5
|
|
|
+ h2d_client._process_message(_ams_payload(1))
|
|
|
+ assert h2d_client.state.tray_now == 5
|
|
|
+ assert h2d_client.state.pending_tray_target is None # cleared
|
|
|
+
|
|
|
+ def test_pending_target_slot_mismatch(self, h2d_client):
|
|
|
+ """pending=5, tray_now='2' → uses raw slot, clears pending."""
|
|
|
+ h2d_client.state.pending_tray_target = 5
|
|
|
+ h2d_client._process_message(_ams_payload(2))
|
|
|
+ # Slot 2 != 5%4=1 → mismatch, uses raw slot 2
|
|
|
+ assert h2d_client.state.tray_now == 2
|
|
|
+ assert h2d_client.state.pending_tray_target is None
|
|
|
+
|
|
|
+ def test_pending_target_takes_priority_over_snow(self, h2d_client):
|
|
|
+ """When both pending and snow are set, pending wins."""
|
|
|
+ # Set up snow for extruder 0 → AMS 0 slot 1 → global 1
|
|
|
+ snow_val = 0 << 8 | 1
|
|
|
+ h2d_client._process_message(
|
|
|
+ _extruder_info_payload(
|
|
|
+ [
|
|
|
+ {"id": 0, "snow": snow_val},
|
|
|
+ {"id": 1, "snow": 0xFF00FF},
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ )
|
|
|
+ assert h2d_client.state.h2d_extruder_snow.get(0) == 1
|
|
|
+
|
|
|
+ # Set pending target to AMS 1 slot 1 (global 5)
|
|
|
+ h2d_client.state.pending_tray_target = 5
|
|
|
+ # tray_now="1" — matches pending (5%4=1), pending should win over snow
|
|
|
+ h2d_client._process_message(_ams_payload(1))
|
|
|
+ assert h2d_client.state.tray_now == 5
|
|
|
+
|
|
|
+
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# 6. H2D ams_extruder_map fallback
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+
|
|
|
+
|
|
|
+class TestTrayNowDualNozzleH2DFallback(_H2DFixtureMixin):
|
|
|
+ """ams_extruder_map fallback (no pending, no snow)."""
|
|
|
+
|
|
|
+ def test_single_ams_on_extruder_computes_global_id(self, h2d_client):
|
|
|
+ """AMS 0 on right extruder, tray_now='2' → 0*4+2=2."""
|
|
|
+ # h2d_client has snow=0xFF00FF (unloaded) by default, so snow path skips
|
|
|
+ h2d_client._process_message(_ams_payload(2))
|
|
|
+ # AMS 0 is the only AMS on extruder 0 (right, active by default)
|
|
|
+ # Fallback: single AMS → global = 0*4+2 = 2
|
|
|
+ assert h2d_client.state.tray_now == 2
|
|
|
+
|
|
|
+ def test_multiple_ams_keeps_current_if_valid(self, h2d_client):
|
|
|
+ """Current tray matches slot → keeps it (multi-AMS on same extruder)."""
|
|
|
+ # Set up: two AMS units on the same extruder (right, ext 0)
|
|
|
+ h2d_client.state.ams_extruder_map = {"0": 0, "1": 0}
|
|
|
+ # Pre-set tray_now=5 (AMS 1 slot 1) — current_ams=1 which is in ams_on_extruder
|
|
|
+ h2d_client.state.tray_now = 5
|
|
|
+ # tray_now="1" → 5%4=1 matches → keep current=5
|
|
|
+ h2d_client._process_message(_ams_payload(1))
|
|
|
+ assert h2d_client.state.tray_now == 5
|
|
|
+
|
|
|
+ def test_no_ams_on_extruder_uses_raw_slot(self, h2d_client):
|
|
|
+ """No AMS mapped to the active extruder → raw slot as global ID."""
|
|
|
+ # All AMS on left extruder, but right is active
|
|
|
+ h2d_client.state.ams_extruder_map = {"0": 1, "128": 1}
|
|
|
+ h2d_client._process_message(_ams_payload(2))
|
|
|
+ assert h2d_client.state.tray_now == 2
|
|
|
+
|
|
|
+ def test_single_ams_ht_on_extruder_returns_unit_id(self, h2d_client):
|
|
|
+ """AMS-HT 128 alone on left extruder, slot 0 → global ID 128 (not 512)."""
|
|
|
+ # Switch to left extruder (where AMS-HT 128 is mapped)
|
|
|
+ h2d_client._process_message(_extruder_state_payload(0x0100))
|
|
|
+ # Only AMS-HT 128 on left extruder; no snow available
|
|
|
+ h2d_client._process_message(_ams_payload(0))
|
|
|
+ assert h2d_client.state.tray_now == 128
|
|
|
+
|
|
|
+ def test_single_ams_ht_ignores_nonzero_slot(self, h2d_client):
|
|
|
+ """AMS-HT has single slot; even if printer reports slot 1, global ID = unit ID."""
|
|
|
+ h2d_client.state.ams_extruder_map = {"129": 0}
|
|
|
+ h2d_client._process_message(_ams_payload(1))
|
|
|
+ # AMS-HT 129: global ID = 129, not 129*4+1=517
|
|
|
+ assert h2d_client.state.tray_now == 129
|
|
|
+
|
|
|
+ def test_multiple_ams_keeps_current_ams_ht(self, h2d_client):
|
|
|
+ """Current tray is AMS-HT 128, slot 0 reported → keeps 128."""
|
|
|
+ h2d_client.state.ams_extruder_map = {"0": 0, "128": 0}
|
|
|
+ h2d_client.state.tray_now = 128
|
|
|
+ h2d_client._process_message(_ams_payload(0))
|
|
|
+ assert h2d_client.state.tray_now == 128
|
|
|
+
|
|
|
+ def test_multiple_ams_slot_nonzero_excludes_ams_ht(self, h2d_client):
|
|
|
+ """Slot > 0 eliminates AMS-HT candidates; single regular AMS left → resolves."""
|
|
|
+ # AMS 0 + AMS-HT 128 both on right extruder
|
|
|
+ h2d_client.state.ams_extruder_map = {"0": 0, "128": 0}
|
|
|
+ h2d_client.state.tray_now = 255 # no current match
|
|
|
+ # Slot 2 → can't be AMS-HT → only AMS 0 → global = 0*4+2 = 2
|
|
|
+ h2d_client._process_message(_ams_payload(2))
|
|
|
+ assert h2d_client.state.tray_now == 2
|
|
|
+
|
|
|
+ def test_multiple_ams_slot_nonzero_narrows_to_single_ht_excluded(self, h2d_client):
|
|
|
+ """Two regular AMS + one AMS-HT, slot > 0 → AMS-HT excluded but still ambiguous."""
|
|
|
+ h2d_client.state.ams_extruder_map = {"0": 0, "1": 0, "128": 0}
|
|
|
+ h2d_client.state.tray_now = 255
|
|
|
+ # Slot 3 → excludes AMS-HT, but AMS 0 and AMS 1 both remain → ambiguous
|
|
|
+ h2d_client._process_message(_ams_payload(3))
|
|
|
+ assert h2d_client.state.tray_now == 3 # raw slot fallback
|
|
|
+
|
|
|
+
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# 6b. H2D last_loaded_tray validation
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+
|
|
|
+
|
|
|
+class TestLastLoadedTrayValidation(_H2DFixtureMixin):
|
|
|
+ """last_loaded_tray only stores physically valid tray IDs."""
|
|
|
+
|
|
|
+ def test_regular_ams_tray_stored(self, h2d_client):
|
|
|
+ """Valid regular AMS tray (0-15) → stored in last_loaded_tray."""
|
|
|
+ h2d_client.state.tray_now = 7
|
|
|
+ # Trigger tray_now processing via AMS message
|
|
|
+ h2d_client._process_message(
|
|
|
+ _extruder_info_payload(
|
|
|
+ [
|
|
|
+ {"id": 0, "snow": 1 << 8 | 3}, # AMS 1 slot 3 → global 7
|
|
|
+ {"id": 1, "snow": 0xFF00FF},
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ )
|
|
|
+ h2d_client._process_message(_ams_payload(3))
|
|
|
+ assert h2d_client.state.tray_now == 7
|
|
|
+ assert h2d_client.state.last_loaded_tray == 7
|
|
|
+
|
|
|
+ def test_ams_ht_tray_stored(self, h2d_client):
|
|
|
+ """Valid AMS-HT tray (128-135) → stored in last_loaded_tray."""
|
|
|
+ h2d_client._process_message(_extruder_state_payload(0x0100))
|
|
|
+ h2d_client._process_message(
|
|
|
+ _extruder_info_payload(
|
|
|
+ [
|
|
|
+ {"id": 0, "snow": 0xFF00FF},
|
|
|
+ {"id": 1, "snow": 128 << 8 | 0},
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ )
|
|
|
+ h2d_client._process_message(_ams_payload(0))
|
|
|
+ assert h2d_client.state.tray_now == 128
|
|
|
+ assert h2d_client.state.last_loaded_tray == 128
|
|
|
+
|
|
|
+ def test_unloaded_not_stored(self, h2d_client):
|
|
|
+ """tray_now=255 (unloaded) → last_loaded_tray unchanged."""
|
|
|
+ h2d_client.state.last_loaded_tray = 5
|
|
|
+ h2d_client._process_message(_ams_payload(255))
|
|
|
+ assert h2d_client.state.tray_now == 255
|
|
|
+ assert h2d_client.state.last_loaded_tray == 5
|
|
|
+
|
|
|
+
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# 7. H2D Active extruder switching
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+
|
|
|
+
|
|
|
+class TestTrayNowDualNozzleH2DActiveExtruder(_H2DFixtureMixin):
|
|
|
+ """Active extruder switching via device.extruder.state bit 8."""
|
|
|
+
|
|
|
+ def test_active_extruder_right_by_default(self, h2d_client):
|
|
|
+ """Initial state.active_extruder == 0 (right)."""
|
|
|
+ assert h2d_client.state.active_extruder == 0
|
|
|
+
|
|
|
+ def test_extruder_state_bit8_switches_to_left(self, h2d_client):
|
|
|
+ """state=0x100 → active_extruder=1 (left)."""
|
|
|
+ h2d_client._process_message(_extruder_state_payload(0x0100))
|
|
|
+ assert h2d_client.state.active_extruder == 1
|
|
|
+
|
|
|
+ def test_extruder_state_bit8_switches_back_to_right(self, h2d_client):
|
|
|
+ """Cycle 0 → 1 → 0."""
|
|
|
+ h2d_client._process_message(_extruder_state_payload(0x0100))
|
|
|
+ assert h2d_client.state.active_extruder == 1
|
|
|
+
|
|
|
+ h2d_client._process_message(_extruder_state_payload(0x0001))
|
|
|
+ assert h2d_client.state.active_extruder == 0
|
|
|
+
|
|
|
+ def test_extruder_switch_changes_tray_disambiguation(self, h2d_client):
|
|
|
+ """Snow on both extruders; switching active changes which snow is used."""
|
|
|
+ # Snow: ext 0 → AMS 0 slot 1 (global 1), ext 1 → AMS 128 slot 0 (global 128)
|
|
|
+ h2d_client._process_message(
|
|
|
+ _extruder_info_payload(
|
|
|
+ [
|
|
|
+ {"id": 0, "snow": 0 << 8 | 1}, # AMS 0 slot 1 → global 1
|
|
|
+ {"id": 1, "snow": 128 << 8 | 0}, # AMS HT → global 128
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ )
|
|
|
+
|
|
|
+ # Right active (default) — tray_now="1" → snow ext[0] says global 1
|
|
|
+ h2d_client._process_message(_ams_payload(1))
|
|
|
+ assert h2d_client.state.tray_now == 1
|
|
|
+
|
|
|
+ # Switch to left
|
|
|
+ h2d_client._process_message(_extruder_state_payload(0x0100))
|
|
|
+
|
|
|
+ # Left active — tray_now="0" → snow ext[1] says AMS HT (128), slot 0 matches
|
|
|
+ h2d_client._process_message(_ams_payload(0))
|
|
|
+ assert h2d_client.state.tray_now == 128
|
|
|
+
|
|
|
+
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# 8. H2D Full multi-message sequences
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+
|
|
|
+
|
|
|
+class TestTrayNowDualNozzleH2DFullSequence(_H2DFixtureMixin):
|
|
|
+ """Multi-message sequences simulating real H2D Pro prints."""
|
|
|
+
|
|
|
+ def test_h2d_right_nozzle_ams0_lifecycle(self, h2d_client):
|
|
|
+ """Setup → load AMS 0 slot 1 → verify tray_now=1."""
|
|
|
+ # Snow update: extruder 0 loading AMS 0 slot 1
|
|
|
+ h2d_client._process_message(
|
|
|
+ _extruder_info_payload(
|
|
|
+ [
|
|
|
+ {"id": 0, "snow": 0 << 8 | 1},
|
|
|
+ {"id": 1, "snow": 0xFF00FF},
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ )
|
|
|
+ # Printer reports tray_now="1"
|
|
|
+ h2d_client._process_message(_ams_payload(1))
|
|
|
+ assert h2d_client.state.tray_now == 1
|
|
|
+ assert h2d_client.state.last_loaded_tray == 1
|
|
|
+
|
|
|
+ def test_h2d_left_nozzle_ams_ht_lifecycle(self, h2d_client):
|
|
|
+ """Setup → switch left → load AMS HT → verify tray_now=128."""
|
|
|
+ # Switch to left extruder
|
|
|
+ h2d_client._process_message(_extruder_state_payload(0x0100))
|
|
|
+
|
|
|
+ # Snow: ext 1 → AMS HT slot 0
|
|
|
+ h2d_client._process_message(
|
|
|
+ _extruder_info_payload(
|
|
|
+ [
|
|
|
+ {"id": 0, "snow": 0xFF00FF},
|
|
|
+ {"id": 1, "snow": 128 << 8 | 0},
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ )
|
|
|
+
|
|
|
+ # Printer reports tray_now="0" (AMS HT single slot)
|
|
|
+ h2d_client._process_message(_ams_payload(0))
|
|
|
+ assert h2d_client.state.tray_now == 128
|
|
|
+ assert h2d_client.state.last_loaded_tray == 128
|
|
|
+
|
|
|
+ def test_h2d_multi_color_alternating_nozzles(self, h2d_client):
|
|
|
+ """Multi-color print alternating between right and left nozzles.
|
|
|
+
|
|
|
+ Sequence:
|
|
|
+ 1. Right loads AMS 0 slot 0 (tray=0)
|
|
|
+ 2. Switch left, load AMS HT (tray=128)
|
|
|
+ 3. Switch right, snow updates, load AMS 0 slot 2 (tray=2)
|
|
|
+ 4. Unload (255)
|
|
|
+ """
|
|
|
+ # Step 1: Right extruder loads AMS 0 slot 0
|
|
|
+ h2d_client._process_message(
|
|
|
+ _extruder_info_payload(
|
|
|
+ [
|
|
|
+ {"id": 0, "snow": 0 << 8 | 0},
|
|
|
+ {"id": 1, "snow": 0xFF00FF},
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ )
|
|
|
+ h2d_client._process_message(_ams_payload(0))
|
|
|
+ assert h2d_client.state.tray_now == 0
|
|
|
+
|
|
|
+ # Step 2: Switch to left, load AMS HT
|
|
|
+ h2d_client._process_message(_extruder_state_payload(0x0100))
|
|
|
+ h2d_client._process_message(
|
|
|
+ _extruder_info_payload(
|
|
|
+ [
|
|
|
+ {"id": 0, "snow": 0 << 8 | 0},
|
|
|
+ {"id": 1, "snow": 128 << 8 | 0},
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ )
|
|
|
+ h2d_client._process_message(_ams_payload(0))
|
|
|
+ assert h2d_client.state.tray_now == 128
|
|
|
+
|
|
|
+ # Step 3: Switch back to right, load AMS 0 slot 2
|
|
|
+ h2d_client._process_message(_extruder_state_payload(0x0001))
|
|
|
+ h2d_client._process_message(
|
|
|
+ _extruder_info_payload(
|
|
|
+ [
|
|
|
+ {"id": 0, "snow": 0 << 8 | 2},
|
|
|
+ {"id": 1, "snow": 128 << 8 | 0},
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ )
|
|
|
+ h2d_client._process_message(_ams_payload(2))
|
|
|
+ assert h2d_client.state.tray_now == 2
|
|
|
+
|
|
|
+ # Step 4: Unload
|
|
|
+ h2d_client._process_message(_ams_payload(255))
|
|
|
+ assert h2d_client.state.tray_now == 255
|
|
|
+ assert h2d_client.state.last_loaded_tray == 2
|
|
|
+
|
|
|
+
|
|
|
+class TestTrayChangeLog:
|
|
|
+ """Tests for tray_change_log tracking during prints (mid-print tray switch)."""
|
|
|
+
|
|
|
+ @pytest.fixture
|
|
|
+ def mqtt_client(self):
|
|
|
+ """Create a BambuMQTTClient instance for testing."""
|
|
|
+ from backend.app.services.bambu_mqtt import BambuMQTTClient
|
|
|
+
|
|
|
+ client = BambuMQTTClient(
|
|
|
+ ip_address="192.168.1.100",
|
|
|
+ serial_number="TRAYLOG1",
|
|
|
+ access_code="12345678",
|
|
|
+ )
|
|
|
+ return client
|
|
|
+
|
|
|
+ def test_tray_change_log_defaults_empty(self, mqtt_client):
|
|
|
+ """tray_change_log starts as an empty list."""
|
|
|
+ assert mqtt_client.state.tray_change_log == []
|
|
|
+
|
|
|
+ def test_tray_change_log_seeded_on_print_start(self, mqtt_client):
|
|
|
+ """Print start clears log and seeds with initial tray at layer 0."""
|
|
|
+ mqtt_client.state.tray_now = 2
|
|
|
+ mqtt_client.state.last_loaded_tray = 2
|
|
|
+ mqtt_client._previous_gcode_state = "IDLE"
|
|
|
+
|
|
|
+ # Transition to RUNNING via _process_message
|
|
|
+ mqtt_client._process_message(
|
|
|
+ {
|
|
|
+ "print": {
|
|
|
+ "gcode_state": "RUNNING",
|
|
|
+ "gcode_file": "test.3mf",
|
|
|
+ }
|
|
|
+ }
|
|
|
+ )
|
|
|
+
|
|
|
+ assert mqtt_client.state.tray_change_log == [(2, 0)]
|
|
|
+
|
|
|
+ def test_tray_change_log_cleared_on_new_print(self, mqtt_client):
|
|
|
+ """Old log entries are cleared when a new print starts."""
|
|
|
+ mqtt_client.state.tray_change_log = [(5, 0), (3, 100)]
|
|
|
+ mqtt_client.state.tray_now = 1
|
|
|
+ mqtt_client.state.last_loaded_tray = 1
|
|
|
+ mqtt_client._previous_gcode_state = "IDLE"
|
|
|
+
|
|
|
+ mqtt_client._process_message(
|
|
|
+ {
|
|
|
+ "print": {
|
|
|
+ "gcode_state": "RUNNING",
|
|
|
+ "gcode_file": "new.3mf",
|
|
|
+ }
|
|
|
+ }
|
|
|
+ )
|
|
|
+
|
|
|
+ assert mqtt_client.state.tray_change_log == [(1, 0)]
|
|
|
+
|
|
|
+ def test_tray_change_recorded_during_running(self, mqtt_client):
|
|
|
+ """Tray change while RUNNING is appended to the log."""
|
|
|
+ mqtt_client.state.state = "RUNNING"
|
|
|
+ mqtt_client.state.layer_num = 50
|
|
|
+ mqtt_client.state.last_loaded_tray = 0
|
|
|
+ mqtt_client.state.tray_change_log = [(0, 0)]
|
|
|
+
|
|
|
+ # Simulate tray_now update via AMS data
|
|
|
+ mqtt_client.state.tray_now = 1
|
|
|
+ # Trigger the tracking code path
|
|
|
+ tn = mqtt_client.state.tray_now
|
|
|
+ if tn != mqtt_client.state.last_loaded_tray and mqtt_client.state.state in ("RUNNING", "PAUSE"):
|
|
|
+ mqtt_client.state.tray_change_log.append((tn, mqtt_client.state.layer_num))
|
|
|
+ mqtt_client.state.last_loaded_tray = tn
|
|
|
+
|
|
|
+ assert mqtt_client.state.tray_change_log == [(0, 0), (1, 50)]
|
|
|
+
|
|
|
+ def test_tray_change_not_recorded_when_idle(self, mqtt_client):
|
|
|
+ """Tray changes while IDLE are NOT logged."""
|
|
|
+ mqtt_client.state.state = "IDLE"
|
|
|
+ mqtt_client.state.layer_num = 0
|
|
|
+ mqtt_client.state.last_loaded_tray = 0
|
|
|
+ mqtt_client.state.tray_change_log = []
|
|
|
+
|
|
|
+ mqtt_client.state.tray_now = 3
|
|
|
+ tn = mqtt_client.state.tray_now
|
|
|
+ if tn != mqtt_client.state.last_loaded_tray and mqtt_client.state.state in ("RUNNING", "PAUSE"):
|
|
|
+ mqtt_client.state.tray_change_log.append((tn, mqtt_client.state.layer_num))
|
|
|
+ mqtt_client.state.last_loaded_tray = tn
|
|
|
+
|
|
|
+ assert mqtt_client.state.tray_change_log == []
|
|
|
+
|
|
|
+ def test_tray_change_recorded_during_pause(self, mqtt_client):
|
|
|
+ """Tray change while PAUSE is also logged (AMS can swap during pause)."""
|
|
|
+ mqtt_client.state.state = "PAUSE"
|
|
|
+ mqtt_client.state.layer_num = 75
|
|
|
+ mqtt_client.state.last_loaded_tray = 2
|
|
|
+ mqtt_client.state.tray_change_log = [(2, 0)]
|
|
|
+
|
|
|
+ mqtt_client.state.tray_now = 5
|
|
|
+ tn = mqtt_client.state.tray_now
|
|
|
+ if tn != mqtt_client.state.last_loaded_tray and mqtt_client.state.state in ("RUNNING", "PAUSE"):
|
|
|
+ mqtt_client.state.tray_change_log.append((tn, mqtt_client.state.layer_num))
|
|
|
+ mqtt_client.state.last_loaded_tray = tn
|
|
|
+
|
|
|
+ assert mqtt_client.state.tray_change_log == [(2, 0), (5, 75)]
|
|
|
+
|
|
|
+ def test_same_tray_not_logged_twice(self, mqtt_client):
|
|
|
+ """Same tray value doesn't create duplicate log entries."""
|
|
|
+ mqtt_client.state.state = "RUNNING"
|
|
|
+ mqtt_client.state.layer_num = 30
|
|
|
+ mqtt_client.state.last_loaded_tray = 2
|
|
|
+ mqtt_client.state.tray_change_log = [(2, 0)]
|
|
|
+
|
|
|
+ # Same tray again
|
|
|
+ mqtt_client.state.tray_now = 2
|
|
|
+ tn = mqtt_client.state.tray_now
|
|
|
+ if tn != mqtt_client.state.last_loaded_tray and mqtt_client.state.state in ("RUNNING", "PAUSE"):
|
|
|
+ mqtt_client.state.tray_change_log.append((tn, mqtt_client.state.layer_num))
|
|
|
+ mqtt_client.state.last_loaded_tray = tn
|
|
|
+
|
|
|
+ assert mqtt_client.state.tray_change_log == [(2, 0)]
|
|
|
+
|
|
|
+ def test_multiple_tray_changes(self, mqtt_client):
|
|
|
+ """Multiple tray changes create a full history."""
|
|
|
+ mqtt_client.state.state = "RUNNING"
|
|
|
+ mqtt_client.state.last_loaded_tray = 0
|
|
|
+ mqtt_client.state.tray_change_log = [(0, 0)]
|
|
|
+
|
|
|
+ changes = [(1, 50), (3, 120), (0, 200)]
|
|
|
+ for tray, layer in changes:
|
|
|
+ mqtt_client.state.tray_now = tray
|
|
|
+ mqtt_client.state.layer_num = layer
|
|
|
+ tn = mqtt_client.state.tray_now
|
|
|
+ if tn != mqtt_client.state.last_loaded_tray and mqtt_client.state.state in ("RUNNING", "PAUSE"):
|
|
|
+ mqtt_client.state.tray_change_log.append((tn, mqtt_client.state.layer_num))
|
|
|
+ mqtt_client.state.last_loaded_tray = tn
|
|
|
+
|
|
|
+ assert mqtt_client.state.tray_change_log == [(0, 0), (1, 50), (3, 120), (0, 200)]
|
|
|
+
|
|
|
+
|
|
|
+class TestDeveloperModeDetection:
|
|
|
+ """Tests for developer LAN mode detection from MQTT 'fun' field."""
|
|
|
+
|
|
|
+ @pytest.fixture
|
|
|
+ def mqtt_client(self):
|
|
|
+ """Create a BambuMQTTClient instance for testing."""
|
|
|
+ from backend.app.services.bambu_mqtt import BambuMQTTClient
|
|
|
+
|
|
|
+ client = BambuMQTTClient(
|
|
|
+ ip_address="192.168.1.100",
|
|
|
+ serial_number="TEST123",
|
|
|
+ access_code="12345678",
|
|
|
+ )
|
|
|
+ return client
|
|
|
+
|
|
|
+ def test_developer_mode_initially_none(self, mqtt_client):
|
|
|
+ """Verify developer_mode starts as None (unknown)."""
|
|
|
+ assert mqtt_client.state.developer_mode is None
|
|
|
+
|
|
|
+ def test_developer_mode_on_when_bit_clear(self, mqtt_client):
|
|
|
+ """Verify developer_mode is True when bit 0x20000000 is clear."""
|
|
|
+ # Bit 29 clear in lower 32 bits = developer mode ON
|
|
|
+ payload = {
|
|
|
+ "print": {
|
|
|
+ "gcode_state": "IDLE",
|
|
|
+ "fun": "1C8187FF9CFF",
|
|
|
+ }
|
|
|
+ }
|
|
|
+ mqtt_client._process_message(payload)
|
|
|
+ assert mqtt_client.state.developer_mode is True
|
|
|
+
|
|
|
+ def test_developer_mode_off_when_bit_set(self, mqtt_client):
|
|
|
+ """Verify developer_mode is False when bit 0x20000000 is set."""
|
|
|
+ # Bit 29 set in lower 32 bits = developer mode OFF (encryption required)
|
|
|
+ payload = {
|
|
|
+ "print": {
|
|
|
+ "gcode_state": "IDLE",
|
|
|
+ "fun": "1C81A7FF9CFF",
|
|
|
+ }
|
|
|
+ }
|
|
|
+ mqtt_client._process_message(payload)
|
|
|
+ assert mqtt_client.state.developer_mode is False
|
|
|
+
|
|
|
+ def test_developer_mode_exact_bit_check(self, mqtt_client):
|
|
|
+ """Verify only bit 0x20000000 matters, not other bits."""
|
|
|
+ # 0x20000000 in hex = bit 29. Set ONLY that bit.
|
|
|
+ payload = {
|
|
|
+ "print": {
|
|
|
+ "gcode_state": "IDLE",
|
|
|
+ "fun": "000020000000",
|
|
|
+ }
|
|
|
+ }
|
|
|
+ mqtt_client._process_message(payload)
|
|
|
+ assert mqtt_client.state.developer_mode is False
|
|
|
+
|
|
|
+ # All zeros = all bits clear = developer mode ON
|
|
|
+ payload["print"]["fun"] = "000000000000"
|
|
|
+ mqtt_client._process_message(payload)
|
|
|
+ assert mqtt_client.state.developer_mode is True
|
|
|
+
|
|
|
+ def test_developer_mode_invalid_fun_ignored(self, mqtt_client):
|
|
|
+ """Verify invalid fun values don't crash or change state."""
|
|
|
+ mqtt_client.state.developer_mode = True
|
|
|
+
|
|
|
+ payload = {
|
|
|
+ "print": {
|
|
|
+ "gcode_state": "IDLE",
|
|
|
+ "fun": "not_a_hex_value",
|
|
|
+ }
|
|
|
+ }
|
|
|
+ mqtt_client._process_message(payload)
|
|
|
+ # Should remain unchanged
|
|
|
+ assert mqtt_client.state.developer_mode is True
|
|
|
+
|
|
|
+ def test_developer_mode_missing_fun_preserves_state(self, mqtt_client):
|
|
|
+ """Verify messages without fun field don't reset developer_mode."""
|
|
|
+ mqtt_client.state.developer_mode = False
|
|
|
+
|
|
|
+ payload = {
|
|
|
+ "print": {
|
|
|
+ "gcode_state": "RUNNING",
|
|
|
+ "mc_percent": 50,
|
|
|
+ }
|
|
|
+ }
|
|
|
+ mqtt_client._process_message(payload)
|
|
|
+ assert mqtt_client.state.developer_mode is False
|
|
|
+
|
|
|
+ def test_developer_mode_persists_across_messages(self, mqtt_client):
|
|
|
+ """Verify developer_mode set by fun persists across messages without fun."""
|
|
|
+ # First message sets developer_mode
|
|
|
+ mqtt_client._process_message(
|
|
|
+ {
|
|
|
+ "print": {
|
|
|
+ "gcode_state": "IDLE",
|
|
|
+ "fun": "3EC1AFFF9CFF",
|
|
|
+ }
|
|
|
+ }
|
|
|
+ )
|
|
|
+ assert mqtt_client.state.developer_mode is False
|
|
|
+
|
|
|
+ # Subsequent messages without fun don't change it
|
|
|
+ for _ in range(3):
|
|
|
+ mqtt_client._process_message(
|
|
|
+ {
|
|
|
+ "print": {
|
|
|
+ "gcode_state": "RUNNING",
|
|
|
+ "mc_percent": 50,
|
|
|
+ }
|
|
|
+ }
|
|
|
+ )
|
|
|
+ assert mqtt_client.state.developer_mode is False
|