Browse Source

y Fix virtual printer "Synchronizing device information" timeout in Orca (#927)

  OrcaSlicer's "Send job" flow sat on "Synchronizing device information…"
  until it gave up, even though FTP upload worked when the user clicked
  "Send job anyway". The virtual printer's MQTT server gated all incoming
  command handling on `f"device/{self.serial}/request" in topic` — if the
  slicer's cached serial for the VP didn't exactly equal the VP's computed
  self.serial (model prefix + per-VP serial_suffix), every get_version,
  pushall, and project_file publish was silently dropped. Nothing was
  logged past the initial "MQTT publish to …" line, so the slicer never
  received a push_status or get_version response on its subscribed
  device/{serial}/report topic and hit its sync timeout. Responses were
  also unconditionally published on device/{self.serial}/report, so even
  when the inbound check happened to pass, replies targeted a topic the
  slicer wasn't listening on if its serial had drifted.

  Both directions are now serial-adaptive:

  - `_handle_publish` accepts any authenticated publish on a
    `device/*/request` topic and extracts the serial from the topic itself
    rather than comparing against self.serial.
  - A per-connection `_client_serials` dict tracks the serial the slicer
    actually uses, populated from the first SUBSCRIBE or PUBLISH seen on
    each connection and cleared on disconnect/stop.
  - `_send_status_report`, `_send_version_response`, `_send_print_response`
    now take an optional `serial` parameter (defaulting to self.serial)
    so every outgoing publish — including the periodic 1-second status
    push — targets the topic the slicer subscribed to.
  - The version response's embedded `module[].sn` fields now also carry
    the client's serial so the payload is internally consistent with the
    topic.
  - When the client's serial differs from self.serial an INFO log records
    the adaptation so it's visible in future support bundles.

  The working case (slicer's cached serial equals self.serial, as in my
  own H2D-1 Proxy setup) is bit-for-bit identical to the old behavior —
  the new check is strictly more permissive and only affects cases the
  old code silently dropped.

  Regression tests cover:
  - `_extract_serial_from_topic` valid/invalid topic shapes
  - mismatched-serial publish → handler runs, response topic and sn field
    both use the client's serial
  - non-`/request` topics → still rejected
  - pushall → status_report routed to the client's subscribed topic
  - `_client_serials` cleared on stop()
maziggy 1 month ago
parent
commit
b069b521

File diff suppressed because it is too large
+ 1 - 0
CHANGELOG.md


+ 138 - 65
backend/app/services/virtual_printer/mqtt_server.py

@@ -201,6 +201,14 @@ class SimpleMQTTServer:
         self._running = False
         self._running = False
         self._server = None
         self._server = None
         self._clients: dict[str, asyncio.StreamWriter] = {}
         self._clients: dict[str, asyncio.StreamWriter] = {}
+        # Per-client "effective serial" — the serial the slicer actually uses in
+        # device/{serial}/report|request topics. Populated from the first
+        # SUBSCRIBE/PUBLISH we see on a connection. This lets the VP respond on
+        # the topic the slicer is listening on even when it disagrees with
+        # self.serial (e.g. a stale Orca config that was bound to an older VP
+        # serial, or a printer entry that was re-pointed at the VP IP without
+        # updating the serial).
+        self._client_serials: dict[str, str] = {}
         self._status_push_task: asyncio.Task | None = None
         self._status_push_task: asyncio.Task | None = None
         self._sequence_id = 0
         self._sequence_id = 0
 
 
@@ -311,6 +319,7 @@ class SimpleMQTTServer:
             except OSError:
             except OSError:
                 pass  # Best-effort client connection cleanup; client may have disconnected
                 pass  # Best-effort client connection cleanup; client may have disconnected
         self._clients.clear()
         self._clients.clear()
+        self._client_serials.clear()
 
 
         if self._server:
         if self._server:
             try:
             try:
@@ -320,6 +329,22 @@ class SimpleMQTTServer:
                 pass  # Best-effort server shutdown; port may already be released
                 pass  # Best-effort server shutdown; port may already be released
             self._server = None
             self._server = None
 
 
+    @staticmethod
+    def _extract_serial_from_topic(topic: str) -> str | None:
+        """Pull the serial out of a `device/{serial}/report|request` topic.
+
+        Returns None if the topic doesn't match that shape — callers fall back
+        to self.serial in that case.
+        """
+        if not topic.startswith("device/"):
+            return None
+        rest = topic[len("device/") :]
+        # Expect "{serial}/report" or "{serial}/request" (possibly with suffixes).
+        slash = rest.find("/")
+        if slash <= 0:
+            return None
+        return rest[:slash]
+
     async def _periodic_status_push(self) -> None:
     async def _periodic_status_push(self) -> None:
         """Send periodic status updates to all connected clients."""
         """Send periodic status updates to all connected clients."""
         logger.info("Starting periodic status push task")
         logger.info("Starting periodic status push task")
@@ -334,7 +359,8 @@ class SimpleMQTTServer:
                         if writer.is_closing():
                         if writer.is_closing():
                             disconnected.append(client_id)
                             disconnected.append(client_id)
                             continue
                             continue
-                        await self._send_status_report(writer)
+                        serial = self._client_serials.get(client_id, self.serial)
+                        await self._send_status_report(writer, serial=serial)
                     except OSError as e:
                     except OSError as e:
                         logger.debug("Failed to push status to %s: %s", client_id, e)
                         logger.debug("Failed to push status to %s: %s", client_id, e)
                         disconnected.append(client_id)
                         disconnected.append(client_id)
@@ -342,6 +368,7 @@ class SimpleMQTTServer:
                 # Remove disconnected clients
                 # Remove disconnected clients
                 for client_id in disconnected:
                 for client_id in disconnected:
                     self._clients.pop(client_id, None)
                     self._clients.pop(client_id, None)
+                    self._client_serials.pop(client_id, None)
 
 
             except asyncio.CancelledError:
             except asyncio.CancelledError:
                 break
                 break
@@ -384,14 +411,17 @@ class SimpleMQTTServer:
                     authenticated = await self._handle_connect(payload, writer)
                     authenticated = await self._handle_connect(payload, writer)
                     if not authenticated:
                     if not authenticated:
                         break
                         break
-                    # Register client for periodic status pushes
+                    # Register client for periodic status pushes; start with
+                    # self.serial as the fallback until we learn the slicer's
+                    # preferred serial from the first SUBSCRIBE/PUBLISH.
                     self._clients[client_id] = writer
                     self._clients[client_id] = writer
+                    self._client_serials[client_id] = self.serial
                 elif packet_type == 3:  # PUBLISH
                 elif packet_type == 3:  # PUBLISH
                     if authenticated:
                     if authenticated:
-                        await self._handle_publish(header[0], payload, writer)
+                        await self._handle_publish(header[0], payload, writer, client_id)
                 elif packet_type == 8:  # SUBSCRIBE
                 elif packet_type == 8:  # SUBSCRIBE
                     if authenticated:
                     if authenticated:
-                        await self._handle_subscribe(payload, writer)
+                        await self._handle_subscribe(payload, writer, client_id)
                 elif packet_type == 12:  # PINGREQ
                 elif packet_type == 12:  # PINGREQ
                     # Send PINGRESP
                     # Send PINGRESP
                     writer.write(bytes([0xD0, 0x00]))
                     writer.write(bytes([0xD0, 0x00]))
@@ -405,8 +435,8 @@ class SimpleMQTTServer:
             logger.debug("MQTT client error: %s", e)
             logger.debug("MQTT client error: %s", e)
         finally:
         finally:
             logger.debug("MQTT client disconnected: %s", client_id)
             logger.debug("MQTT client disconnected: %s", client_id)
-            if client_id in self._clients:
-                del self._clients[client_id]
+            self._clients.pop(client_id, None)
+            self._client_serials.pop(client_id, None)
             try:
             try:
                 writer.close()
                 writer.close()
                 await writer.wait_closed()
                 await writer.wait_closed()
@@ -493,7 +523,7 @@ class SimpleMQTTServer:
             await writer.drain()
             await writer.drain()
             return False
             return False
 
 
-    async def _handle_subscribe(self, payload: bytes, writer: asyncio.StreamWriter) -> None:
+    async def _handle_subscribe(self, payload: bytes, writer: asyncio.StreamWriter, client_id: str) -> None:
         """Handle MQTT SUBSCRIBE packet."""
         """Handle MQTT SUBSCRIBE packet."""
         try:
         try:
             # Parse packet ID
             # Parse packet ID
@@ -502,6 +532,7 @@ class SimpleMQTTServer:
             # Parse topic filters (just acknowledge them)
             # Parse topic filters (just acknowledge them)
             idx = 2
             idx = 2
             granted_qos = []
             granted_qos = []
+            learned_serial: str | None = None
             while idx < len(payload):
             while idx < len(payload):
                 topic_len = (payload[idx] << 8) | payload[idx + 1]
                 topic_len = (payload[idx] << 8) | payload[idx + 1]
                 idx += 2
                 idx += 2
@@ -513,19 +544,36 @@ class SimpleMQTTServer:
                 logger.info("%sMQTT subscribe: %s QoS=%s", self._log_prefix, topic, requested_qos)
                 logger.info("%sMQTT subscribe: %s QoS=%s", self._log_prefix, topic, requested_qos)
                 granted_qos.append(min(requested_qos, 1))  # Grant up to QoS 1
                 granted_qos.append(min(requested_qos, 1))  # Grant up to QoS 1
 
 
+                # Remember the serial the slicer is listening on so status/version
+                # responses go to a topic it actually subscribed to.
+                if learned_serial is None:
+                    extracted = self._extract_serial_from_topic(topic)
+                    if extracted:
+                        learned_serial = extracted
+
+            if learned_serial and learned_serial != self._client_serials.get(client_id):
+                if learned_serial != self.serial:
+                    logger.info(
+                        "%sMQTT client subscribed with serial %s (VP serial is %s) — adapting responses",
+                        self._log_prefix,
+                        learned_serial,
+                        self.serial,
+                    )
+                self._client_serials[client_id] = learned_serial
+
             # Send SUBACK
             # Send SUBACK
             suback = bytes([0x90, 2 + len(granted_qos), packet_id >> 8, packet_id & 0xFF])
             suback = bytes([0x90, 2 + len(granted_qos), packet_id >> 8, packet_id & 0xFF])
             suback += bytes(granted_qos)
             suback += bytes(granted_qos)
             writer.write(suback)
             writer.write(suback)
             await writer.drain()
             await writer.drain()
 
 
-            # Send initial status report after subscribe
-            await self._send_status_report(writer)
+            # Send initial status report after subscribe on the client's subscribed topic
+            await self._send_status_report(writer, serial=self._client_serials.get(client_id, self.serial))
 
 
         except (IndexError, ValueError, OSError) as e:
         except (IndexError, ValueError, OSError) as e:
             logger.debug("MQTT SUBSCRIBE error: %s", e)
             logger.debug("MQTT SUBSCRIBE error: %s", e)
 
 
-    async def _send_status_report(self, writer: asyncio.StreamWriter) -> None:
+    async def _send_status_report(self, writer: asyncio.StreamWriter, serial: str | None = None) -> None:
         """Send a status report to the slicer after connection."""
         """Send a status report to the slicer after connection."""
         try:
         try:
             # Build status message matching Bambu printer format
             # Build status message matching Bambu printer format
@@ -603,16 +651,21 @@ class SimpleMQTTServer:
                 }
                 }
             }
             }
 
 
-            await self._publish_to_report(writer, status, self.serial)
+            await self._publish_to_report(writer, status, serial or self.serial)
 
 
         except OSError as e:
         except OSError as e:
             logger.error("Failed to send status report: %s", e)
             logger.error("Failed to send status report: %s", e)
 
 
-    async def _send_version_response(self, writer: asyncio.StreamWriter, sequence_id: str) -> None:
+    async def _send_version_response(
+        self, writer: asyncio.StreamWriter, sequence_id: str, serial: str | None = None
+    ) -> None:
         """Send version info response to the slicer."""
         """Send version info response to the slicer."""
         try:
         try:
             product_name = MODEL_PRODUCT_NAMES.get(self.model, self.model or "X1 Carbon")
             product_name = MODEL_PRODUCT_NAMES.get(self.model, self.model or "X1 Carbon")
-            serial = self.serial
+            # The serial is embedded inside the module[].sn fields *and* used as
+            # the report topic. Use the client's effective serial so the slicer
+            # sees internal/topic consistency even when it differs from self.serial.
+            serial = serial or self.serial
 
 
             # Build version response matching OrcaSlicer expectations
             # Build version response matching OrcaSlicer expectations
             # Required fields per module: name, product_name, sw_ver, sw_new_ver, sn, hw_ver, flag
             # Required fields per module: name, product_name, sw_ver, sw_new_ver, sn, hw_ver, flag
@@ -715,7 +768,9 @@ class SimpleMQTTServer:
         except TimeoutError:
         except TimeoutError:
             logger.debug("MQTT drain timeout for %s — client may be busy", topic)
             logger.debug("MQTT drain timeout for %s — client may be busy", topic)
 
 
-    async def _send_print_response(self, writer: asyncio.StreamWriter, sequence_id: str, filename: str) -> None:
+    async def _send_print_response(
+        self, writer: asyncio.StreamWriter, sequence_id: str, filename: str, serial: str | None = None
+    ) -> None:
         """Send project_file acknowledgment matching real Bambu printer behavior."""
         """Send project_file acknowledgment matching real Bambu printer behavior."""
         # Update state so periodic status pushes reflect preparation
         # Update state so periodic status pushes reflect preparation
         self._gcode_state = "PREPARE"
         self._gcode_state = "PREPARE"
@@ -739,12 +794,12 @@ class SimpleMQTTServer:
                     "msg": 0,
                     "msg": 0,
                 }
                 }
             }
             }
-            await self._publish_to_report(writer, response)
+            await self._publish_to_report(writer, response, serial or self.serial)
             logger.info("Sent project_file acknowledgment for %s", filename)
             logger.info("Sent project_file acknowledgment for %s", filename)
         except OSError as e:
         except OSError as e:
             logger.error("Failed to send print response: %s", e)
             logger.error("Failed to send print response: %s", e)
 
 
-    async def _handle_publish(self, header: int, payload: bytes, writer: asyncio.StreamWriter) -> None:
+    async def _handle_publish(self, header: int, payload: bytes, writer: asyncio.StreamWriter, client_id: str) -> None:
         """Handle MQTT PUBLISH packet."""
         """Handle MQTT PUBLISH packet."""
         try:
         try:
             # Parse topic
             # Parse topic
@@ -765,55 +820,73 @@ class SimpleMQTTServer:
 
 
             logger.info("MQTT publish to %s: %s...", topic, message[:100])
             logger.info("MQTT publish to %s: %s...", topic, message[:100])
 
 
-            # Handle commands on device request topic
-            if f"device/{self.serial}/request" in topic:
-                try:
-                    data = json.loads(message)
-
-                    # Handle pushing command (status request)
-                    if "pushing" in data:
-                        pushing_data = data["pushing"]
-                        command = pushing_data.get("command", "")
-                        logger.info("MQTT pushing command: %s", command)
-
-                        if command == "pushall":
-                            # Slicer is requesting full status - send response
-                            logger.info("Sending status report in response to pushall")
-                            await self._send_status_report(writer)
-                        elif command == "start":
-                            # Slicer wants periodic status updates - send one now
-                            logger.info("Starting status push stream")
-                            await self._send_status_report(writer)
-
-                    # Handle info commands (get_version, etc.)
-                    if "info" in data:
-                        info_data = data["info"]
-                        command = info_data.get("command", "")
-                        sequence_id = info_data.get("sequence_id", "0")
-                        logger.info("MQTT info command: %s", command)
-
-                        if command == "get_version":
-                            await self._send_version_response(writer, sequence_id)
-
-                    # Handle print commands
-                    if "print" in data:
-                        print_data = data["print"]
-                        command = print_data.get("command", "")
-                        filename = print_data.get("subtask_name", "")
-                        sequence_id = print_data.get("sequence_id", "0")
-
-                        logger.info("MQTT print command: %s for %s", command, filename)
-
-                        if command == "project_file":
-                            # Respond with PREPARE status so slicer proceeds with FTP upload
-                            file_3mf = print_data.get("file", filename)
-                            await self._send_print_response(writer, sequence_id, file_3mf)
-
-                            if self.on_print_command:
-                                await self._notify_print_command(filename, print_data)
-
-                except json.JSONDecodeError:
-                    pass  # Non-JSON payloads on request topic are safely ignored
+            # Only handle publishes on *some* device/.../request topic. The
+            # serial is taken from the topic rather than compared against
+            # self.serial: the client is already authenticated via the access
+            # code, and Orca/BambuStudio may have a cached serial that differs
+            # from the VP's computed self.serial (#927). Use the topic's serial
+            # for all responses so they land on the topic the slicer subscribed
+            # to.
+            if not topic.startswith("device/") or "/request" not in topic:
+                return
+
+            client_serial = self._extract_serial_from_topic(topic) or self.serial
+            if client_serial and client_serial != self._client_serials.get(client_id):
+                if client_serial != self.serial:
+                    logger.info(
+                        "%sMQTT client publishing with serial %s (VP serial is %s) — adapting responses",
+                        self._log_prefix,
+                        client_serial,
+                        self.serial,
+                    )
+                self._client_serials[client_id] = client_serial
+
+            try:
+                data = json.loads(message)
+            except json.JSONDecodeError:
+                return  # Non-JSON payloads on request topic are safely ignored
+
+            # Handle pushing command (status request)
+            if "pushing" in data:
+                pushing_data = data["pushing"]
+                command = pushing_data.get("command", "")
+                logger.info("MQTT pushing command: %s", command)
+
+                if command == "pushall":
+                    # Slicer is requesting full status - send response
+                    logger.info("Sending status report in response to pushall")
+                    await self._send_status_report(writer, serial=client_serial)
+                elif command == "start":
+                    # Slicer wants periodic status updates - send one now
+                    logger.info("Starting status push stream")
+                    await self._send_status_report(writer, serial=client_serial)
+
+            # Handle info commands (get_version, etc.)
+            if "info" in data:
+                info_data = data["info"]
+                command = info_data.get("command", "")
+                sequence_id = info_data.get("sequence_id", "0")
+                logger.info("MQTT info command: %s", command)
+
+                if command == "get_version":
+                    await self._send_version_response(writer, sequence_id, serial=client_serial)
+
+            # Handle print commands
+            if "print" in data:
+                print_data = data["print"]
+                command = print_data.get("command", "")
+                filename = print_data.get("subtask_name", "")
+                sequence_id = print_data.get("sequence_id", "0")
+
+                logger.info("MQTT print command: %s for %s", command, filename)
+
+                if command == "project_file":
+                    # Respond with PREPARE status so slicer proceeds with FTP upload
+                    file_3mf = print_data.get("file", filename)
+                    await self._send_print_response(writer, sequence_id, file_3mf, serial=client_serial)
+
+                    if self.on_print_command:
+                        await self._notify_print_command(filename, print_data)
 
 
         except (IndexError, ValueError, OSError) as e:
         except (IndexError, ValueError, OSError) as e:
             logger.debug("MQTT PUBLISH error: %s", e)
             logger.debug("MQTT PUBLISH error: %s", e)

+ 137 - 0
backend/tests/unit/test_vp_mqtt_server.py

@@ -1,7 +1,13 @@
 """Tests for Virtual Printer MQTT server."""
 """Tests for Virtual Printer MQTT server."""
 
 
 import ast
 import ast
+import asyncio
 import inspect
 import inspect
+import json
+from pathlib import Path
+from unittest.mock import AsyncMock, MagicMock
+
+import pytest
 
 
 from backend.app.services.virtual_printer.mqtt_server import SimpleMQTTServer
 from backend.app.services.virtual_printer.mqtt_server import SimpleMQTTServer
 
 
@@ -26,3 +32,134 @@ class TestMQTTServerNoGlobalState:
                     "It overwrites the global asyncio exception handler, "
                     "It overwrites the global asyncio exception handler, "
                     "breaking multi-VP setups."
                     "breaking multi-VP setups."
                 )
                 )
+
+
+def _make_server(serial: str = "01P00A391800001") -> SimpleMQTTServer:
+    """Build a SimpleMQTTServer with dummy cert paths (start() is never called)."""
+    return SimpleMQTTServer(
+        serial=serial,
+        access_code="deadbeef",
+        cert_path=Path("/tmp/unused.crt"),
+        key_path=Path("/tmp/unused.key"),
+        model="C12",
+    )
+
+
+class TestExtractSerialFromTopic:
+    """_extract_serial_from_topic should pull the serial out of device topics."""
+
+    @pytest.mark.parametrize(
+        "topic,expected",
+        [
+            ("device/01P00A391800001/request", "01P00A391800001"),
+            ("device/09400A391800003/report", "09400A391800003"),
+            ("device/00M00A391800004/request/subpath", "00M00A391800004"),
+        ],
+    )
+    def test_valid_topics(self, topic, expected):
+        assert SimpleMQTTServer._extract_serial_from_topic(topic) == expected
+
+    @pytest.mark.parametrize(
+        "topic",
+        [
+            "",
+            "device/",
+            "device//request",  # empty serial
+            "notdevice/01P00A/request",
+            "random",
+        ],
+    )
+    def test_invalid_topics(self, topic):
+        assert SimpleMQTTServer._extract_serial_from_topic(topic) is None
+
+
+def _build_publish_payload(topic: str, message: dict) -> bytes:
+    """Build the MQTT PUBLISH packet *payload* (past the fixed header byte)."""
+    topic_bytes = topic.encode("utf-8")
+    message_bytes = json.dumps(message).encode("utf-8")
+    return len(topic_bytes).to_bytes(2, "big") + topic_bytes + message_bytes
+
+
+class TestPublishHandlerAdaptiveSerial:
+    """#927: `_handle_publish` must accept any `device/*/request` topic from an
+    authenticated client and use the topic's serial for all responses."""
+
+    def test_handle_publish_accepts_mismatched_serial(self):
+        """Prior behavior silently dropped publishes whose topic serial didn't
+        equal self.serial. After the fix the handler must run and learn the
+        client's serial.
+        """
+        server = _make_server(serial="01P00A391800001")  # synthetic VP serial
+        server._client_serials["test-client"] = server.serial  # simulate post-CONNECT
+
+        writer = MagicMock()
+        writer.write = MagicMock()
+        writer.drain = AsyncMock()
+
+        # Slicer publishes with a *different* serial — the exact bug from #927.
+        topic = "device/01P00AABCDEFGHI/request"
+        payload = _build_publish_payload(topic, {"info": {"command": "get_version", "sequence_id": "42"}})
+
+        asyncio.run(server._handle_publish(0x30, payload, writer, "test-client"))
+
+        # Learned the client's serial.
+        assert server._client_serials["test-client"] == "01P00AABCDEFGHI"
+
+        # Wrote at least one packet to the slicer (the version response).
+        assert writer.write.called
+        all_bytes = b"".join(call.args[0] for call in writer.write.call_args_list)
+        # Response topic must contain the *client's* serial, not self.serial.
+        assert b"device/01P00AABCDEFGHI/report" in all_bytes
+        assert b"device/01P00A391800001/report" not in all_bytes
+        # Response body carries get_version with the client's serial as sn.
+        assert b'"command": "get_version"' in all_bytes
+        assert b'"sn": "01P00AABCDEFGHI"' in all_bytes
+
+    def test_handle_publish_ignores_non_request_topics(self):
+        server = _make_server()
+        server._client_serials["c1"] = server.serial
+        writer = MagicMock()
+        writer.write = MagicMock()
+        writer.drain = AsyncMock()
+
+        payload = _build_publish_payload(
+            "device/01P00AABCDEFGHI/report",  # /report, not /request
+            {"pushing": {"command": "pushall"}},
+        )
+        asyncio.run(server._handle_publish(0x30, payload, writer, "c1"))
+
+        assert not writer.write.called  # no response
+        # Client serial unchanged
+        assert server._client_serials["c1"] == server.serial
+
+    def test_handle_publish_pushall_uses_client_serial(self):
+        """pushall → status_report must be sent on the client's subscribed topic."""
+        server = _make_server(serial="01P00A391800001")
+        server._client_serials["c1"] = server.serial
+
+        writer = MagicMock()
+        writer.write = MagicMock()
+        writer.drain = AsyncMock()
+
+        payload = _build_publish_payload(
+            "device/CUSTOMSERIAL123/request",
+            {"pushing": {"command": "pushall", "sequence_id": "1"}},
+        )
+        asyncio.run(server._handle_publish(0x30, payload, writer, "c1"))
+
+        all_bytes = b"".join(call.args[0] for call in writer.write.call_args_list)
+        assert b"device/CUSTOMSERIAL123/report" in all_bytes
+        assert b'"command": "push_status"' in all_bytes
+        assert server._client_serials["c1"] == "CUSTOMSERIAL123"
+
+
+class TestClientSerialLifecycle:
+    """_client_serials must be cleaned up on disconnect/stop to avoid leaks."""
+
+    def test_stop_clears_client_serials(self):
+        server = _make_server()
+        server._client_serials["a"] = "X"
+        server._client_serials["b"] = "Y"
+        # stop() is async but we only need to cover the clear() path; run a minimal version
+        asyncio.run(server.stop())
+        assert server._client_serials == {}

Some files were not shown because too many files changed in this diff