|
|
@@ -201,6 +201,14 @@ class SimpleMQTTServer:
|
|
|
self._running = False
|
|
|
self._server = None
|
|
|
self._clients: dict[str, asyncio.StreamWriter] = {}
|
|
|
+ # Per-client "effective serial" — the serial the slicer actually uses in
|
|
|
+ # device/{serial}/report|request topics. Populated from the first
|
|
|
+ # SUBSCRIBE/PUBLISH we see on a connection. This lets the VP respond on
|
|
|
+ # the topic the slicer is listening on even when it disagrees with
|
|
|
+ # self.serial (e.g. a stale Orca config that was bound to an older VP
|
|
|
+ # serial, or a printer entry that was re-pointed at the VP IP without
|
|
|
+ # updating the serial).
|
|
|
+ self._client_serials: dict[str, str] = {}
|
|
|
self._status_push_task: asyncio.Task | None = None
|
|
|
self._sequence_id = 0
|
|
|
|
|
|
@@ -311,6 +319,7 @@ class SimpleMQTTServer:
|
|
|
except OSError:
|
|
|
pass # Best-effort client connection cleanup; client may have disconnected
|
|
|
self._clients.clear()
|
|
|
+ self._client_serials.clear()
|
|
|
|
|
|
if self._server:
|
|
|
try:
|
|
|
@@ -320,6 +329,22 @@ class SimpleMQTTServer:
|
|
|
pass # Best-effort server shutdown; port may already be released
|
|
|
self._server = None
|
|
|
|
|
|
+ @staticmethod
|
|
|
+ def _extract_serial_from_topic(topic: str) -> str | None:
|
|
|
+ """Pull the serial out of a `device/{serial}/report|request` topic.
|
|
|
+
|
|
|
+ Returns None if the topic doesn't match that shape — callers fall back
|
|
|
+ to self.serial in that case.
|
|
|
+ """
|
|
|
+ if not topic.startswith("device/"):
|
|
|
+ return None
|
|
|
+ rest = topic[len("device/") :]
|
|
|
+ # Expect "{serial}/report" or "{serial}/request" (possibly with suffixes).
|
|
|
+ slash = rest.find("/")
|
|
|
+ if slash <= 0:
|
|
|
+ return None
|
|
|
+ return rest[:slash]
|
|
|
+
|
|
|
async def _periodic_status_push(self) -> None:
|
|
|
"""Send periodic status updates to all connected clients."""
|
|
|
logger.info("Starting periodic status push task")
|
|
|
@@ -334,7 +359,8 @@ class SimpleMQTTServer:
|
|
|
if writer.is_closing():
|
|
|
disconnected.append(client_id)
|
|
|
continue
|
|
|
- await self._send_status_report(writer)
|
|
|
+ serial = self._client_serials.get(client_id, self.serial)
|
|
|
+ await self._send_status_report(writer, serial=serial)
|
|
|
except OSError as e:
|
|
|
logger.debug("Failed to push status to %s: %s", client_id, e)
|
|
|
disconnected.append(client_id)
|
|
|
@@ -342,6 +368,7 @@ class SimpleMQTTServer:
|
|
|
# Remove disconnected clients
|
|
|
for client_id in disconnected:
|
|
|
self._clients.pop(client_id, None)
|
|
|
+ self._client_serials.pop(client_id, None)
|
|
|
|
|
|
except asyncio.CancelledError:
|
|
|
break
|
|
|
@@ -384,14 +411,17 @@ class SimpleMQTTServer:
|
|
|
authenticated = await self._handle_connect(payload, writer)
|
|
|
if not authenticated:
|
|
|
break
|
|
|
- # Register client for periodic status pushes
|
|
|
+ # Register client for periodic status pushes; start with
|
|
|
+ # self.serial as the fallback until we learn the slicer's
|
|
|
+ # preferred serial from the first SUBSCRIBE/PUBLISH.
|
|
|
self._clients[client_id] = writer
|
|
|
+ self._client_serials[client_id] = self.serial
|
|
|
elif packet_type == 3: # PUBLISH
|
|
|
if authenticated:
|
|
|
- await self._handle_publish(header[0], payload, writer)
|
|
|
+ await self._handle_publish(header[0], payload, writer, client_id)
|
|
|
elif packet_type == 8: # SUBSCRIBE
|
|
|
if authenticated:
|
|
|
- await self._handle_subscribe(payload, writer)
|
|
|
+ await self._handle_subscribe(payload, writer, client_id)
|
|
|
elif packet_type == 12: # PINGREQ
|
|
|
# Send PINGRESP
|
|
|
writer.write(bytes([0xD0, 0x00]))
|
|
|
@@ -405,8 +435,8 @@ class SimpleMQTTServer:
|
|
|
logger.debug("MQTT client error: %s", e)
|
|
|
finally:
|
|
|
logger.debug("MQTT client disconnected: %s", client_id)
|
|
|
- if client_id in self._clients:
|
|
|
- del self._clients[client_id]
|
|
|
+ self._clients.pop(client_id, None)
|
|
|
+ self._client_serials.pop(client_id, None)
|
|
|
try:
|
|
|
writer.close()
|
|
|
await writer.wait_closed()
|
|
|
@@ -493,7 +523,7 @@ class SimpleMQTTServer:
|
|
|
await writer.drain()
|
|
|
return False
|
|
|
|
|
|
- async def _handle_subscribe(self, payload: bytes, writer: asyncio.StreamWriter) -> None:
|
|
|
+ async def _handle_subscribe(self, payload: bytes, writer: asyncio.StreamWriter, client_id: str) -> None:
|
|
|
"""Handle MQTT SUBSCRIBE packet."""
|
|
|
try:
|
|
|
# Parse packet ID
|
|
|
@@ -502,6 +532,7 @@ class SimpleMQTTServer:
|
|
|
# Parse topic filters (just acknowledge them)
|
|
|
idx = 2
|
|
|
granted_qos = []
|
|
|
+ learned_serial: str | None = None
|
|
|
while idx < len(payload):
|
|
|
topic_len = (payload[idx] << 8) | payload[idx + 1]
|
|
|
idx += 2
|
|
|
@@ -513,19 +544,36 @@ class SimpleMQTTServer:
|
|
|
logger.info("%sMQTT subscribe: %s QoS=%s", self._log_prefix, topic, requested_qos)
|
|
|
granted_qos.append(min(requested_qos, 1)) # Grant up to QoS 1
|
|
|
|
|
|
+ # Remember the serial the slicer is listening on so status/version
|
|
|
+ # responses go to a topic it actually subscribed to.
|
|
|
+ if learned_serial is None:
|
|
|
+ extracted = self._extract_serial_from_topic(topic)
|
|
|
+ if extracted:
|
|
|
+ learned_serial = extracted
|
|
|
+
|
|
|
+ if learned_serial and learned_serial != self._client_serials.get(client_id):
|
|
|
+ if learned_serial != self.serial:
|
|
|
+ logger.info(
|
|
|
+ "%sMQTT client subscribed with serial %s (VP serial is %s) — adapting responses",
|
|
|
+ self._log_prefix,
|
|
|
+ learned_serial,
|
|
|
+ self.serial,
|
|
|
+ )
|
|
|
+ self._client_serials[client_id] = learned_serial
|
|
|
+
|
|
|
# Send SUBACK
|
|
|
suback = bytes([0x90, 2 + len(granted_qos), packet_id >> 8, packet_id & 0xFF])
|
|
|
suback += bytes(granted_qos)
|
|
|
writer.write(suback)
|
|
|
await writer.drain()
|
|
|
|
|
|
- # Send initial status report after subscribe
|
|
|
- await self._send_status_report(writer)
|
|
|
+ # Send initial status report after subscribe on the client's subscribed topic
|
|
|
+ await self._send_status_report(writer, serial=self._client_serials.get(client_id, self.serial))
|
|
|
|
|
|
except (IndexError, ValueError, OSError) as e:
|
|
|
logger.debug("MQTT SUBSCRIBE error: %s", e)
|
|
|
|
|
|
- async def _send_status_report(self, writer: asyncio.StreamWriter) -> None:
|
|
|
+ async def _send_status_report(self, writer: asyncio.StreamWriter, serial: str | None = None) -> None:
|
|
|
"""Send a status report to the slicer after connection."""
|
|
|
try:
|
|
|
# Build status message matching Bambu printer format
|
|
|
@@ -603,16 +651,21 @@ class SimpleMQTTServer:
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- await self._publish_to_report(writer, status, self.serial)
|
|
|
+ await self._publish_to_report(writer, status, serial or self.serial)
|
|
|
|
|
|
except OSError as e:
|
|
|
logger.error("Failed to send status report: %s", e)
|
|
|
|
|
|
- async def _send_version_response(self, writer: asyncio.StreamWriter, sequence_id: str) -> None:
|
|
|
+ async def _send_version_response(
|
|
|
+ self, writer: asyncio.StreamWriter, sequence_id: str, serial: str | None = None
|
|
|
+ ) -> None:
|
|
|
"""Send version info response to the slicer."""
|
|
|
try:
|
|
|
product_name = MODEL_PRODUCT_NAMES.get(self.model, self.model or "X1 Carbon")
|
|
|
- serial = self.serial
|
|
|
+ # The serial is embedded inside the module[].sn fields *and* used as
|
|
|
+ # the report topic. Use the client's effective serial so the slicer
|
|
|
+ # sees internal/topic consistency even when it differs from self.serial.
|
|
|
+ serial = serial or self.serial
|
|
|
|
|
|
# Build version response matching OrcaSlicer expectations
|
|
|
# Required fields per module: name, product_name, sw_ver, sw_new_ver, sn, hw_ver, flag
|
|
|
@@ -715,7 +768,9 @@ class SimpleMQTTServer:
|
|
|
except TimeoutError:
|
|
|
logger.debug("MQTT drain timeout for %s — client may be busy", topic)
|
|
|
|
|
|
- async def _send_print_response(self, writer: asyncio.StreamWriter, sequence_id: str, filename: str) -> None:
|
|
|
+ async def _send_print_response(
|
|
|
+ self, writer: asyncio.StreamWriter, sequence_id: str, filename: str, serial: str | None = None
|
|
|
+ ) -> None:
|
|
|
"""Send project_file acknowledgment matching real Bambu printer behavior."""
|
|
|
# Update state so periodic status pushes reflect preparation
|
|
|
self._gcode_state = "PREPARE"
|
|
|
@@ -739,12 +794,12 @@ class SimpleMQTTServer:
|
|
|
"msg": 0,
|
|
|
}
|
|
|
}
|
|
|
- await self._publish_to_report(writer, response)
|
|
|
+ await self._publish_to_report(writer, response, serial or self.serial)
|
|
|
logger.info("Sent project_file acknowledgment for %s", filename)
|
|
|
except OSError as e:
|
|
|
logger.error("Failed to send print response: %s", e)
|
|
|
|
|
|
- async def _handle_publish(self, header: int, payload: bytes, writer: asyncio.StreamWriter) -> None:
|
|
|
+ async def _handle_publish(self, header: int, payload: bytes, writer: asyncio.StreamWriter, client_id: str) -> None:
|
|
|
"""Handle MQTT PUBLISH packet."""
|
|
|
try:
|
|
|
# Parse topic
|
|
|
@@ -765,55 +820,73 @@ class SimpleMQTTServer:
|
|
|
|
|
|
logger.info("MQTT publish to %s: %s...", topic, message[:100])
|
|
|
|
|
|
- # Handle commands on device request topic
|
|
|
- if f"device/{self.serial}/request" in topic:
|
|
|
- try:
|
|
|
- data = json.loads(message)
|
|
|
-
|
|
|
- # Handle pushing command (status request)
|
|
|
- if "pushing" in data:
|
|
|
- pushing_data = data["pushing"]
|
|
|
- command = pushing_data.get("command", "")
|
|
|
- logger.info("MQTT pushing command: %s", command)
|
|
|
-
|
|
|
- if command == "pushall":
|
|
|
- # Slicer is requesting full status - send response
|
|
|
- logger.info("Sending status report in response to pushall")
|
|
|
- await self._send_status_report(writer)
|
|
|
- elif command == "start":
|
|
|
- # Slicer wants periodic status updates - send one now
|
|
|
- logger.info("Starting status push stream")
|
|
|
- await self._send_status_report(writer)
|
|
|
-
|
|
|
- # Handle info commands (get_version, etc.)
|
|
|
- if "info" in data:
|
|
|
- info_data = data["info"]
|
|
|
- command = info_data.get("command", "")
|
|
|
- sequence_id = info_data.get("sequence_id", "0")
|
|
|
- logger.info("MQTT info command: %s", command)
|
|
|
-
|
|
|
- if command == "get_version":
|
|
|
- await self._send_version_response(writer, sequence_id)
|
|
|
-
|
|
|
- # Handle print commands
|
|
|
- if "print" in data:
|
|
|
- print_data = data["print"]
|
|
|
- command = print_data.get("command", "")
|
|
|
- filename = print_data.get("subtask_name", "")
|
|
|
- sequence_id = print_data.get("sequence_id", "0")
|
|
|
-
|
|
|
- logger.info("MQTT print command: %s for %s", command, filename)
|
|
|
-
|
|
|
- if command == "project_file":
|
|
|
- # Respond with PREPARE status so slicer proceeds with FTP upload
|
|
|
- file_3mf = print_data.get("file", filename)
|
|
|
- await self._send_print_response(writer, sequence_id, file_3mf)
|
|
|
-
|
|
|
- if self.on_print_command:
|
|
|
- await self._notify_print_command(filename, print_data)
|
|
|
-
|
|
|
- except json.JSONDecodeError:
|
|
|
- pass # Non-JSON payloads on request topic are safely ignored
|
|
|
+ # Only handle publishes on *some* device/.../request topic. The
|
|
|
+ # serial is taken from the topic rather than compared against
|
|
|
+ # self.serial: the client is already authenticated via the access
|
|
|
+ # code, and Orca/BambuStudio may have a cached serial that differs
|
|
|
+ # from the VP's computed self.serial (#927). Use the topic's serial
|
|
|
+ # for all responses so they land on the topic the slicer subscribed
|
|
|
+ # to.
|
|
|
+ if not topic.startswith("device/") or "/request" not in topic:
|
|
|
+ return
|
|
|
+
|
|
|
+ client_serial = self._extract_serial_from_topic(topic) or self.serial
|
|
|
+ if client_serial and client_serial != self._client_serials.get(client_id):
|
|
|
+ if client_serial != self.serial:
|
|
|
+ logger.info(
|
|
|
+ "%sMQTT client publishing with serial %s (VP serial is %s) — adapting responses",
|
|
|
+ self._log_prefix,
|
|
|
+ client_serial,
|
|
|
+ self.serial,
|
|
|
+ )
|
|
|
+ self._client_serials[client_id] = client_serial
|
|
|
+
|
|
|
+ try:
|
|
|
+ data = json.loads(message)
|
|
|
+ except json.JSONDecodeError:
|
|
|
+ return # Non-JSON payloads on request topic are safely ignored
|
|
|
+
|
|
|
+ # Handle pushing command (status request)
|
|
|
+ if "pushing" in data:
|
|
|
+ pushing_data = data["pushing"]
|
|
|
+ command = pushing_data.get("command", "")
|
|
|
+ logger.info("MQTT pushing command: %s", command)
|
|
|
+
|
|
|
+ if command == "pushall":
|
|
|
+ # Slicer is requesting full status - send response
|
|
|
+ logger.info("Sending status report in response to pushall")
|
|
|
+ await self._send_status_report(writer, serial=client_serial)
|
|
|
+ elif command == "start":
|
|
|
+ # Slicer wants periodic status updates - send one now
|
|
|
+ logger.info("Starting status push stream")
|
|
|
+ await self._send_status_report(writer, serial=client_serial)
|
|
|
+
|
|
|
+ # Handle info commands (get_version, etc.)
|
|
|
+ if "info" in data:
|
|
|
+ info_data = data["info"]
|
|
|
+ command = info_data.get("command", "")
|
|
|
+ sequence_id = info_data.get("sequence_id", "0")
|
|
|
+ logger.info("MQTT info command: %s", command)
|
|
|
+
|
|
|
+ if command == "get_version":
|
|
|
+ await self._send_version_response(writer, sequence_id, serial=client_serial)
|
|
|
+
|
|
|
+ # Handle print commands
|
|
|
+ if "print" in data:
|
|
|
+ print_data = data["print"]
|
|
|
+ command = print_data.get("command", "")
|
|
|
+ filename = print_data.get("subtask_name", "")
|
|
|
+ sequence_id = print_data.get("sequence_id", "0")
|
|
|
+
|
|
|
+ logger.info("MQTT print command: %s for %s", command, filename)
|
|
|
+
|
|
|
+ if command == "project_file":
|
|
|
+ # Respond with PREPARE status so slicer proceeds with FTP upload
|
|
|
+ file_3mf = print_data.get("file", filename)
|
|
|
+ await self._send_print_response(writer, sequence_id, file_3mf, serial=client_serial)
|
|
|
+
|
|
|
+ if self.on_print_command:
|
|
|
+ await self._notify_print_command(filename, print_data)
|
|
|
|
|
|
except (IndexError, ValueError, OSError) as e:
|
|
|
logger.debug("MQTT PUBLISH error: %s", e)
|