|
|
@@ -858,3 +858,922 @@ class TestNozzleRackData:
|
|
|
assert mqtt_client.state.nozzles[0].nozzle_diameter == "0.4"
|
|
|
assert mqtt_client.state.nozzles[1].nozzle_type == "HH01"
|
|
|
assert mqtt_client.state.nozzles[1].nozzle_diameter == "0.6"
|
|
|
+
|
|
|
+
|
|
|
+class TestRequestTopicFailSafe:
|
|
|
+ """Tests for graceful degradation when broker rejects request topic subscription."""
|
|
|
+
|
|
|
+ @pytest.fixture
|
|
|
+ def mqtt_client(self):
|
|
|
+ from backend.app.services.bambu_mqtt import BambuMQTTClient
|
|
|
+
|
|
|
+ client = BambuMQTTClient(
|
|
|
+ ip_address="192.168.1.100",
|
|
|
+ serial_number="TEST123",
|
|
|
+ access_code="12345678",
|
|
|
+ )
|
|
|
+ return client
|
|
|
+
|
|
|
+ def test_request_topic_supported_by_default(self, mqtt_client):
|
|
|
+ """Request topic subscription is attempted by default."""
|
|
|
+ assert mqtt_client._request_topic_supported is True
|
|
|
+ assert mqtt_client._request_topic_confirmed is False
|
|
|
+
|
|
|
+ def test_on_subscribe_confirms_success(self, mqtt_client):
|
|
|
+ """Successful SUBACK marks request topic as confirmed."""
|
|
|
+ from paho.mqtt.reasoncodes import ReasonCode
|
|
|
+
|
|
|
+ mqtt_client._request_topic_sub_mid = 42
|
|
|
+ rc = ReasonCode(9, identifier=0) # SUBACK packetType=9, QoS 0 = success
|
|
|
+ mqtt_client._on_subscribe(None, None, 42, [rc], None)
|
|
|
+
|
|
|
+ assert mqtt_client._request_topic_confirmed is True
|
|
|
+ assert mqtt_client._request_topic_supported is True
|
|
|
+ assert mqtt_client._request_topic_sub_mid is None
|
|
|
+ assert mqtt_client._request_topic_sub_time == 0.0
|
|
|
+
|
|
|
+ def test_on_subscribe_detects_rejection(self, mqtt_client):
|
|
|
+ """SUBACK with failure code disables request topic."""
|
|
|
+ from paho.mqtt.reasoncodes import ReasonCode
|
|
|
+
|
|
|
+ mqtt_client._request_topic_sub_mid = 42
|
|
|
+ rc = ReasonCode(9, identifier=0x80) # SUBACK packetType=9, 0x80 = failure
|
|
|
+ mqtt_client._on_subscribe(None, None, 42, [rc], None)
|
|
|
+
|
|
|
+ assert mqtt_client._request_topic_supported is False
|
|
|
+ assert mqtt_client._request_topic_confirmed is False
|
|
|
+
|
|
|
+ def test_on_subscribe_ignores_other_mids(self, mqtt_client):
|
|
|
+ """SUBACK for other subscriptions (e.g. report topic) is ignored."""
|
|
|
+ from paho.mqtt.reasoncodes import ReasonCode
|
|
|
+
|
|
|
+ mqtt_client._request_topic_sub_mid = 42
|
|
|
+ rc = ReasonCode(9, identifier=0x80)
|
|
|
+ mqtt_client._on_subscribe(None, None, 99, [rc], None)
|
|
|
+
|
|
|
+ # Not affected — mid doesn't match
|
|
|
+ assert mqtt_client._request_topic_supported is True
|
|
|
+
|
|
|
+ def test_disconnect_after_subscription_disables_topic(self, mqtt_client):
|
|
|
+ """Disconnect within 10s of subscription attempt disables request topic."""
|
|
|
+ import time
|
|
|
+
|
|
|
+ mqtt_client._request_topic_sub_time = time.time()
|
|
|
+ mqtt_client._request_topic_confirmed = False
|
|
|
+ mqtt_client._last_message_time = 0.0
|
|
|
+
|
|
|
+ mqtt_client._on_disconnect(None, None)
|
|
|
+
|
|
|
+ assert mqtt_client._request_topic_supported is False
|
|
|
+ assert mqtt_client._request_topic_sub_time == 0.0
|
|
|
+
|
|
|
+ def test_disconnect_after_confirmation_does_not_disable(self, mqtt_client):
|
|
|
+ """Disconnect after SUBACK confirmation keeps request topic enabled."""
|
|
|
+ import time
|
|
|
+
|
|
|
+ mqtt_client._request_topic_sub_time = time.time()
|
|
|
+ mqtt_client._request_topic_confirmed = True
|
|
|
+ mqtt_client._last_message_time = 0.0
|
|
|
+
|
|
|
+ mqtt_client._on_disconnect(None, None)
|
|
|
+
|
|
|
+ assert mqtt_client._request_topic_supported is True
|
|
|
+
|
|
|
+ def test_late_disconnect_does_not_disable(self, mqtt_client):
|
|
|
+ """Disconnect long after subscription (>10s) doesn't blame request topic."""
|
|
|
+ import time
|
|
|
+
|
|
|
+ mqtt_client._request_topic_sub_time = time.time() - 30.0
|
|
|
+ mqtt_client._request_topic_confirmed = False
|
|
|
+ mqtt_client._last_message_time = 0.0
|
|
|
+
|
|
|
+ mqtt_client._on_disconnect(None, None)
|
|
|
+
|
|
|
+ assert mqtt_client._request_topic_supported is True
|
|
|
+
|
|
|
+ def test_on_connect_skips_request_topic_when_unsupported(self, mqtt_client):
|
|
|
+ """After marking unsupported, reconnect skips request topic subscription."""
|
|
|
+ mqtt_client._request_topic_supported = False
|
|
|
+
|
|
|
+ subscribe_calls = []
|
|
|
+ mock_client = type(
|
|
|
+ "MockClient",
|
|
|
+ (),
|
|
|
+ {
|
|
|
+ "subscribe": lambda self, topic: subscribe_calls.append(topic) or (0, 1),
|
|
|
+ },
|
|
|
+ )()
|
|
|
+
|
|
|
+ mqtt_client._on_connect(mock_client, None, None, 0)
|
|
|
+
|
|
|
+ # Only report topic subscribed, not request topic
|
|
|
+ assert len(subscribe_calls) == 1
|
|
|
+ assert subscribe_calls[0] == mqtt_client.topic_subscribe
|
|
|
+
|
|
|
+
|
|
|
+class TestRequestTopicAmsMapping:
|
|
|
+ """Tests for capturing ams_mapping from the MQTT request topic."""
|
|
|
+
|
|
|
+ @pytest.fixture
|
|
|
+ def mqtt_client(self):
|
|
|
+ """Create a BambuMQTTClient instance for testing."""
|
|
|
+ from backend.app.services.bambu_mqtt import BambuMQTTClient
|
|
|
+
|
|
|
+ client = BambuMQTTClient(
|
|
|
+ ip_address="192.168.1.100",
|
|
|
+ serial_number="TEST123",
|
|
|
+ access_code="12345678",
|
|
|
+ )
|
|
|
+ return client
|
|
|
+
|
|
|
+ def test_captured_ams_mapping_initializes_to_none(self, mqtt_client):
|
|
|
+ """Verify _captured_ams_mapping starts as None."""
|
|
|
+ assert mqtt_client._captured_ams_mapping is None
|
|
|
+
|
|
|
+ def test_handle_request_message_captures_ams_mapping(self, mqtt_client):
|
|
|
+ """project_file command with ams_mapping stores the mapping."""
|
|
|
+ data = {
|
|
|
+ "print": {
|
|
|
+ "command": "project_file",
|
|
|
+ "ams_mapping": [0, 4, -1, -1],
|
|
|
+ "url": "ftp://192.168.1.100/test.3mf",
|
|
|
+ }
|
|
|
+ }
|
|
|
+ mqtt_client._handle_request_message(data)
|
|
|
+ assert mqtt_client._captured_ams_mapping == [0, 4, -1, -1]
|
|
|
+
|
|
|
+ def test_handle_request_message_ignores_non_print_commands(self, mqtt_client):
|
|
|
+ """Non-project_file commands don't store ams_mapping."""
|
|
|
+ data = {
|
|
|
+ "print": {
|
|
|
+ "command": "pause",
|
|
|
+ }
|
|
|
+ }
|
|
|
+ mqtt_client._handle_request_message(data)
|
|
|
+ assert mqtt_client._captured_ams_mapping is None
|
|
|
+
|
|
|
+ def test_handle_request_message_ignores_missing_ams_mapping(self, mqtt_client):
|
|
|
+ """project_file command without ams_mapping doesn't store anything."""
|
|
|
+ data = {
|
|
|
+ "print": {
|
|
|
+ "command": "project_file",
|
|
|
+ "url": "ftp://192.168.1.100/test.3mf",
|
|
|
+ }
|
|
|
+ }
|
|
|
+ mqtt_client._handle_request_message(data)
|
|
|
+ assert mqtt_client._captured_ams_mapping is None
|
|
|
+
|
|
|
+ def test_handle_request_message_ignores_non_dict_print(self, mqtt_client):
|
|
|
+ """Non-dict print value is safely ignored."""
|
|
|
+ data = {"print": "not_a_dict"}
|
|
|
+ mqtt_client._handle_request_message(data)
|
|
|
+ assert mqtt_client._captured_ams_mapping is None
|
|
|
+
|
|
|
+ def test_handle_request_message_ignores_missing_print(self, mqtt_client):
|
|
|
+ """Message without print key is safely ignored."""
|
|
|
+ data = {"pushing": {"command": "pushall"}}
|
|
|
+ mqtt_client._handle_request_message(data)
|
|
|
+ assert mqtt_client._captured_ams_mapping is None
|
|
|
+
|
|
|
+ def test_captured_mapping_overwrites_previous(self, mqtt_client):
|
|
|
+ """A new print command overwrites a previously captured mapping."""
|
|
|
+ mqtt_client._captured_ams_mapping = [0, -1, -1, -1]
|
|
|
+ data = {
|
|
|
+ "print": {
|
|
|
+ "command": "project_file",
|
|
|
+ "ams_mapping": [4, 8, -1, -1],
|
|
|
+ }
|
|
|
+ }
|
|
|
+ mqtt_client._handle_request_message(data)
|
|
|
+ assert mqtt_client._captured_ams_mapping == [4, 8, -1, -1]
|
|
|
+
|
|
|
+ def test_print_start_callback_includes_ams_mapping(self, mqtt_client):
|
|
|
+ """on_print_start callback data includes captured ams_mapping."""
|
|
|
+ start_data = {}
|
|
|
+
|
|
|
+ def on_start(data):
|
|
|
+ start_data.update(data)
|
|
|
+
|
|
|
+ mqtt_client.on_print_start = on_start
|
|
|
+ mqtt_client._captured_ams_mapping = [0, 4, -1, -1]
|
|
|
+
|
|
|
+ # Trigger print start
|
|
|
+ mqtt_client._process_message(
|
|
|
+ {
|
|
|
+ "print": {
|
|
|
+ "gcode_state": "RUNNING",
|
|
|
+ "gcode_file": "/data/Metadata/test.gcode",
|
|
|
+ "subtask_name": "Test",
|
|
|
+ }
|
|
|
+ }
|
|
|
+ )
|
|
|
+
|
|
|
+ assert start_data.get("ams_mapping") == [0, 4, -1, -1]
|
|
|
+
|
|
|
+ def test_print_start_callback_ams_mapping_none_when_not_captured(self, mqtt_client):
|
|
|
+ """on_print_start callback has ams_mapping=None when no mapping captured."""
|
|
|
+ start_data = {}
|
|
|
+
|
|
|
+ def on_start(data):
|
|
|
+ start_data.update(data)
|
|
|
+
|
|
|
+ mqtt_client.on_print_start = on_start
|
|
|
+
|
|
|
+ mqtt_client._process_message(
|
|
|
+ {
|
|
|
+ "print": {
|
|
|
+ "gcode_state": "RUNNING",
|
|
|
+ "gcode_file": "/data/Metadata/test.gcode",
|
|
|
+ "subtask_name": "Test",
|
|
|
+ }
|
|
|
+ }
|
|
|
+ )
|
|
|
+
|
|
|
+ assert "ams_mapping" in start_data
|
|
|
+ assert start_data["ams_mapping"] is None
|
|
|
+
|
|
|
+ def test_print_complete_callback_includes_ams_mapping(self, mqtt_client):
|
|
|
+ """on_print_complete callback data includes captured ams_mapping."""
|
|
|
+ complete_data = {}
|
|
|
+
|
|
|
+ def on_complete(data):
|
|
|
+ complete_data.update(data)
|
|
|
+
|
|
|
+ mqtt_client.on_print_start = lambda d: None
|
|
|
+ mqtt_client.on_print_complete = on_complete
|
|
|
+ mqtt_client._captured_ams_mapping = [0, 9, -1, -1]
|
|
|
+
|
|
|
+ # Start print
|
|
|
+ mqtt_client._process_message(
|
|
|
+ {
|
|
|
+ "print": {
|
|
|
+ "gcode_state": "RUNNING",
|
|
|
+ "gcode_file": "/data/Metadata/test.gcode",
|
|
|
+ "subtask_name": "Test",
|
|
|
+ }
|
|
|
+ }
|
|
|
+ )
|
|
|
+
|
|
|
+ # Complete print
|
|
|
+ mqtt_client._process_message(
|
|
|
+ {
|
|
|
+ "print": {
|
|
|
+ "gcode_state": "FINISH",
|
|
|
+ "gcode_file": "/data/Metadata/test.gcode",
|
|
|
+ "subtask_name": "Test",
|
|
|
+ }
|
|
|
+ }
|
|
|
+ )
|
|
|
+
|
|
|
+ assert complete_data.get("ams_mapping") == [0, 9, -1, -1]
|
|
|
+
|
|
|
+ def test_captured_mapping_cleared_after_print_complete(self, mqtt_client):
|
|
|
+ """_captured_ams_mapping is reset to None after print completion."""
|
|
|
+ mqtt_client.on_print_start = lambda d: None
|
|
|
+ mqtt_client.on_print_complete = lambda d: None
|
|
|
+ mqtt_client._captured_ams_mapping = [0, 4, -1, -1]
|
|
|
+
|
|
|
+ # Start print
|
|
|
+ mqtt_client._process_message(
|
|
|
+ {
|
|
|
+ "print": {
|
|
|
+ "gcode_state": "RUNNING",
|
|
|
+ "gcode_file": "/data/Metadata/test.gcode",
|
|
|
+ "subtask_name": "Test",
|
|
|
+ }
|
|
|
+ }
|
|
|
+ )
|
|
|
+
|
|
|
+ # Complete print
|
|
|
+ mqtt_client._process_message(
|
|
|
+ {
|
|
|
+ "print": {
|
|
|
+ "gcode_state": "FINISH",
|
|
|
+ "gcode_file": "/data/Metadata/test.gcode",
|
|
|
+ "subtask_name": "Test",
|
|
|
+ }
|
|
|
+ }
|
|
|
+ )
|
|
|
+
|
|
|
+ assert mqtt_client._captured_ams_mapping is None
|
|
|
+
|
|
|
+ def test_full_flow_capture_and_deliver(self, mqtt_client):
|
|
|
+ """Full flow: slicer sends print command → MQTT captures mapping → completion delivers it."""
|
|
|
+ complete_data = {}
|
|
|
+
|
|
|
+ def on_complete(data):
|
|
|
+ complete_data.update(data)
|
|
|
+
|
|
|
+ mqtt_client.on_print_start = lambda d: None
|
|
|
+ mqtt_client.on_print_complete = on_complete
|
|
|
+
|
|
|
+ # 1. Slicer sends print command (captured from request topic)
|
|
|
+ mqtt_client._handle_request_message(
|
|
|
+ {
|
|
|
+ "print": {
|
|
|
+ "command": "project_file",
|
|
|
+ "ams_mapping": [4, 9, -1, -1],
|
|
|
+ "url": "ftp://192.168.1.100/model.3mf",
|
|
|
+ }
|
|
|
+ }
|
|
|
+ )
|
|
|
+ assert mqtt_client._captured_ams_mapping == [4, 9, -1, -1]
|
|
|
+
|
|
|
+ # 2. Printer reports RUNNING
|
|
|
+ mqtt_client._process_message(
|
|
|
+ {
|
|
|
+ "print": {
|
|
|
+ "gcode_state": "RUNNING",
|
|
|
+ "gcode_file": "/data/Metadata/model.gcode",
|
|
|
+ "subtask_name": "Model",
|
|
|
+ }
|
|
|
+ }
|
|
|
+ )
|
|
|
+
|
|
|
+ # 3. Printer reports FINISH
|
|
|
+ mqtt_client._process_message(
|
|
|
+ {
|
|
|
+ "print": {
|
|
|
+ "gcode_state": "FINISH",
|
|
|
+ "gcode_file": "/data/Metadata/model.gcode",
|
|
|
+ "subtask_name": "Model",
|
|
|
+ }
|
|
|
+ }
|
|
|
+ )
|
|
|
+
|
|
|
+ assert complete_data["ams_mapping"] == [4, 9, -1, -1]
|
|
|
+ assert complete_data["status"] == "completed"
|
|
|
+ # Mapping cleared after completion
|
|
|
+ assert mqtt_client._captured_ams_mapping is None
|
|
|
+
|
|
|
+
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# tray_now disambiguation helpers
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+
|
|
|
+
|
|
|
+def _ams_payload(tray_now, ams_units=None, tray_exist_bits=None):
|
|
|
+ """Build minimal print.ams payload for tray_now disambiguation tests."""
|
|
|
+ ams = {"tray_now": str(tray_now)}
|
|
|
+ if ams_units is not None:
|
|
|
+ ams["ams"] = ams_units
|
|
|
+ if tray_exist_bits is not None:
|
|
|
+ ams["tray_exist_bits"] = tray_exist_bits
|
|
|
+ return {"print": {"ams": ams}}
|
|
|
+
|
|
|
+
|
|
|
+def _extruder_info_payload(extruders):
|
|
|
+ """Build device.extruder.info payload (dual-nozzle detection + snow).
|
|
|
+
|
|
|
+ Each entry in *extruders* is a dict with at least ``id`` and ``snow``.
|
|
|
+ """
|
|
|
+ return {
|
|
|
+ "print": {
|
|
|
+ "device": {
|
|
|
+ "extruder": {
|
|
|
+ "info": extruders,
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+def _extruder_state_payload(state_val):
|
|
|
+ """Build device.extruder.state payload (active extruder via bit 8)."""
|
|
|
+ return {
|
|
|
+ "print": {
|
|
|
+ "device": {
|
|
|
+ "extruder": {
|
|
|
+ "state": state_val,
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# 1. Single-nozzle X1E — direct passthrough
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+
|
|
|
+
|
|
|
+class TestTrayNowSingleNozzleX1E:
|
|
|
+ """Single-nozzle, 1 AMS — tray_now is a direct passthrough."""
|
|
|
+
|
|
|
+ @pytest.fixture
|
|
|
+ def mqtt_client(self):
|
|
|
+ from backend.app.services.bambu_mqtt import BambuMQTTClient
|
|
|
+
|
|
|
+ return BambuMQTTClient(
|
|
|
+ ip_address="192.168.1.100",
|
|
|
+ serial_number="TEST_X1E",
|
|
|
+ access_code="12345678",
|
|
|
+ )
|
|
|
+
|
|
|
+ def test_tray_now_direct_passthrough_slot_0_to_3(self, mqtt_client):
|
|
|
+ """Each tray_now 0-3 maps 1:1 on single-nozzle printers."""
|
|
|
+ for slot in range(4):
|
|
|
+ mqtt_client._process_message(_ams_payload(slot))
|
|
|
+ assert mqtt_client.state.tray_now == slot
|
|
|
+
|
|
|
+ def test_tray_now_255_means_unloaded(self, mqtt_client):
|
|
|
+ """tray_now=255 means no filament loaded."""
|
|
|
+ mqtt_client._process_message(_ams_payload(255))
|
|
|
+ assert mqtt_client.state.tray_now == 255
|
|
|
+
|
|
|
+ def test_single_extruder_does_not_trigger_dual_nozzle(self, mqtt_client):
|
|
|
+ """device.extruder.info with 1 entry must NOT set _is_dual_nozzle."""
|
|
|
+ mqtt_client._process_message(_extruder_info_payload([{"id": 0, "snow": 0xFF00FF}]))
|
|
|
+ assert mqtt_client._is_dual_nozzle is False
|
|
|
+
|
|
|
+ def test_last_loaded_tray_survives_unload(self, mqtt_client):
|
|
|
+ """Load tray 2, unload → last_loaded_tray stays 2."""
|
|
|
+ mqtt_client._process_message(_ams_payload(2))
|
|
|
+ assert mqtt_client.state.last_loaded_tray == 2
|
|
|
+
|
|
|
+ mqtt_client._process_message(_ams_payload(255))
|
|
|
+ assert mqtt_client.state.tray_now == 255
|
|
|
+ assert mqtt_client.state.last_loaded_tray == 2
|
|
|
+
|
|
|
+
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# 2. Single-nozzle P2S — multiple AMS, global IDs pass through
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+
|
|
|
+
|
|
|
+class TestTrayNowSingleNozzleP2S:
|
|
|
+ """Single-nozzle, 2 AMS — global IDs 4-7 for AMS 1 pass through directly."""
|
|
|
+
|
|
|
+ @pytest.fixture
|
|
|
+ def mqtt_client(self):
|
|
|
+ from backend.app.services.bambu_mqtt import BambuMQTTClient
|
|
|
+
|
|
|
+ return BambuMQTTClient(
|
|
|
+ ip_address="192.168.1.100",
|
|
|
+ serial_number="TEST_P2S",
|
|
|
+ access_code="12345678",
|
|
|
+ )
|
|
|
+
|
|
|
+ def test_tray_now_ams1_global_ids_4_to_7(self, mqtt_client):
|
|
|
+ """tray_now 4-7 are global IDs for AMS 1 on single-nozzle printers."""
|
|
|
+ for global_id in range(4, 8):
|
|
|
+ mqtt_client._process_message(_ams_payload(global_id))
|
|
|
+ assert mqtt_client.state.tray_now == global_id
|
|
|
+
|
|
|
+ def test_tray_change_across_ams_units(self, mqtt_client):
|
|
|
+ """Switch from AMS 0 slot 1 → AMS 1 slot 2 (global 6)."""
|
|
|
+ mqtt_client._process_message(_ams_payload(1))
|
|
|
+ assert mqtt_client.state.tray_now == 1
|
|
|
+
|
|
|
+ mqtt_client._process_message(_ams_payload(6))
|
|
|
+ assert mqtt_client.state.tray_now == 6
|
|
|
+
|
|
|
+
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# 3. H2D Pro — initial state detection
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+
|
|
|
+
|
|
|
+class TestTrayNowDualNozzleH2DSetup:
|
|
|
+ """H2D Pro initial state detection."""
|
|
|
+
|
|
|
+ @pytest.fixture
|
|
|
+ def mqtt_client(self):
|
|
|
+ from backend.app.services.bambu_mqtt import BambuMQTTClient
|
|
|
+
|
|
|
+ return BambuMQTTClient(
|
|
|
+ ip_address="192.168.1.100",
|
|
|
+ serial_number="TEST_H2D",
|
|
|
+ access_code="12345678",
|
|
|
+ )
|
|
|
+
|
|
|
+ def test_dual_nozzle_detected_from_extruder_info(self, mqtt_client):
|
|
|
+ """2 entries in device.extruder.info → _is_dual_nozzle=True."""
|
|
|
+ mqtt_client._process_message(
|
|
|
+ _extruder_info_payload(
|
|
|
+ [
|
|
|
+ {"id": 0, "snow": 0xFF00FF},
|
|
|
+ {"id": 1, "snow": 0xFF00FF},
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ )
|
|
|
+ assert mqtt_client._is_dual_nozzle is True
|
|
|
+
|
|
|
+ def test_ams_extruder_map_parsed_from_info_field(self, mqtt_client):
|
|
|
+ """AMS 0 info=2003 → right (ext 0), AMS 128 info=2104 → left (ext 1)."""
|
|
|
+ ams_units = [
|
|
|
+ {"id": 0, "info": 2003, "tray": [{"id": i} for i in range(4)]},
|
|
|
+ {"id": 128, "info": 2104, "tray": [{"id": 0}]},
|
|
|
+ ]
|
|
|
+ payload = {
|
|
|
+ "print": {
|
|
|
+ "ams": {
|
|
|
+ "ams": ams_units,
|
|
|
+ "tray_now": "255",
|
|
|
+ "tray_exist_bits": "1000f",
|
|
|
+ },
|
|
|
+ }
|
|
|
+ }
|
|
|
+ mqtt_client._process_message(payload)
|
|
|
+
|
|
|
+ # info=2003: bit8 = (2003>>8)&1 = 7&1 = 1 → extruder = 1-1 = 0 (right)
|
|
|
+ # info=2104: bit8 = (2104>>8)&1 = 8&1 = 0 → extruder = 1-0 = 1 (left)
|
|
|
+ assert mqtt_client.state.ams_extruder_map == {"0": 0, "128": 1}
|
|
|
+
|
|
|
+ def test_dual_nozzle_detection_before_ams_in_same_message(self, mqtt_client):
|
|
|
+ """Dual-nozzle detection at line 538 happens before _handle_ams_data() at line 549.
|
|
|
+
|
|
|
+ If both arrive in the same message, tray_now disambiguation already uses dual-nozzle logic.
|
|
|
+ """
|
|
|
+ payload = {
|
|
|
+ "print": {
|
|
|
+ "device": {
|
|
|
+ "extruder": {
|
|
|
+ "info": [
|
|
|
+ {"id": 0, "snow": 0xFF00FF},
|
|
|
+ {"id": 1, "snow": 0xFF00FF},
|
|
|
+ ],
|
|
|
+ "state": 0x0001,
|
|
|
+ }
|
|
|
+ },
|
|
|
+ "ams": {
|
|
|
+ "ams": [
|
|
|
+ {"id": 0, "info": 2003, "tray": [{"id": i} for i in range(4)]},
|
|
|
+ ],
|
|
|
+ "tray_now": "2",
|
|
|
+ "tray_exist_bits": "f",
|
|
|
+ },
|
|
|
+ }
|
|
|
+ }
|
|
|
+ mqtt_client._process_message(payload)
|
|
|
+
|
|
|
+ # Dual-nozzle was detected; AMS 0 on right extruder (active by default);
|
|
|
+ # snow is 0xFF00FF (unloaded), so falls through to ams_extruder_map fallback.
|
|
|
+ # Single AMS on extruder 0 → global_id = 0*4+2 = 2
|
|
|
+ assert mqtt_client._is_dual_nozzle is True
|
|
|
+ assert mqtt_client.state.tray_now == 2
|
|
|
+
|
|
|
+
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# Shared H2D fixture for classes 4-8
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+
|
|
|
+
|
|
|
+class _H2DFixtureMixin:
|
|
|
+ """Mixin providing a pre-configured H2D Pro client."""
|
|
|
+
|
|
|
+ @pytest.fixture
|
|
|
+ def mqtt_client(self):
|
|
|
+ from backend.app.services.bambu_mqtt import BambuMQTTClient
|
|
|
+
|
|
|
+ return BambuMQTTClient(
|
|
|
+ ip_address="192.168.1.100",
|
|
|
+ serial_number="TEST_H2D",
|
|
|
+ access_code="12345678",
|
|
|
+ )
|
|
|
+
|
|
|
+ @pytest.fixture
|
|
|
+ def h2d_client(self, mqtt_client):
|
|
|
+ """Pre-configure as H2D Pro: dual-nozzle + ams_extruder_map."""
|
|
|
+ mqtt_client._process_message(
|
|
|
+ {
|
|
|
+ "print": {
|
|
|
+ "device": {
|
|
|
+ "extruder": {
|
|
|
+ "info": [
|
|
|
+ {"id": 0, "snow": 0xFF00FF},
|
|
|
+ {"id": 1, "snow": 0xFF00FF},
|
|
|
+ ],
|
|
|
+ "state": 0x0001, # right extruder active
|
|
|
+ }
|
|
|
+ },
|
|
|
+ "ams": {
|
|
|
+ "ams": [
|
|
|
+ {"id": 0, "info": 2003, "tray": [{"id": i} for i in range(4)]},
|
|
|
+ {"id": 128, "info": 2104, "tray": [{"id": 0}]},
|
|
|
+ ],
|
|
|
+ "tray_now": "255",
|
|
|
+ "tray_exist_bits": "1000f",
|
|
|
+ },
|
|
|
+ }
|
|
|
+ }
|
|
|
+ )
|
|
|
+ assert mqtt_client._is_dual_nozzle is True
|
|
|
+ assert mqtt_client.state.ams_extruder_map == {"0": 0, "128": 1}
|
|
|
+ return mqtt_client
|
|
|
+
|
|
|
+
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# 4. H2D Snow field disambiguation
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+
|
|
|
+
|
|
|
+class TestTrayNowDualNozzleH2DSnow(_H2DFixtureMixin):
|
|
|
+ """Snow field disambiguation (primary path)."""
|
|
|
+
|
|
|
+ def test_snow_disambiguates_ams0_slot(self, h2d_client):
|
|
|
+ """snow ext[0]=AMS 0 slot 2, tray_now='2' → global 2."""
|
|
|
+ # Send snow update FIRST (snow is parsed AFTER tray_now in the same message,
|
|
|
+ # so we need it in a prior message).
|
|
|
+ snow_val = 0 << 8 | 2 # AMS 0 slot 2 = raw 2
|
|
|
+ h2d_client._process_message(
|
|
|
+ _extruder_info_payload(
|
|
|
+ [
|
|
|
+ {"id": 0, "snow": snow_val},
|
|
|
+ {"id": 1, "snow": 0xFF00FF},
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ )
|
|
|
+ assert h2d_client.state.h2d_extruder_snow.get(0) == 2
|
|
|
+
|
|
|
+ # Now send tray_now=2
|
|
|
+ h2d_client._process_message(_ams_payload(2))
|
|
|
+ assert h2d_client.state.tray_now == 2
|
|
|
+
|
|
|
+ def test_snow_disambiguates_ams_ht_to_128(self, h2d_client):
|
|
|
+ """snow ext[1]=AMS HT (128), left active, tray_now='0' → global 128."""
|
|
|
+ # Snow: extruder 1 → AMS 128 slot 0
|
|
|
+ snow_val = 128 << 8 | 0 # = 32768
|
|
|
+ h2d_client._process_message(
|
|
|
+ _extruder_info_payload(
|
|
|
+ [
|
|
|
+ {"id": 0, "snow": 0xFF00FF},
|
|
|
+ {"id": 1, "snow": snow_val},
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ )
|
|
|
+ assert h2d_client.state.h2d_extruder_snow.get(1) == 128
|
|
|
+
|
|
|
+ # Switch to left extruder
|
|
|
+ h2d_client._process_message(_extruder_state_payload(0x0100))
|
|
|
+ assert h2d_client.state.active_extruder == 1
|
|
|
+
|
|
|
+ # tray_now="0" with left extruder active, snow says AMS HT (128)
|
|
|
+ # AMS HT snow_slot = 0 (single slot), parsed_tray_now = 0 → match
|
|
|
+ h2d_client._process_message(_ams_payload(0))
|
|
|
+ assert h2d_client.state.tray_now == 128
|
|
|
+
|
|
|
+ def test_snow_updates_h2d_extruder_snow_state(self, h2d_client):
|
|
|
+ """Verify state.h2d_extruder_snow dict is populated correctly."""
|
|
|
+ snow_ext0 = 1 << 8 | 3 # AMS 1 slot 3 → global 7
|
|
|
+ snow_ext1 = 0 << 8 | 0 # AMS 0 slot 0 → global 0
|
|
|
+ h2d_client._process_message(
|
|
|
+ _extruder_info_payload(
|
|
|
+ [
|
|
|
+ {"id": 0, "snow": snow_ext0},
|
|
|
+ {"id": 1, "snow": snow_ext1},
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ )
|
|
|
+ assert h2d_client.state.h2d_extruder_snow[0] == 7
|
|
|
+ assert h2d_client.state.h2d_extruder_snow[1] == 0
|
|
|
+
|
|
|
+ def test_snow_unloaded_value(self, h2d_client):
|
|
|
+ """snow=0xFFFF (ams_id=255, slot=255) → 255 (unloaded)."""
|
|
|
+ h2d_client._process_message(
|
|
|
+ _extruder_info_payload(
|
|
|
+ [
|
|
|
+ {"id": 0, "snow": 0xFFFF},
|
|
|
+ {"id": 1, "snow": 0xFFFF},
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ )
|
|
|
+ assert h2d_client.state.h2d_extruder_snow[0] == 255
|
|
|
+ assert h2d_client.state.h2d_extruder_snow[1] == 255
|
|
|
+
|
|
|
+ def test_snow_initial_sentinel_not_stored(self, h2d_client):
|
|
|
+ """snow=0xFF00FF (firmware initial sentinel) is not parsed into h2d_extruder_snow."""
|
|
|
+ # 0xFF00FF has ams_id=0xFF00=65280 which doesn't match any branch
|
|
|
+ h2d_client._process_message(
|
|
|
+ _extruder_info_payload(
|
|
|
+ [
|
|
|
+ {"id": 0, "snow": 0xFF00FF},
|
|
|
+ {"id": 1, "snow": 0xFF00FF},
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ )
|
|
|
+ # Snow dict should remain empty (no matching branch)
|
|
|
+ assert h2d_client.state.h2d_extruder_snow == {}
|
|
|
+
|
|
|
+
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# 5. H2D Pending target disambiguation
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+
|
|
|
+
|
|
|
+class TestTrayNowDualNozzleH2DPendingTarget(_H2DFixtureMixin):
|
|
|
+ """Pending target disambiguation (when Bambuddy initiates load)."""
|
|
|
+
|
|
|
+ def test_pending_target_matches_slot(self, h2d_client):
|
|
|
+ """pending=5, tray_now='1' (5%4=1 matches) → tray_now=5."""
|
|
|
+ h2d_client.state.pending_tray_target = 5
|
|
|
+ h2d_client._process_message(_ams_payload(1))
|
|
|
+ assert h2d_client.state.tray_now == 5
|
|
|
+ assert h2d_client.state.pending_tray_target is None # cleared
|
|
|
+
|
|
|
+ def test_pending_target_slot_mismatch(self, h2d_client):
|
|
|
+ """pending=5, tray_now='2' → uses raw slot, clears pending."""
|
|
|
+ h2d_client.state.pending_tray_target = 5
|
|
|
+ h2d_client._process_message(_ams_payload(2))
|
|
|
+ # Slot 2 != 5%4=1 → mismatch, uses raw slot 2
|
|
|
+ assert h2d_client.state.tray_now == 2
|
|
|
+ assert h2d_client.state.pending_tray_target is None
|
|
|
+
|
|
|
+ def test_pending_target_takes_priority_over_snow(self, h2d_client):
|
|
|
+ """When both pending and snow are set, pending wins."""
|
|
|
+ # Set up snow for extruder 0 → AMS 0 slot 1 → global 1
|
|
|
+ snow_val = 0 << 8 | 1
|
|
|
+ h2d_client._process_message(
|
|
|
+ _extruder_info_payload(
|
|
|
+ [
|
|
|
+ {"id": 0, "snow": snow_val},
|
|
|
+ {"id": 1, "snow": 0xFF00FF},
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ )
|
|
|
+ assert h2d_client.state.h2d_extruder_snow.get(0) == 1
|
|
|
+
|
|
|
+ # Set pending target to AMS 1 slot 1 (global 5)
|
|
|
+ h2d_client.state.pending_tray_target = 5
|
|
|
+ # tray_now="1" — matches pending (5%4=1), pending should win over snow
|
|
|
+ h2d_client._process_message(_ams_payload(1))
|
|
|
+ assert h2d_client.state.tray_now == 5
|
|
|
+
|
|
|
+
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# 6. H2D ams_extruder_map fallback
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+
|
|
|
+
|
|
|
+class TestTrayNowDualNozzleH2DFallback(_H2DFixtureMixin):
|
|
|
+ """ams_extruder_map fallback (no pending, no snow)."""
|
|
|
+
|
|
|
+ def test_single_ams_on_extruder_computes_global_id(self, h2d_client):
|
|
|
+ """AMS 0 on right extruder, tray_now='2' → 0*4+2=2."""
|
|
|
+ # h2d_client has snow=0xFF00FF (unloaded) by default, so snow path skips
|
|
|
+ h2d_client._process_message(_ams_payload(2))
|
|
|
+ # AMS 0 is the only AMS on extruder 0 (right, active by default)
|
|
|
+ # Fallback: single AMS → global = 0*4+2 = 2
|
|
|
+ assert h2d_client.state.tray_now == 2
|
|
|
+
|
|
|
+ def test_multiple_ams_keeps_current_if_valid(self, h2d_client):
|
|
|
+ """Current tray matches slot → keeps it (multi-AMS on same extruder)."""
|
|
|
+ # Set up: two AMS units on the same extruder (right, ext 0)
|
|
|
+ h2d_client.state.ams_extruder_map = {"0": 0, "1": 0}
|
|
|
+ # Pre-set tray_now=5 (AMS 1 slot 1) — current_ams=1 which is in ams_on_extruder
|
|
|
+ h2d_client.state.tray_now = 5
|
|
|
+ # tray_now="1" → 5%4=1 matches → keep current=5
|
|
|
+ h2d_client._process_message(_ams_payload(1))
|
|
|
+ assert h2d_client.state.tray_now == 5
|
|
|
+
|
|
|
+ def test_no_ams_on_extruder_uses_raw_slot(self, h2d_client):
|
|
|
+ """No AMS mapped to the active extruder → raw slot as global ID."""
|
|
|
+ # All AMS on left extruder, but right is active
|
|
|
+ h2d_client.state.ams_extruder_map = {"0": 1, "128": 1}
|
|
|
+ h2d_client._process_message(_ams_payload(2))
|
|
|
+ assert h2d_client.state.tray_now == 2
|
|
|
+
|
|
|
+
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# 7. H2D Active extruder switching
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+
|
|
|
+
|
|
|
+class TestTrayNowDualNozzleH2DActiveExtruder(_H2DFixtureMixin):
|
|
|
+ """Active extruder switching via device.extruder.state bit 8."""
|
|
|
+
|
|
|
+ def test_active_extruder_right_by_default(self, h2d_client):
|
|
|
+ """Initial state.active_extruder == 0 (right)."""
|
|
|
+ assert h2d_client.state.active_extruder == 0
|
|
|
+
|
|
|
+ def test_extruder_state_bit8_switches_to_left(self, h2d_client):
|
|
|
+ """state=0x100 → active_extruder=1 (left)."""
|
|
|
+ h2d_client._process_message(_extruder_state_payload(0x0100))
|
|
|
+ assert h2d_client.state.active_extruder == 1
|
|
|
+
|
|
|
+ def test_extruder_state_bit8_switches_back_to_right(self, h2d_client):
|
|
|
+ """Cycle 0 → 1 → 0."""
|
|
|
+ h2d_client._process_message(_extruder_state_payload(0x0100))
|
|
|
+ assert h2d_client.state.active_extruder == 1
|
|
|
+
|
|
|
+ h2d_client._process_message(_extruder_state_payload(0x0001))
|
|
|
+ assert h2d_client.state.active_extruder == 0
|
|
|
+
|
|
|
+ def test_extruder_switch_changes_tray_disambiguation(self, h2d_client):
|
|
|
+ """Snow on both extruders; switching active changes which snow is used."""
|
|
|
+ # Snow: ext 0 → AMS 0 slot 1 (global 1), ext 1 → AMS 128 slot 0 (global 128)
|
|
|
+ h2d_client._process_message(
|
|
|
+ _extruder_info_payload(
|
|
|
+ [
|
|
|
+ {"id": 0, "snow": 0 << 8 | 1}, # AMS 0 slot 1 → global 1
|
|
|
+ {"id": 1, "snow": 128 << 8 | 0}, # AMS HT → global 128
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ )
|
|
|
+
|
|
|
+ # Right active (default) — tray_now="1" → snow ext[0] says global 1
|
|
|
+ h2d_client._process_message(_ams_payload(1))
|
|
|
+ assert h2d_client.state.tray_now == 1
|
|
|
+
|
|
|
+ # Switch to left
|
|
|
+ h2d_client._process_message(_extruder_state_payload(0x0100))
|
|
|
+
|
|
|
+ # Left active — tray_now="0" → snow ext[1] says AMS HT (128), slot 0 matches
|
|
|
+ h2d_client._process_message(_ams_payload(0))
|
|
|
+ assert h2d_client.state.tray_now == 128
|
|
|
+
|
|
|
+
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# 8. H2D Full multi-message sequences
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+
|
|
|
+
|
|
|
+class TestTrayNowDualNozzleH2DFullSequence(_H2DFixtureMixin):
|
|
|
+ """Multi-message sequences simulating real H2D Pro prints."""
|
|
|
+
|
|
|
+ def test_h2d_right_nozzle_ams0_lifecycle(self, h2d_client):
|
|
|
+ """Setup → load AMS 0 slot 1 → verify tray_now=1."""
|
|
|
+ # Snow update: extruder 0 loading AMS 0 slot 1
|
|
|
+ h2d_client._process_message(
|
|
|
+ _extruder_info_payload(
|
|
|
+ [
|
|
|
+ {"id": 0, "snow": 0 << 8 | 1},
|
|
|
+ {"id": 1, "snow": 0xFF00FF},
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ )
|
|
|
+ # Printer reports tray_now="1"
|
|
|
+ h2d_client._process_message(_ams_payload(1))
|
|
|
+ assert h2d_client.state.tray_now == 1
|
|
|
+ assert h2d_client.state.last_loaded_tray == 1
|
|
|
+
|
|
|
+ def test_h2d_left_nozzle_ams_ht_lifecycle(self, h2d_client):
|
|
|
+ """Setup → switch left → load AMS HT → verify tray_now=128."""
|
|
|
+ # Switch to left extruder
|
|
|
+ h2d_client._process_message(_extruder_state_payload(0x0100))
|
|
|
+
|
|
|
+ # Snow: ext 1 → AMS HT slot 0
|
|
|
+ h2d_client._process_message(
|
|
|
+ _extruder_info_payload(
|
|
|
+ [
|
|
|
+ {"id": 0, "snow": 0xFF00FF},
|
|
|
+ {"id": 1, "snow": 128 << 8 | 0},
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ )
|
|
|
+
|
|
|
+ # Printer reports tray_now="0" (AMS HT single slot)
|
|
|
+ h2d_client._process_message(_ams_payload(0))
|
|
|
+ assert h2d_client.state.tray_now == 128
|
|
|
+ assert h2d_client.state.last_loaded_tray == 128
|
|
|
+
|
|
|
+ def test_h2d_multi_color_alternating_nozzles(self, h2d_client):
|
|
|
+ """Multi-color print alternating between right and left nozzles.
|
|
|
+
|
|
|
+ Sequence:
|
|
|
+ 1. Right loads AMS 0 slot 0 (tray=0)
|
|
|
+ 2. Switch left, load AMS HT (tray=128)
|
|
|
+ 3. Switch right, snow updates, load AMS 0 slot 2 (tray=2)
|
|
|
+ 4. Unload (255)
|
|
|
+ """
|
|
|
+ # Step 1: Right extruder loads AMS 0 slot 0
|
|
|
+ h2d_client._process_message(
|
|
|
+ _extruder_info_payload(
|
|
|
+ [
|
|
|
+ {"id": 0, "snow": 0 << 8 | 0},
|
|
|
+ {"id": 1, "snow": 0xFF00FF},
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ )
|
|
|
+ h2d_client._process_message(_ams_payload(0))
|
|
|
+ assert h2d_client.state.tray_now == 0
|
|
|
+
|
|
|
+ # Step 2: Switch to left, load AMS HT
|
|
|
+ h2d_client._process_message(_extruder_state_payload(0x0100))
|
|
|
+ h2d_client._process_message(
|
|
|
+ _extruder_info_payload(
|
|
|
+ [
|
|
|
+ {"id": 0, "snow": 0 << 8 | 0},
|
|
|
+ {"id": 1, "snow": 128 << 8 | 0},
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ )
|
|
|
+ h2d_client._process_message(_ams_payload(0))
|
|
|
+ assert h2d_client.state.tray_now == 128
|
|
|
+
|
|
|
+ # Step 3: Switch back to right, load AMS 0 slot 2
|
|
|
+ h2d_client._process_message(_extruder_state_payload(0x0001))
|
|
|
+ h2d_client._process_message(
|
|
|
+ _extruder_info_payload(
|
|
|
+ [
|
|
|
+ {"id": 0, "snow": 0 << 8 | 2},
|
|
|
+ {"id": 1, "snow": 128 << 8 | 0},
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ )
|
|
|
+ h2d_client._process_message(_ams_payload(2))
|
|
|
+ assert h2d_client.state.tray_now == 2
|
|
|
+
|
|
|
+ # Step 4: Unload
|
|
|
+ h2d_client._process_message(_ams_payload(255))
|
|
|
+ assert h2d_client.state.tray_now == 255
|
|
|
+ assert h2d_client.state.last_loaded_tray == 2
|