Browse Source

fix(mqtt): #1136 reprint fails with 0500_4003 SD R/W after stuck dispatch

  Reprinting from archives sometimes failed immediately with a MicroSD R/W
  exception, with the printer's MQTT push referencing a 3MF from a
  different unrelated archive. Once it started, every subsequent reprint
  hit the same error until the container was restarted.

  Root cause from @smandon's support package: paho-mqtt's client-side QoS
  1 queue. When the printer's command channel goes half-broken (telemetry
  flowing, publishes silently dropped — same #887/#936 pattern),
  background_dispatch.py:993 hits its 15s deadline and calls
  force_reconnect_stale_session(). That function was force-closing the
  underlying socket so paho's auto-reconnect would kick in, but the same
  mqtt.Client instance, same client_id, and same in-process QoS 1 queue
  stayed alive across the reconnect. Any unacked publish from the broken
  session — typically the just-sent project_file for the new archive —
  got replayed verbatim on the new connection. The queue accumulates
  across multiple stuck dispatches in one Python process, so by the
  second or third stuck reprint there were several stale
  project_file/resume/stop/clean_print_error commands queued together;
  the printer latched onto whichever stale path it processed last,
  couldn't find the file on its SD card, and emitted 0500_4003. Container
  restart was the only thing that wiped paho's in-process queue.

  Replaced socket-close with a context-aware reconnect via a new
  _reset_client_for_reconnect() router:

    Async-context callers (dispatch deadline, FastAPI handlers via
    check_staleness) → hard-reset: client.disconnect() (broker drops
    session, clean_session=True), client.loop_stop() (kills paho's
    network thread and its queue), null _client, fresh connect() with
    incremented client_id. New connection is genuinely empty, no replay.

    Paho-network-thread callers (dev-mode probe + ams_filament_setting
    zombie detection inside _update_state) → socket-close fallback.
    loop_stop() from inside the network thread would self-join and
    deadlock, so the safe pattern there is "close the socket and let
    paho's loop detect it and auto-reconnect on the same client".

  Routing decision uses asyncio.get_running_loop() — paho's callback
  thread has no loop, every legitimate hard-reset caller does.

  7 regression tests:
  - TestForceReconnectRouting (3): sync-context → socket-close fallback,
    async-context → hard-reset with disconnect()+loop_stop()+null,
    state-disconnected broadcast fires once on either path
  - TestHardResetClientDirect (3): helper directly — old client gets
    disconnect()+loop_stop(), _client cleared, failing disconnect()
    doesn't propagate so background_dispatch's await chain can't break
  - TestZombieSessionDetection / TestDeveloperModeProbeTimeout (updated):
    paho-thread context still goes through socket-close, preserving the
    legacy contract for those paths
maziggy 1 month ago
parent
commit
527f8ea471
3 changed files with 205 additions and 20 deletions
  1. 0 0
      CHANGELOG.md
  2. 81 16
      backend/app/services/bambu_mqtt.py
  3. 124 4
      backend/tests/unit/services/test_bambu_mqtt.py

File diff suppressed because it is too large
+ 0 - 0
CHANGELOG.md


+ 81 - 16
backend/app/services/bambu_mqtt.py

@@ -425,31 +425,96 @@ class BambuMQTTClient:
             self.state.connected = False
             if self.on_state_change:
                 self.on_state_change(self.state)
-            # Force-close the underlying socket so paho's loop thread detects
-            # the broken connection and triggers auto-reconnect.  We don't call
-            # client.disconnect() because that's a clean disconnect and paho
-            # would NOT auto-reconnect afterwards.
-            # Set flag so _on_disconnect knows this was intentional and skips
-            # redundant state broadcast (we already set connected=False above).
+            # Route based on caller thread — see force_reconnect_stale_session.
+            # check_staleness is normally called from FastAPI handlers (async,
+            # gets the hard-reset path) but the dispatcher exists for safety.
             self._stale_reconnecting = True
-            if self._client:
-                try:
-                    sock = self._client.socket()
-                    if sock:
-                        sock.close()
-                except Exception:
-                    pass  # Best-effort; paho loop will reconnect on next iteration
+            self._reset_client_for_reconnect()
         return self.state.connected
 
     def force_reconnect_stale_session(self, reason: str) -> None:
-        # Heals the #887 half-broken session: telemetry keeps arriving but our
-        # publishes no longer reach the printer. Closing the socket makes paho
-        # drop and re-establish with a fresh session.
+        # Heals the #887/#936/#1136 half-broken session: telemetry keeps
+        # arriving but our publishes don't reach the printer.
+        #
+        # Two routing paths:
+        #
+        # Async-context callers (background_dispatch.py:993 — dispatch deadline)
+        #   → full client teardown + fresh client_id. Wipes paho's client-side
+        #     QoS 1 queue, which is exactly the #1136 reproducer: an unacked
+        #     `project_file` from the broken session would otherwise replay on
+        #     reconnect, mixing stale commands into the next dispatch and
+        #     triggering 0500_4003 SD R/W on the printer.
+        #
+        # Paho-network-thread callers (line ~2604/~2623 — dev-mode probe and
+        # ams_filament_setting zombie detection inside `_update_state`)
+        #   → socket-close fallback. Calling `loop_stop()` from inside the
+        #     network thread would self-join and deadlock; the safe pattern is
+        #     to close the socket and let paho's own loop detect the broken
+        #     connection and auto-reconnect (same instance, same client_id —
+        #     queue replay is theoretically possible here but those paths have
+        #     always done socket-close and #1136 was specifically triggered
+        #     from the dispatch path).
         logger.warning("[%s] Forcing MQTT reconnect: %s", self.serial_number, reason)
         self._stale_reconnecting = True
         self.state.connected = False
         if self.on_state_change:
             self.on_state_change(self.state)
+        self._reset_client_for_reconnect()
+
+    def _reset_client_for_reconnect(self) -> None:
+        """Route between hard-reset and socket-close based on caller thread.
+
+        Hard-reset (preferred) requires we're not running on paho's network
+        thread, since `loop_stop()` on the same thread deadlocks. Detect via
+        ``asyncio.get_running_loop()`` — paho's callback thread has no loop;
+        every legitimate hard-reset caller (FastAPI handlers, background
+        async tasks) does."""
+        try:
+            loop = asyncio.get_running_loop()
+        except RuntimeError:
+            loop = None
+
+        if loop is not None:
+            self._loop = loop
+            self._hard_reset_client()
+        else:
+            self._socket_close_for_reconnect()
+
+    def _hard_reset_client(self) -> None:
+        """Tear down the paho client entirely and rebuild it with a fresh
+        client_id, so the broker drops the old session and paho's local
+        QoS 1 queue is gone. Must NOT be called from paho's network thread.
+        Caller is responsible for setting ``_stale_reconnecting`` and
+        broadcasting the disconnected state."""
+        old_client = self._client
+        self._client = None
+        if old_client is not None:
+            try:
+                old_client.disconnect()  # MQTT DISCONNECT — broker drops session
+            except Exception:
+                pass
+            try:
+                old_client.loop_stop()  # blocks briefly until the network thread exits
+            except Exception:
+                pass
+        # Skip reconnect if no asyncio loop is available (test environment or
+        # pre-init). The next initial connect() call from PrinterManager will
+        # set up the client fresh.
+        if self._loop is None:
+            return
+        try:
+            self.connect(loop=self._loop)
+        except Exception as e:
+            logger.error("[%s] Hard reset reconnect failed: %s", self.serial_number, e)
+
+    def _socket_close_for_reconnect(self) -> None:
+        """Close the underlying socket so paho's loop thread detects the
+        broken connection and triggers auto-reconnect on the SAME client
+        instance. Safe to call from paho's own network thread (the loop
+        polls the socket on every iteration and handles a closed socket
+        gracefully). Used as a fallback when hard-reset isn't safe; queue
+        replay remains theoretically possible here but #1136 specifically
+        traced through the dispatch-deadline path which now hard-resets."""
         if self._client:
             try:
                 sock = self._client.socket()

+ 124 - 4
backend/tests/unit/services/test_bambu_mqtt.py

@@ -2983,7 +2983,11 @@ class TestDeveloperModeProbeTimeout:
         assert mqtt_client.state.connected is True
 
     def test_second_timeout_forces_reconnect(self, mqtt_client):
-        """After two consecutive probe timeouts, force-close the socket."""
+        """After two consecutive probe timeouts, force-close the socket.
+
+        Probe timeout detection runs from paho's network thread (no asyncio
+        loop), so force_reconnect_stale_session routes through socket-close
+        rather than hard-reset (loop_stop from inside the loop deadlocks)."""
         import time
 
         data = self._make_pushall_data()
@@ -3005,9 +3009,8 @@ class TestDeveloperModeProbeTimeout:
         assert mqtt_client._dev_mode_probe_failures == 2
         assert mqtt_client.state.connected is False
         assert mqtt_client._stale_reconnecting is True
-        # Socket should have been closed
+        # Sync test → no running loop → socket-close fallback path
         mqtt_client._client.socket().close.assert_called()
-        # on_state_change should have been called
         assert len(state_change_called) > 0
 
     def test_successful_probe_resets_failure_counter(self, mqtt_client):
@@ -4088,7 +4091,13 @@ class TestZombieSessionDetection:
         assert mqtt_client.state.connected is True
 
     def test_two_timeouts_force_reconnect(self, mqtt_client):
-        """Two consecutive unanswered commands trigger force_reconnect."""
+        """Two consecutive unanswered commands trigger force_reconnect.
+
+        Zombie detection runs from paho's network thread (no asyncio loop), so
+        the routing in force_reconnect_stale_session falls back to socket-close
+        — which is the safe option since loop_stop() from inside the loop
+        thread would deadlock. Hard-reset is reserved for async-context callers
+        (background_dispatch dispatch path)."""
         import time
 
         state_change_called = []
@@ -4107,6 +4116,7 @@ class TestZombieSessionDetection:
         assert mqtt_client._ams_cmd_unanswered == 0  # reset after reconnect
         assert mqtt_client.state.connected is False
         assert mqtt_client._stale_reconnecting is True
+        # Sync test → no running loop → socket-close fallback path
         mqtt_client._client.socket().close.assert_called()
         assert len(state_change_called) > 0
 
@@ -4223,3 +4233,113 @@ class TestHMSUserActionFiltering:
         mqtt_client._update_state({"print_error": 0x0500_8061})
         assert len(mqtt_client.state.hms_errors) == 1
         assert mqtt_client.state.hms_errors[0].code == "0x8061"
+
+
+class TestForceReconnectRouting:
+    """#1136 — force_reconnect_stale_session routes between hard-reset (full
+    paho-client teardown, wipes the QoS 1 queue) and socket-close (the legacy
+    behaviour, safe to call from paho's own network thread). The routing
+    decision is based on whether an asyncio loop is running: hard-reset
+    requires loop_stop() which would deadlock if called from inside the
+    network thread itself."""
+
+    @pytest.fixture
+    def mqtt_client(self):
+        from unittest.mock import MagicMock
+
+        from backend.app.services.bambu_mqtt import BambuMQTTClient
+
+        client = BambuMQTTClient(
+            ip_address="192.168.1.100",
+            serial_number="TEST_HARD_RESET",
+            access_code="12345678",
+        )
+        client.state.connected = True
+        client._client = MagicMock()
+        return client
+
+    def test_routing_falls_back_to_socket_close_without_running_loop(self, mqtt_client):
+        """Sync caller → no asyncio loop → socket-close path (legacy behaviour
+        preserved for paho-thread callers like zombie detection)."""
+        mqtt_client.force_reconnect_stale_session("test")
+        mqtt_client._client.socket().close.assert_called()
+        # Old client is NOT torn down on this path; same-instance reconnect
+        # via paho's auto-reconnect handles it.
+        assert mqtt_client._client is not None
+
+    def test_routing_uses_hard_reset_when_loop_is_running(self, mqtt_client):
+        """Async caller → loop available → hard-reset path wipes the queue."""
+        import asyncio
+
+        original = mqtt_client._client
+        # Stub connect() so the rebuild doesn't open a real socket.
+        mqtt_client.connect = lambda loop=None: None
+
+        async def _trigger():
+            mqtt_client.force_reconnect_stale_session("test")
+
+        asyncio.run(_trigger())
+        original.disconnect.assert_called()
+        original.loop_stop.assert_called()
+        # connect() stub didn't repopulate _client, so it's None — the contract
+        # in production is that connect() builds a fresh mqtt.Client here.
+        assert mqtt_client._client is None
+
+    def test_marks_state_disconnected_and_broadcasts(self, mqtt_client):
+        """Both routing paths must broadcast the disconnected state once."""
+        broadcasts: list[bool] = []
+        mqtt_client.on_state_change = lambda s: broadcasts.append(s.connected)
+        mqtt_client.force_reconnect_stale_session("test")
+        assert mqtt_client.state.connected is False
+        assert mqtt_client._stale_reconnecting is True
+        assert broadcasts == [False]
+
+
+class TestHardResetClientDirect:
+    """Lower-level coverage of `_hard_reset_client` itself — the helper called
+    by the routing layer when a full paho-client teardown is safe. These tests
+    drive the helper directly so they don't depend on the routing decision."""
+
+    @pytest.fixture
+    def mqtt_client(self):
+        from unittest.mock import MagicMock
+
+        from backend.app.services.bambu_mqtt import BambuMQTTClient
+
+        client = BambuMQTTClient(
+            ip_address="192.168.1.100",
+            serial_number="TEST_HARD_DIRECT",
+            access_code="12345678",
+        )
+        client.state.connected = True
+        client._client = MagicMock()
+        # Stub connect() so the rebuild doesn't open a real socket.
+        client.connect = lambda loop=None: None
+        return client
+
+    def test_disconnects_and_stops_old_client(self, mqtt_client):
+        """Old paho client must receive DISCONNECT (broker drops session) +
+        loop_stop (network thread exits, taking its QoS 1 queue with it)."""
+        original = mqtt_client._client
+        mqtt_client._hard_reset_client()
+        original.disconnect.assert_called()
+        original.loop_stop.assert_called()
+
+    def test_clears_client_reference(self, mqtt_client):
+        """Old reference must go to None so subsequent code can't accidentally
+        publish through the dying client."""
+        mqtt_client._hard_reset_client()
+        assert mqtt_client._client is None
+
+    def test_swallows_disconnect_exception(self, mqtt_client):
+        """A failing disconnect() (e.g. paho already in error state) must not
+        propagate — the await chain in background_dispatch.py would otherwise
+        raise instead of moving on, and a single broken client could brick
+        every future dispatch."""
+        original = mqtt_client._client
+        original.disconnect.side_effect = RuntimeError("boom")
+        # No exception escapes the call (test would fail if it did).
+        mqtt_client._hard_reset_client()
+        # loop_stop is still attempted after the disconnect failure.
+        original.loop_stop.assert_called()
+        assert mqtt_client._client is None

Some files were not shown because too many files changed in this diff