Browse Source

fix(mqtt): detect zombie sessions via ams_filament_setting response tracking (#887)

  After hours of idle time the MQTT connection can degrade so that
  telemetry still flows but published commands never reach the printer.
  The existing dev-mode probe only runs on first connect; this adds
  response tracking for user-initiated ams_filament_setting commands —
  two consecutive unanswered commands (10 s timeout each) trigger
  force_reconnect_stale_session.
maziggy 1 month ago
parent
commit
a95a3c52ee
3 changed files with 199 additions and 0 deletions
  1. 0 0
      CHANGELOG.md
  2. 31 0
      backend/app/services/bambu_mqtt.py
  3. 168 0
      backend/tests/unit/services/test_bambu_mqtt.py

File diff suppressed because it is too large
+ 0 - 0
CHANGELOG.md


+ 31 - 0
backend/app/services/bambu_mqtt.py

@@ -367,6 +367,12 @@ class BambuMQTTClient:
         # when the frontend polls status faster than paho can reconnect.
         # when the frontend polls status faster than paho can reconnect.
         self._last_stale_reconnect: float = 0.0
         self._last_stale_reconnect: float = 0.0
 
 
+        # Zombie session detection via ams_filament_setting response tracking (#887).
+        # The dev-mode probe only runs on first connect; this catches zombie sessions
+        # that develop later (telemetry flows but publishes silently fail).
+        self._last_ams_cmd_time: float = 0.0  # monotonic time of last published command
+        self._ams_cmd_unanswered: int = 0  # consecutive commands with no response
+
     @property
     @property
     def topic_subscribe(self) -> str:
     def topic_subscribe(self) -> str:
         return f"device/{self.serial_number}/report"
         return f"device/{self.serial_number}/report"
@@ -457,6 +463,8 @@ class BambuMQTTClient:
             self._dev_mode_probe_time = 0.0
             self._dev_mode_probe_time = 0.0
             self._dev_mode_probe_failures = 0
             self._dev_mode_probe_failures = 0
             self._connect_time = time.monotonic()
             self._connect_time = time.monotonic()
+            self._last_ams_cmd_time = 0.0
+            self._ams_cmd_unanswered = 0
             client.subscribe(self.topic_subscribe)
             client.subscribe(self.topic_subscribe)
             # Subscribe to request topic for ams_mapping capture (if supported by broker)
             # Subscribe to request topic for ams_mapping capture (if supported by broker)
             if self._request_topic_supported:
             if self._request_topic_supported:
@@ -771,6 +779,10 @@ class BambuMQTTClient:
                     and print_data.get("sequence_id") == self._dev_mode_probe_seq
                     and print_data.get("sequence_id") == self._dev_mode_probe_seq
                 ):
                 ):
                     self._handle_dev_mode_probe_response(print_data)
                     self._handle_dev_mode_probe_response(print_data)
+                # Track user-initiated ams_filament_setting responses (#887 zombie detection)
+                elif cmd == "ams_filament_setting" and self._last_ams_cmd_time > 0:
+                    self._last_ams_cmd_time = 0.0
+                    self._ams_cmd_unanswered = 0
             if "command" in print_data and print_data.get("command") == "extrusion_cali_get":
             if "command" in print_data and print_data.get("command") == "extrusion_cali_get":
                 self._handle_kprofile_response(print_data)
                 self._handle_kprofile_response(print_data)
 
 
@@ -2549,6 +2561,23 @@ class BambuMQTTClient:
                     # Allow retry on next full status message
                     # Allow retry on next full status message
                     self._dev_mode_probed = False
                     self._dev_mode_probed = False
 
 
+        # Zombie session detection: if an ams_filament_setting command has been
+        # pending for >10s with no response, the publish path is likely dead (#887).
+        if self._last_ams_cmd_time > 0:
+            elapsed = time.monotonic() - self._last_ams_cmd_time
+            if elapsed > 10.0:
+                self._ams_cmd_unanswered += 1
+                logger.warning(
+                    "[%s] ams_filament_setting unanswered for %.0fs (count=%d)",
+                    self.serial_number,
+                    elapsed,
+                    self._ams_cmd_unanswered,
+                )
+                self._last_ams_cmd_time = 0.0  # don't re-trigger on next push_status
+                if self._ams_cmd_unanswered >= 2:
+                    self.force_reconnect_stale_session("ams_filament_setting unanswered 2\u00d7")
+                    self._ams_cmd_unanswered = 0
+
         # Log mapping data when received (for usage tracking debugging)
         # Log mapping data when received (for usage tracking debugging)
         if "mapping" in data:
         if "mapping" in data:
             logger.debug("[%s] MQTT mapping field: %s", self.serial_number, data["mapping"])
             logger.debug("[%s] MQTT mapping field: %s", self.serial_number, data["mapping"])
@@ -4328,6 +4357,7 @@ class BambuMQTTClient:
         )
         )
         logger.debug("[%s] ams_filament_setting command: %s", self.serial_number, command_json)
         logger.debug("[%s] ams_filament_setting command: %s", self.serial_number, command_json)
         self._client.publish(self.topic_publish, command_json, qos=1)
         self._client.publish(self.topic_publish, command_json, qos=1)
+        self._last_ams_cmd_time = time.monotonic()
         return True
         return True
 
 
     def reset_ams_slot(self, ams_id: int, tray_id: int) -> bool:
     def reset_ams_slot(self, ams_id: int, tray_id: int) -> bool:
@@ -4385,6 +4415,7 @@ class BambuMQTTClient:
         logger.info("[%s] Resetting AMS slot: AMS %s, tray %s", self.serial_number, ams_id, tray_id)
         logger.info("[%s] Resetting AMS slot: AMS %s, tray %s", self.serial_number, ams_id, tray_id)
         logger.debug("[%s] reset_ams_slot command: %s", self.serial_number, command_json)
         logger.debug("[%s] reset_ams_slot command: %s", self.serial_number, command_json)
         self._client.publish(self.topic_publish, command_json, qos=1)
         self._client.publish(self.topic_publish, command_json, qos=1)
+        self._last_ams_cmd_time = time.monotonic()
         return True
         return True
 
 
     def extrusion_cali_sel(
     def extrusion_cali_sel(

+ 168 - 0
backend/tests/unit/services/test_bambu_mqtt.py

@@ -3737,3 +3737,171 @@ class TestSdCardParsing:
         assert client.state.sdcard is True
         assert client.state.sdcard is True
         client._update_state({"sdcard": False})
         client._update_state({"sdcard": False})
         assert client.state.sdcard is False
         assert client.state.sdcard is False
+
+
+class TestZombieSessionDetection:
+    """Tests for ams_filament_setting response tracking (#887).
+
+    When a printer's MQTT session degrades so that telemetry flows but
+    published commands never reach the printer, the zombie detector
+    counts consecutive unanswered ams_filament_setting commands and
+    force-reconnects after two.
+
+    Most tests drive the detector by back-dating ``_last_ams_cmd_time``
+    and then feeding a status payload through ``_update_state``.
+    """
+
+    @pytest.fixture
+    def mqtt_client(self):
+        """Return a BambuMQTTClient marked as connected, backed by a MagicMock paho client."""
+        import time
+        from unittest.mock import MagicMock
+
+        from backend.app.services.bambu_mqtt import BambuMQTTClient
+
+        client = BambuMQTTClient(
+            ip_address="192.168.1.100",
+            serial_number="TEST123",
+            access_code="12345678",
+        )
+        client.state.connected = True
+        mock_paho = MagicMock()
+        # Give socket() a stable mock so tests can assert that close() was called on it
+        mock_paho.socket.return_value = MagicMock()
+        client._client = mock_paho
+        # Pretend the connection was established 10 s ago
+        client._connect_time = time.monotonic() - 10.0
+        # Set developer_mode so the dev-mode probe branch doesn't interfere
+        client.state.developer_mode = True
+        return client
+
+    def test_initial_state_is_clean(self, mqtt_client):
+        """Tracking fields start at zero / no pending command."""
+        assert mqtt_client._last_ams_cmd_time == 0.0
+        assert mqtt_client._ams_cmd_unanswered == 0
+
+    def test_publish_sets_pending_time(self, mqtt_client):
+        """ams_set_filament_setting records the publish timestamp."""
+        import time
+
+        before = time.monotonic()
+        mqtt_client.ams_set_filament_setting(
+            ams_id=0,
+            tray_id=0,
+            tray_info_idx="GFL99",
+            tray_type="PLA",
+            tray_sub_brands="",
+            tray_color="FF0000FF",
+            nozzle_temp_min=190,
+            nozzle_temp_max=230,
+        )
+        assert mqtt_client._last_ams_cmd_time >= before
+
+    def test_reset_slot_sets_pending_time(self, mqtt_client):
+        """reset_ams_slot also records the publish timestamp."""
+        import time
+
+        before = time.monotonic()
+        mqtt_client.reset_ams_slot(ams_id=0, tray_id=0)
+        assert mqtt_client._last_ams_cmd_time >= before
+
+    def test_response_clears_pending(self, mqtt_client):
+        """An ams_filament_setting response clears the pending state."""
+        import time
+
+        mqtt_client._last_ams_cmd_time = time.monotonic()
+        mqtt_client._ams_cmd_unanswered = 1
+
+        # Simulate receiving a user-command response (sequence_id "0")
+        print_data = {
+            "command": "ams_filament_setting",
+            "sequence_id": "0",
+            "result": "success",
+        }
+        # Re-implements the response-tracking branch of the message handler inline.
+        # NOTE(review): the production handler is never invoked here, so this test
+        # only checks its own copy of the logic — consider routing print_data
+        # through the real message handler instead.
+        cmd = print_data.get("command")
+        if cmd == "ams_filament_setting" and mqtt_client._last_ams_cmd_time > 0:
+            mqtt_client._last_ams_cmd_time = 0.0
+            mqtt_client._ams_cmd_unanswered = 0
+
+        assert mqtt_client._last_ams_cmd_time == 0.0
+        assert mqtt_client._ams_cmd_unanswered == 0
+
+    def test_single_timeout_increments_counter(self, mqtt_client):
+        """One unanswered command increments the counter but does not reconnect."""
+        import time
+
+        # Back-date the pending command past the 10 s timeout
+        mqtt_client._last_ams_cmd_time = time.monotonic() - 11.0
+
+        mqtt_client._update_state({"gcode_state": "IDLE"})
+
+        assert mqtt_client._ams_cmd_unanswered == 1
+        assert mqtt_client._last_ams_cmd_time == 0.0
+        # Should NOT force-reconnect after just one
+        assert mqtt_client.state.connected is True
+
+    def test_two_timeouts_force_reconnect(self, mqtt_client):
+        """Two consecutive unanswered commands trigger force_reconnect."""
+        import time
+
+        # Record every on_state_change callback invocation
+        state_change_called = []
+        mqtt_client.on_state_change = lambda s: state_change_called.append(True)
+
+        # First unanswered command
+        mqtt_client._last_ams_cmd_time = time.monotonic() - 11.0
+        mqtt_client._update_state({"gcode_state": "IDLE"})
+        assert mqtt_client._ams_cmd_unanswered == 1
+        assert mqtt_client.state.connected is True
+
+        # Second unanswered command
+        mqtt_client._last_ams_cmd_time = time.monotonic() - 11.0
+        mqtt_client._update_state({"gcode_state": "IDLE"})
+
+        assert mqtt_client._ams_cmd_unanswered == 0  # reset after reconnect
+        assert mqtt_client.state.connected is False
+        assert mqtt_client._stale_reconnecting is True
+        mqtt_client._client.socket().close.assert_called()
+        assert len(state_change_called) > 0
+
+    def test_response_between_timeouts_resets_counter(self, mqtt_client):
+        """A successful response after one timeout resets the counter."""
+        import time
+
+        # First unanswered command
+        mqtt_client._last_ams_cmd_time = time.monotonic() - 11.0
+        mqtt_client._update_state({"gcode_state": "IDLE"})
+        assert mqtt_client._ams_cmd_unanswered == 1
+
+        # Simulate a response arriving by clearing the tracking fields directly.
+        # NOTE(review): the first assignment below is overwritten on the very
+        # next line, so it has no effect on the test.
+        mqtt_client._last_ams_cmd_time = time.monotonic()
+        mqtt_client._last_ams_cmd_time = 0.0
+        mqtt_client._ams_cmd_unanswered = 0
+
+        # Next unanswered command should be count=1, not count=2
+        mqtt_client._last_ams_cmd_time = time.monotonic() - 11.0
+        mqtt_client._update_state({"gcode_state": "IDLE"})
+        assert mqtt_client._ams_cmd_unanswered == 1
+        assert mqtt_client.state.connected is True  # no reconnect
+
+    def test_on_connect_resets_tracking(self, mqtt_client):
+        """_on_connect resets zombie tracking fields."""
+        import time
+
+        # Start from a dirty state: a pending command and a non-zero counter
+        mqtt_client._last_ams_cmd_time = time.monotonic()
+        mqtt_client._ams_cmd_unanswered = 5
+
+        # subscribe() must return (result, mid) tuple
+        mqtt_client._client.subscribe.return_value = (0, 1)
+        mqtt_client._on_connect(mqtt_client._client, None, None, 0)
+
+        assert mqtt_client._last_ams_cmd_time == 0.0
+        assert mqtt_client._ams_cmd_unanswered == 0
+
+    def test_no_check_when_no_command_pending(self, mqtt_client):
+        """If no command was published, a status update does not trigger detection."""
+        assert mqtt_client._last_ams_cmd_time == 0.0
+        mqtt_client._update_state({"gcode_state": "IDLE"})
+        assert mqtt_client._ams_cmd_unanswered == 0
+
+    def test_no_timeout_within_window(self, mqtt_client):
+        """A command published <10s ago should not trigger a timeout."""
+        import time
+
+        # Only 5 s have elapsed, which is inside the 10 s window
+        mqtt_client._last_ams_cmd_time = time.monotonic() - 5.0
+        mqtt_client._update_state({"gcode_state": "IDLE"})
+        assert mqtt_client._ams_cmd_unanswered == 0
+        assert mqtt_client._last_ams_cmd_time > 0  # still pending

Some files were not shown because too many files changed in this diff