mqtt_server.py 44 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063
  1. """MQTT broker for virtual printer.
  2. Implements an MQTT broker that accepts connections from slicers,
  3. authenticates with the configured access code, and logs print commands.
  4. """
  import asyncio
  import errno
  import hmac
  import json
  import logging
  import ssl
  from collections.abc import Callable
  from pathlib import Path
  from typing import TYPE_CHECKING

  if TYPE_CHECKING:
      from backend.app.services.virtual_printer.mqtt_bridge import MQTTBridge
  # Module-level logger, named after this module.
  logger = logging.getLogger(__name__)

  # Default MQTT port for Bambu printers (MQTT over TLS)
  MQTT_PORT = 8883

  # Model code → product_name for version response (must match what slicer expects).
  # The version response looks the configured model up here; a code that is not
  # listed is reported as-is, and an empty model falls back to "X1 Carbon".
  MODEL_PRODUCT_NAMES = {
      "BL-P001": "X1 Carbon",
      "BL-P002": "X1",
      "C13": "X1E",
      "N6": "X2D",
      "C11": "P1P",
      "C12": "P1S",
      "N7": "P2S",
      "N2S": "A1",
      "N1": "A1 mini",
      "O1D": "H2D",
      "O1C": "H2C",
      "O1C2": "H2C",  # second code mapped to the same product name as O1C
      "O1S": "H2S",
  }
  class VirtualPrinterMQTTServer:
      """MQTT broker that accepts connections from slicers.

      This is a minimal MQTT broker implementation that:
      - Accepts TLS connections on port 8883
      - Authenticates with username 'bblp' and the configured access code
      - Receives print commands on device/{serial}/request
      - Can publish status on device/{serial}/report

      The broker itself comes from the optional ``amqtt`` package, which is
      imported lazily inside :meth:`start`.
      """

      def __init__(
          self,
          serial: str,
          access_code: str,
          cert_path: Path,
          key_path: Path,
          port: int = MQTT_PORT,
          on_print_command: Callable[[str, dict], None] | None = None,
      ):
          """Initialize the MQTT server.

          Args:
              serial: Virtual printer serial number
              access_code: Password for authentication
              cert_path: Path to TLS certificate
              key_path: Path to TLS private key
              port: Port to listen on (default 8883)
              on_print_command: Callback when print command received (filename, data)
          """
          self.serial = serial
          self.access_code = access_code
          self.cert_path = cert_path
          self.key_path = key_path
          self.port = port
          self.on_print_command = on_print_command
          self._running = False
          self._broker = None
          self._broker_task = None

      async def start(self) -> None:
          """Start the MQTT broker and keep running until it is stopped.

          Returns immediately when the server is already running or when
          ``amqtt`` cannot be imported. Errors raised while starting or running
          are logged rather than propagated, and :meth:`stop` is always invoked
          on the way out.
          """
          if self._running:
              return

          # Try to import amqtt
          try:
              from amqtt.broker import Broker
          except ImportError:
              logger.error("amqtt not installed. Run: pip install amqtt")
              return

          logger.info("Starting virtual printer MQTT broker on port %s", self.port)

          # Build broker configuration
          config = {
              "listeners": {
                  "default": {
                      "type": "tcp",
                      "bind": f"0.0.0.0:{self.port}",
                      "ssl": "on",
                      "certfile": str(self.cert_path),
                      "keyfile": str(self.key_path),
                  },
              },
              "auth": {
                  "allow-anonymous": False,
                  "plugins": ["auth_custom"],
              },
              "topic-check": {
                  "enabled": False,  # Allow any topic
              },
          }

          try:
              self._running = True

              # Create and start broker
              self._broker = Broker(config)

              # Register custom auth plugin
              self._broker.plugins_manager.plugins_handlers["auth_custom"] = self._authenticate

              # Start the broker
              await self._broker.start()
              logger.info("MQTT broker started on port %s", self.port)

              # Keep running until stop() clears the flag
              while self._running:
                  await asyncio.sleep(1)
          except OSError as e:
              # errno.EADDRINUSE rather than a literal 98: the numeric value is
              # platform specific, so the literal only matched on Linux.
              if e.errno == errno.EADDRINUSE:
                  logger.error("MQTT port %s is already in use", self.port)
              else:
                  logger.error("MQTT broker error: %s", e)
          except asyncio.CancelledError:
              logger.debug("MQTT broker task cancelled")
          except Exception as e:
              logger.error("MQTT broker error: %s", e)
          finally:
              await self.stop()

      def _access_code_matches(self, candidate) -> bool:
          """Return True when *candidate* equals the configured access code.

          Anything that is not a ``str`` is rejected. The comparison is done on
          UTF-8 bytes with ``hmac.compare_digest`` so it takes the same time no
          matter where the first mismatching character is.
          """
          if not isinstance(candidate, str) or not isinstance(self.access_code, str):
              return False
          return hmac.compare_digest(candidate.encode("utf-8"), self.access_code.encode("utf-8"))

      async def _authenticate(self, session) -> bool:
          """Authenticate MQTT connection.

          Args:
              session: MQTT session with username/password

          Returns:
              True if authentication successful
          """
          username = getattr(session, "username", None)
          password = getattr(session, "password", None)
          # Read defensively like the two fields above, so a session object
          # without this attribute cannot turn an auth decision into an
          # AttributeError.
          remote_address = getattr(session, "remote_address", None)

          # Bambu slicers use 'bblp' as username and access code as password
          if username == "bblp" and self._access_code_matches(password):
              logger.debug("MQTT client authenticated from %s", remote_address)
              return True

          logger.warning("MQTT auth failed for user '%s' from %s", username, remote_address)
          return False

      async def stop(self) -> None:
          """Stop the MQTT broker.

          Safe to call more than once: the broker reference is detached before
          shutdown is awaited, so an overlapping call (for example the one made
          from :meth:`start`'s ``finally`` block) finds nothing left to do.
          """
          logger.info("Stopping MQTT broker")
          self._running = False

          broker, self._broker = self._broker, None
          if broker is None:
              return

          try:
              await broker.shutdown()
          except Exception as e:
              # Best-effort cleanup: a shutdown failure of any kind is logged
              # rather than raised, so it cannot escape from start()'s finally.
              logger.debug("Error shutting down MQTT broker: %s", e)
  146. class SimpleMQTTServer:
  147. """Simplified MQTT server using raw sockets.
  148. This is a fallback implementation that handles basic MQTT protocol
  149. without requiring the amqtt library. It's less feature-complete but
  150. more lightweight.
  151. """
  def __init__(
      self,
      serial: str,
      access_code: str,
      cert_path: Path,
      key_path: Path,
      port: int = MQTT_PORT,
      on_print_command: Callable[[str, dict], None] | None = None,
      model: str = "",
      bind_address: str = "0.0.0.0",  # nosec B104
      vp_name: str = "",
  ):
      """Store the configuration and reset all runtime state.

      Args:
          serial: Serial number used in topics until a client shows a different one.
          access_code: Password expected from clients.
          cert_path: Path to the TLS certificate.
          key_path: Path to the TLS private key.
          port: Port to listen on.
          on_print_command: Optional callback taking ``(str, dict)``.
          model: Model code, looked up when building the version response.
          bind_address: Address the listening socket binds to.
          vp_name: Optional name; when set, it prefixes log lines as ``[name] ``.
      """
      # --- Identity and credentials ---
      self.serial = serial
      self.access_code = access_code
      self.model = model
      self.vp_name = vp_name
      self._log_prefix = f"[{vp_name}] " if vp_name else ""

      # --- Network / TLS configuration ---
      self.cert_path = cert_path
      self.key_path = key_path
      self.bind_address = bind_address
      self.port = port

      # --- Callbacks ---
      self.on_print_command = on_print_command

      # --- Server lifecycle ---
      self._running = False
      self._server = None
      self._status_push_task: asyncio.Task | None = None
      self._sequence_id = 0

      # --- Connected clients, keyed by "host:port" ---
      self._clients: dict[str, asyncio.StreamWriter] = {}
      # The "effective serial" of each client: the serial the slicer itself
      # puts in its device/{serial}/report|request topics, learned from the
      # first SUBSCRIBE/PUBLISH on the connection. Responding on that serial
      # keeps the VP reachable when the slicer disagrees with self.serial
      # (e.g. a stale Orca config bound to an older VP serial, or a printer
      # entry re-pointed at the VP IP without updating the serial).
      self._client_serials: dict[str, str] = {}

      # --- Dynamic state echoed in status reports ---
      self._gcode_state = "IDLE"
      self._current_file = ""
      self._prepare_percent = "0"

      # MQTT bridge for non-proxy modes — set by VirtualPrinterInstance after
      # start(). While the bridge is_active, real printer pushes are fanned
      # out to slicers and the synthetic 1s push is suspended; when the
      # target printer goes offline the synthetic fallback resumes.
      self._bridge: MQTTBridge | None = None
  async def start(self) -> None:
      """Start the MQTT server and serve until stopped or cancelled.

      Does nothing when the server is already running. Builds a TLS server
      context from the configured certificate and key, logs the certificate
      subject/issuer on a best-effort basis, starts listening on
      ``bind_address:port`` and launches the periodic status push task.
      Errors while serving are logged instead of raised, and :meth:`stop`
      always runs when serving ends.
      """
      if self._running:
          return

      logger.info("Starting simple MQTT server on port %s", self.port)

      # Create SSL context with Bambu-compatible settings
      ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
      ssl_context.load_cert_chain(str(self.cert_path), str(self.key_path))
      # Match Bambu printer behavior - accept any client
      ssl_context.verify_mode = ssl.CERT_NONE
      # Allow TLS 1.2 for broader compatibility (some slicers may not support 1.3)
      ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2
      # Disable hostname checking
      ssl_context.check_hostname = False

      # Log certificate info. subprocess.run blocks for up to `timeout`
      # seconds, so it runs in a worker thread to keep the event loop free.
      import subprocess

      try:
          result = await asyncio.to_thread(
              subprocess.run,
              ["openssl", "x509", "-in", str(self.cert_path), "-noout", "-subject", "-issuer"],
              capture_output=True,
              text=True,
              timeout=5,
          )
          logger.info("MQTT SSL cert info: %s", result.stdout.strip())
      except (OSError, subprocess.SubprocessError):
          pass  # Certificate info is for debug logging only; not critical

      logger.info("MQTT SSL context: TLS 1.2+, cert=%s", self.cert_path)

      try:
          self._running = True

          # Wrapper to log ALL connection attempts including SSL errors
          async def connection_handler(reader, writer):
              try:
                  addr = writer.get_extra_info("peername")
                  ssl_obj = writer.get_extra_info("ssl_object")
                  if ssl_obj:
                      # Lazy %-formatting, same rendered text as before.
                      logger.info(
                          "%sMQTT TLS connection from %s - cipher=%s, version=%s",
                          self._log_prefix,
                          addr,
                          ssl_obj.cipher(),
                          ssl_obj.version(),
                      )
                  else:
                      logger.info("%sMQTT connection from %s (no TLS?)", self._log_prefix, addr)
                  await self._handle_client(reader, writer)
              except ssl.SSLError as e:
                  logger.error("MQTT SSL error: %s", e)
              except Exception as e:
                  logger.error("MQTT connection handler error: %s", e)

          self._server = await asyncio.start_server(
              connection_handler,
              self.bind_address,
              self.port,
              ssl=ssl_context,
          )
          logger.info("Simple MQTT server listening on port %s", self.port)

          # Start periodic status push task
          self._status_push_task = asyncio.create_task(self._periodic_status_push())

          async with self._server:
              await self._server.serve_forever()
      except OSError as e:
          # errno.EADDRINUSE rather than a literal 98: the numeric value is
          # platform specific, so the literal only matched on Linux.
          if e.errno == errno.EADDRINUSE:
              logger.error("MQTT port %s is already in use", self.port)
          else:
              logger.error("MQTT server error: %s", e)
      except asyncio.CancelledError:
          logger.debug("MQTT server task cancelled")
      except Exception as e:
          logger.error("MQTT server error: %s", e)
      finally:
          await self.stop()
  async def stop(self) -> None:
      """Shut the server down: push task first, then clients, then the listener."""
      logger.info("Stopping simple MQTT server")
      self._running = False

      # 1) Cancel the periodic status push and wait for it to finish.
      push_task = self._status_push_task
      if push_task:
          push_task.cancel()
          try:
              await push_task
          except asyncio.CancelledError:
              pass  # Expected when stopping the periodic status push task
          self._status_push_task = None

      # 2) Close every client socket. Work on a snapshot of the values so the
      #    registry can change underneath us while we await.
      for client_writer in list(self._clients.values()):
          try:
              client_writer.close()
              await client_writer.wait_closed()
          except OSError:
              pass  # Best-effort client connection cleanup; client may have disconnected
      self._clients.clear()
      self._client_serials.clear()

      # 3) Close the listening socket.
      listener = self._server
      if listener:
          try:
              listener.close()
              await listener.wait_closed()
          except OSError:
              pass  # Best-effort server shutdown; port may already be released
          self._server = None
  291. @staticmethod
  292. def _extract_serial_from_topic(topic: str) -> str | None:
  293. """Pull the serial out of a `device/{serial}/report|request` topic.
  294. Returns None if the topic doesn't match that shape — callers fall back
  295. to self.serial in that case.
  296. """
  297. if not topic.startswith("device/"):
  298. return None
  299. rest = topic[len("device/") :]
  300. # Expect "{serial}/report" or "{serial}/request" (possibly with suffixes).
  301. slash = rest.find("/")
  302. if slash <= 0:
  303. return None
  304. return rest[:slash]
  305. def set_bridge(self, bridge: "MQTTBridge | None") -> None:
  306. """Attach (or detach) the MQTT bridge that mirrors the target printer."""
  307. self._bridge = bridge
  async def _periodic_status_push(self) -> None:
      """Push a status report to every registered client once per second.

      Runs while ``self._running`` is set. Each round sleeps one second, then
      sends a report to every registered client on that client's effective
      serial. Clients whose writer is closing, or whose send raises OSError,
      are dropped from the registry. Cancellation ends the loop; any other
      error is logged and the loop continues.
      """
      logger.info("Starting periodic status push task")

      while self._running:
          try:
              # Push every 1 second like real printers
              await asyncio.sleep(1)

              stale_ids = []
              # Snapshot the registry: it can change while we await below.
              for cid, client_writer in list(self._clients.items()):
                  try:
                      if client_writer.is_closing():
                          stale_ids.append(cid)
                      else:
                          target_serial = self._client_serials.get(cid, self.serial)
                          await self._send_status_report(client_writer, serial=target_serial)
                  except OSError as e:
                      logger.debug("Failed to push status to %s: %s", cid, e)
                      stale_ids.append(cid)

              # Forget clients that are gone
              for cid in stale_ids:
                  self._clients.pop(cid, None)
                  self._client_serials.pop(cid, None)
          except asyncio.CancelledError:
              break
          except Exception as e:
              logger.error("Periodic status push error: %s", e)

      logger.info("Periodic status push task stopped")
  334. async def push_raw_to_clients(self, topic: str, payload: bytes) -> None:
  335. """Publish a pre-serialized MQTT payload on `topic` to every connected slicer.
  336. Called by MQTTBridge from the asyncio loop (scheduled via
  337. run_coroutine_threadsafe from paho's network thread).
  338. """
  339. topic_bytes = topic.encode("utf-8")
  340. # MQTT remaining-length: 2-byte topic length prefix + topic + message body.
  341. remaining = 2 + len(topic_bytes) + len(payload)
  342. packet = bytearray([0x30]) # PUBLISH, QoS 0
  343. while True:
  344. byte = remaining % 128
  345. remaining //= 128
  346. if remaining > 0:
  347. byte |= 0x80
  348. packet.append(byte)
  349. if remaining == 0:
  350. break
  351. packet.extend([len(topic_bytes) >> 8, len(topic_bytes) & 0xFF])
  352. packet.extend(topic_bytes)
  353. packet.extend(payload)
  354. frame = bytes(packet)
  355. disconnected = []
  356. for client_id, writer in list(self._clients.items()):
  357. try:
  358. if writer.is_closing():
  359. disconnected.append(client_id)
  360. continue
  361. writer.write(frame)
  362. try:
  363. await asyncio.wait_for(writer.drain(), timeout=5)
  364. except TimeoutError:
  365. logger.debug("MQTT drain timeout pushing bridge frame to %s", client_id)
  366. except OSError as e:
  367. logger.debug("Failed to push bridge frame to %s: %s", client_id, e)
  368. disconnected.append(client_id)
  369. for client_id in disconnected:
  370. self._clients.pop(client_id, None)
  371. self._client_serials.pop(client_id, None)
  async def _handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
      """Handle an MQTT client connection.

      Reads packets in a loop and dispatches on the packet type: CONNECT
      (authenticate and register the client), PUBLISH and SUBSCRIBE (only
      once authenticated), PINGREQ (answered with PINGRESP) and DISCONNECT.
      Other packet types are read and ignored. Whatever ends the loop, the
      client is removed from the registry and its socket closed.

      Args:
          reader: Stream the client's packets are read from.
          writer: Stream used to answer the client.
      """
      addr = writer.get_extra_info("peername")
      client_id = f"{addr[0]}:{addr[1]}" if addr else "unknown"
      logger.info("%sMQTT client connected: %s", self._log_prefix, client_id)

      authenticated = False
      # Per-packet read timeout. Before CONNECT we default to 60 s so a
      # client that opens TCP but never sends anything still gets reaped;
      # after CONNECT the value is updated to 1.5× the keepalive the
      # client negotiated (MQTT spec §4.4). ``None`` means no timeout,
      # which is what spec §3.1.2.10 mandates for keep_alive == 0.
      read_timeout: float | None = 60.0
      # readexactly() below buffers the whole declared body, so refuse
      # oversized packets from peers that have not authenticated yet.
      max_unauthenticated_packet = 64 * 1024

      try:
          while self._running:
              # Read MQTT fixed header
              try:
                  header = await asyncio.wait_for(reader.read(1), timeout=read_timeout)
              except TimeoutError:
                  break
              if not header:
                  break

              packet_type = (header[0] & 0xF0) >> 4

              # Read remaining length
              remaining_length = await self._read_remaining_length(reader)
              if remaining_length is None:
                  break

              if not authenticated and remaining_length > max_unauthenticated_packet:
                  logger.warning(
                      "%sMQTT client %s sent a %s-byte packet before authenticating; closing",
                      self._log_prefix,
                      client_id,
                      remaining_length,
                  )
                  break

              # Read payload. StreamReader.read(n) may return fewer than n
              # bytes, which would truncate this packet and make the next
              # loop iteration parse the rest of the body as a header;
              # readexactly() waits for the complete body instead.
              try:
                  payload = await reader.readexactly(remaining_length) if remaining_length > 0 else b""
              except asyncio.IncompleteReadError:
                  break  # Peer closed the connection in the middle of a packet

              # Handle packet types
              if packet_type == 1:  # CONNECT
                  authenticated, keep_alive = await self._handle_connect(payload, writer)
                  if not authenticated:
                      break
                  # Honour the client's negotiated keepalive (#1548). Before
                  # this fix, the hardcoded 60 s above would close
                  # OrcaSlicer's idle connection at the keepalive boundary
                  # instead of waiting 1.5× as the spec requires — Orca
                  # sends PINGREQ within its own keepalive interval but
                  # we'd already have closed the socket.
                  read_timeout = keep_alive * 1.5 if keep_alive > 0 else None
                  # Register client for periodic status pushes; start with
                  # self.serial as the fallback until we learn the slicer's
                  # preferred serial from the first SUBSCRIBE/PUBLISH.
                  self._clients[client_id] = writer
                  self._client_serials[client_id] = self.serial
              elif packet_type == 3:  # PUBLISH
                  if authenticated:
                      await self._handle_publish(header[0], payload, writer, client_id)
              elif packet_type == 8:  # SUBSCRIBE
                  if authenticated:
                      await self._handle_subscribe(payload, writer, client_id)
              elif packet_type == 12:  # PINGREQ
                  # Send PINGRESP
                  writer.write(bytes([0xD0, 0x00]))
                  await writer.drain()
              elif packet_type == 14:  # DISCONNECT
                  break

      except asyncio.CancelledError:
          pass  # Expected when server is shutting down and cancels client tasks
      except Exception as e:
          logger.debug("MQTT client error: %s", e)
      finally:
          logger.debug("MQTT client disconnected: %s", client_id)
          self._clients.pop(client_id, None)
          self._client_serials.pop(client_id, None)
          try:
              writer.close()
              await writer.wait_closed()
          except OSError:
              pass  # Best-effort socket cleanup on client disconnect
  442. async def _read_remaining_length(self, reader: asyncio.StreamReader) -> int | None:
  443. """Read MQTT remaining length (variable byte integer)."""
  444. multiplier = 1
  445. value = 0
  446. for _ in range(4):
  447. try:
  448. byte = await reader.read(1)
  449. if not byte:
  450. return None
  451. encoded = byte[0]
  452. value += (encoded & 127) * multiplier
  453. if (encoded & 128) == 0:
  454. return value
  455. multiplier *= 128
  456. except OSError:
  457. return None
  458. return None
  async def _handle_connect(self, payload: bytes, writer: asyncio.StreamWriter) -> tuple[bool, int]:
      """Handle MQTT CONNECT packet.

      Parses the variable header and payload, honouring the connect flags:
      the will topic/message are skipped when the will flag is set, and the
      username / password are only read when their flags say they are
      present. A client is accepted when the username is ``bblp`` and the
      password equals the configured access code.

      Returns ``(authenticated, keep_alive_seconds)`` — the second element
      is the value the client advertised in CONNECT, so the caller's
      read-loop can honour it instead of the hardcoded default. ``0``
      means the client opted out of keepalive (#1548). Rejected and
      malformed CONNECTs return ``(False, 0)`` after a CONNACK carrying a
      non-zero return code has been sent.
      """

      def read_field(offset: int) -> tuple[bytes, int]:
          """Return the length-prefixed field at *offset* and the offset after it."""
          length = (payload[offset] << 8) | payload[offset + 1]
          start = offset + 2
          field = payload[start : start + length]
          if len(field) != length:
              # Slicing never raises, so detect a short field explicitly.
              raise ValueError("truncated CONNECT field")
          return field, start + length

      try:
          # Protocol name, then protocol level (1 byte) and connect flags (1 byte).
          _proto_name, idx = read_field(0)
          connect_flags = payload[idx + 1]
          idx += 2

          # Keepalive (2-byte big-endian, seconds). Honoured by the read
          # loop in `_handle_client` per MQTT spec §3.1.2.10 / §4.4 —
          # before #1548 we ignored this and used a hardcoded 60 s, which
          # closed OrcaSlicer's idle connection at exactly the negotiated
          # keepalive boundary instead of the spec-mandated 1.5×.
          keep_alive = (payload[idx] << 8) | payload[idx + 1]
          idx += 2

          # Client identifier (not used, only skipped).
          _client_identifier, idx = read_field(idx)

          # Will topic and will message sit between the client id and the
          # credentials when the will flag (bit 2) is set.
          if connect_flags & 0x04:
              _will_topic, idx = read_field(idx)
              _will_message, idx = read_field(idx)

          # Username (flag bit 7) and password (flag bit 6) are optional.
          username: str | None = None
          if connect_flags & 0x80:
              raw_username, idx = read_field(idx)
              username = raw_username.decode("utf-8")

          password: str | None = None
          if connect_flags & 0x40:
              raw_password, idx = read_field(idx)
              password = raw_password.decode("utf-8")

          # Authenticate. compare_digest on UTF-8 bytes takes the same time
          # wherever the first differing character is.
          authorised = (
              username == "bblp"
              and password is not None
              and isinstance(self.access_code, str)
              and hmac.compare_digest(password.encode("utf-8"), self.access_code.encode("utf-8"))
          )

          if authorised:
              # Send CONNACK with success
              writer.write(bytes([0x20, 0x02, 0x00, 0x00]))
              await writer.drain()
              logger.info("%sMQTT client authenticated successfully", self._log_prefix)

              # Send immediate status report after auth - slicer expects this
              await self._send_status_report(writer)
              return True, keep_alive

          # Send CONNACK with auth failure
          writer.write(bytes([0x20, 0x02, 0x00, 0x05]))  # Not authorized
          await writer.drain()
          logger.warning("%sMQTT auth failed for user '%s' (access code mismatch)", self._log_prefix, username)
          return False, 0

      except (IndexError, ValueError) as e:
          logger.debug("MQTT CONNECT parse error: %s", e)
          # Reject the malformed CONNECT. Return code 0x02 is named
          # "identifier rejected" in MQTT 3.1.1; the byte is kept unchanged
          # so the reply on the wire stays what clients already receive.
          writer.write(bytes([0x20, 0x02, 0x00, 0x02]))
          await writer.drain()
          return False, 0
  async def _handle_subscribe(self, payload: bytes, writer: asyncio.StreamWriter, client_id: str) -> None:
      """Acknowledge a SUBSCRIBE packet and follow it with a status report.

      Every requested topic filter is granted at ``min(requested QoS, 1)``.
      The first filter shaped like ``device/{serial}/...`` sets the client's
      effective serial, so later reports go to a topic the slicer actually
      subscribed to. Parse and socket errors are logged at debug level.
      """
      try:
          # Variable header: 2-byte packet identifier, echoed in the SUBACK.
          packet_id = (payload[0] << 8) | payload[1]

          cursor = 2
          end = len(payload)
          grants = []
          first_serial: str | None = None

          # Payload: repeated (2-byte length, topic filter, 1-byte requested QoS).
          while cursor < end:
              filter_len = (payload[cursor] << 8) | payload[cursor + 1]
              filter_start = cursor + 2
              filter_end = filter_start + filter_len
              topic = payload[filter_start:filter_end].decode("utf-8")
              requested_qos = payload[filter_end]
              cursor = filter_end + 1

              logger.info("%sMQTT subscribe: %s QoS=%s", self._log_prefix, topic, requested_qos)
              grants.append(min(requested_qos, 1))  # Grant up to QoS 1

              # Only the first filter that yields a serial counts.
              if first_serial is None:
                  first_serial = self._extract_serial_from_topic(topic) or None

          if first_serial and first_serial != self._client_serials.get(client_id):
              if first_serial != self.serial:
                  logger.info(
                      "%sMQTT client subscribed with serial %s (VP serial is %s) — adapting responses",
                      self._log_prefix,
                      first_serial,
                      self.serial,
                  )
              self._client_serials[client_id] = first_serial

          # SUBACK: fixed header, packet id, then one granted-QoS byte per filter.
          suback_head = bytes([0x90, 2 + len(grants), packet_id >> 8, packet_id & 0xFF])
          writer.write(suback_head + bytes(grants))
          await writer.drain()

          # Initial status report, on the serial this client subscribed with.
          report_serial = self._client_serials.get(client_id, self.serial)
          await self._send_status_report(writer, serial=report_serial)

      except (IndexError, ValueError, OSError) as e:
          logger.debug("MQTT SUBSCRIBE error: %s", e)
  559. async def _send_status_report(self, writer: asyncio.StreamWriter, serial: str | None = None) -> None:
  560. """Send a status report to the slicer after connection.
  561. When a bridge is active and has cached the real printer's latest
  562. push_status, send a copy of the real push with only the upload-state-
  563. machine fields we own (gcode_state, gcode_file, prepare_percent,
  564. subtask_name) overridden. BambuStudio's Send pre-flight checks the
  565. push_status shape against what it expects from the printer model, and
  566. the synthetic stub introduced fields the real H2D doesn't have (storage,
  567. the wrong chamber_temper shape, etc.) which trip the check.
  568. """
  569. try:
  570. self._sequence_id += 1
  571. cached = self._bridge.get_latest_print_state() if self._bridge is not None else None
  572. if isinstance(cached, dict):
  573. # Real-printer-shaped response. Copy the cache, then replace the
  574. # protocol / upload-state fields with values under our control.
  575. print_block = dict(cached)
  576. print_block["sequence_id"] = str(self._sequence_id)
  577. print_block["command"] = "push_status"
  578. print_block["msg"] = 0
  579. print_block["gcode_state"] = self._gcode_state
  580. print_block["gcode_file"] = self._current_file
  581. print_block["gcode_file_prepare_percent"] = self._prepare_percent
  582. if self._current_file:
  583. print_block["subtask_name"] = self._current_file.replace(".3mf", "")
  584. else:
  585. # Don't override real subtask_name with empty if no upload pending.
  586. print_block.setdefault("subtask_name", "")
  587. # Storage-availability indicators the slicer's "Send" pre-flight reads
  588. # (#1228). P1S/A1-class firmware doesn't always include these in
  589. # push_status (no SD card inserted, older field shapes), and BambuStudio
  590. # rejects the send pre-flight with the generic "storage needs to be
  591. # inserted before send to printer" error before even attempting FTP.
  592. # For VP usage the slicer uploads via FTPS to Bambuddy's filesystem —
  593. # the printer's actual SD/storage state is irrelevant on that path.
  594. # Force "available" indicators so the pre-flight passes regardless of
  595. # what the real printer reports. Restores the 0.2.3.2 synthetic-stub
  596. # behaviour for these fields without losing the live AMS / k-profile /
  597. # camera mirror cached-as-base provides.
  598. print_block["home_flag"] = print_block.get("home_flag", 0) | 0x100 # bit 8 = HAS_SDCARD_NORMAL
  599. print_block["sdcard"] = True
  600. print_block.setdefault("storage", {"free": 1_000_000_000, "total": 32_000_000_000})
  601. status = {"print": print_block}
  602. await self._publish_to_report(writer, status, serial or self.serial)
  603. return
  604. # No bridge / no cache yet — fall back to the synthetic stub.
  605. status = {
  606. "print": {
  607. "sequence_id": str(self._sequence_id),
  608. "command": "push_status",
  609. "msg": 0,
  610. "gcode_state": self._gcode_state,
  611. "gcode_file": self._current_file,
  612. "gcode_file_prepare_percent": self._prepare_percent,
  613. "subtask_name": self._current_file.replace(".3mf", "") if self._current_file else "",
  614. "mc_print_stage": "",
  615. "mc_percent": 0,
  616. "mc_remaining_time": 0,
  617. "wifi_signal": "-44dBm",
  618. "print_error": 0,
  619. "print_type": "",
  620. "bed_temper": 25.0,
  621. "bed_target_temper": 0.0,
  622. "nozzle_temper": 25.0,
  623. "nozzle_target_temper": 0.0,
  624. "chamber_temper": 25.0,
  625. "cooling_fan_speed": "0",
  626. "big_fan1_speed": "0",
  627. "big_fan2_speed": "0",
  628. "heatbreak_fan_speed": "0",
  629. "spd_lvl": 1,
  630. "spd_mag": 100,
  631. "stg": [],
  632. "stg_cur": 0,
  633. "layer_num": 0,
  634. "total_layer_num": 0,
  635. "home_flag": 256, # Bit 8 = SD card present (HAS_SDCARD_NORMAL)
  636. "hw_switch_state": 0,
  637. "online": {"ahb": False, "rfid": False, "version": 7},
  638. "ams_status": 0,
  639. "sdcard": True,
  640. "storage": {"free": 1000000000, "total": 32000000000},
  641. "upgrade_state": {
  642. "sequence_id": 0,
  643. "progress": "",
  644. "status": "",
  645. "consistency_request": False,
  646. "dis_state": 0,
  647. "err_code": 0,
  648. "force_upgrade": False,
  649. "message": "",
  650. "module": "",
  651. "new_version_state": 2,
  652. "new_ver_list": [],
  653. "ota_new_version_number": "",
  654. "ahb_new_version_number": "",
  655. },
  656. "ipcam": {
  657. "ipcam_dev": "1",
  658. "ipcam_record": "enable",
  659. "timelapse": "disable",
  660. "resolution": "1080p",
  661. "mode_bits": 0,
  662. },
  663. "xcam": {
  664. "allow_skip_parts": False,
  665. "buildplate_marker_detector": True,
  666. "first_layer_inspector": True,
  667. "halt_print_sensitivity": "medium",
  668. "print_halt": True,
  669. "printing_monitor": True,
  670. "spaghetti_detector": True,
  671. },
  672. "lights_report": [{"node": "chamber_light", "mode": "on"}],
  673. "nozzle_diameter": "0.4",
  674. "nozzle_type": "hardened_steel",
  675. }
  676. }
  677. await self._publish_to_report(writer, status, serial or self.serial)
  678. except OSError as e:
  679. logger.error("Failed to send status report: %s", e)
  async def _send_version_response(
      self, writer: asyncio.StreamWriter, sequence_id: str, serial: str | None = None
  ) -> None:
      """Publish a ``get_version`` response to the slicer.

      Args:
          writer: Connection to publish on.
          sequence_id: Sequence id placed in the response.
          serial: Serial used both for the report topic and for the ``sn`` of
              every built-in module; falls back to ``self.serial``.
      """
      try:
          product_name = MODEL_PRODUCT_NAMES.get(self.model, self.model or "X1 Carbon")

          # The serial is embedded inside the module[].sn fields *and* used as
          # the report topic. Using the client's effective serial keeps the
          # two consistent even when it differs from self.serial.
          serial = serial or self.serial

          # Built-in modules as (name, sw_ver, hw_ver). Each entry expands to
          # the fields OrcaSlicer expects per module:
          # name, product_name, sw_ver, sw_new_ver, hw_ver, sn, flag.
          builtin_modules = (
              ("ota", "01.07.00.00", "OTA"),
              ("esp32", "01.07.22.25", "AP05"),
              ("rv1126", "00.00.27.38", "AP05"),
              ("th", "00.00.04.00", "TH07"),
              ("mc", "00.00.10.00", "MC07"),
          )
          modules = [
              {
                  "name": module_name,
                  "product_name": product_name,
                  "sw_ver": sw_ver,
                  "sw_new_ver": "",
                  "hw_ver": hw_ver,
                  "sn": serial,
                  "flag": 0,
              }
              for module_name, sw_ver, hw_ver in builtin_modules
          ]

          # Prefer the real module list from the bridge cache when there is
          # one (specifically the AMS modules ams/0, n3f/0, n3s/128 etc. that
          # BambuStudio's Prepare tab uses to identify AMS hardware — without
          # them every AMS unit shows as "unknown" in the Prepare panel).
          if self._bridge is not None:
              cached_modules = self._bridge.get_latest_version_modules()
              if isinstance(cached_modules, list) and cached_modules:
                  modules = cached_modules

          version_info = {
              "info": {
                  "command": "get_version",
                  "sequence_id": sequence_id,
                  "module": modules,
              }
          }

          await self._publish_to_report(writer, version_info, serial)
          logger.info("Sent version response (product_name=%s)", product_name)
      except OSError as e:
          logger.error("Failed to send version response: %s", e)
  def set_gcode_state(self, state: str, filename: str = "", prepare_percent: str = "0") -> None:
      """Record the gcode state that is reported to connected slicers.

      The manager calls this to reflect FTP upload progress/completion.

      Args:
          state: New value for ``self._gcode_state``.
          filename: New value for ``self._current_file``.
          prepare_percent: New value for ``self._prepare_percent``.
      """
      self._gcode_state, self._current_file, self._prepare_percent = (
          state,
          filename,
          prepare_percent,
      )
  764. async def _publish_to_report(self, writer: asyncio.StreamWriter, payload: dict, serial: str = "") -> None:
  765. """Publish a message on the device report topic.
  766. Real Bambu printers wire-format push_status JSON with 4-space indentation
  767. (32254 bytes for an idle H2D push vs 14268 bytes compact). BambuStudio's
  768. Send pre-flight rejects compact JSON — without matching the on-wire
  769. format the slicer never proceeds to FTP upload.
  770. """
  771. topic = f"device/{serial or self.serial}/report"
  772. message = json.dumps(payload, indent=4)
  773. topic_bytes = topic.encode("utf-8")
  774. message_bytes = message.encode("utf-8")
  775. remaining = 2 + len(topic_bytes) + len(message_bytes)
  776. packet = bytes([0x30]) # PUBLISH, QoS 0
  777. while remaining > 0:
  778. byte = remaining % 128
  779. remaining //= 128
  780. if remaining > 0:
  781. byte |= 0x80
  782. packet += bytes([byte])
  783. packet += bytes([len(topic_bytes) >> 8, len(topic_bytes) & 0xFF])
  784. packet += topic_bytes
  785. packet += message_bytes
  786. writer.write(packet)
  787. # Timeout the drain to prevent blocking the event loop if the
  788. # MQTT client stops reading (e.g. slicer busy with FTP upload).
  789. try:
  790. await asyncio.wait_for(writer.drain(), timeout=5)
  791. except TimeoutError:
  792. logger.debug("MQTT drain timeout for %s — client may be busy", topic)
  async def _send_print_response(
      self, writer: asyncio.StreamWriter, sequence_id: str, filename: str, serial: str | None = None
  ) -> None:
      """Acknowledge a project_file command the way a real Bambu printer does.

      Args:
          writer: Stream of the client that sent the print command.
          sequence_id: Echoed back unchanged in the acknowledgment.
          filename: File named by the command; reported as ``gcode_file``.
          serial: Serial for the report topic; ``self.serial`` when empty
              or ``None``.

      An ``OSError`` raised while sending is logged and not re-raised.
      """
      # Switch to PREPARE first so that periodic status pushes reflect the
      # preparation phase even if sending the acknowledgment fails.
      self._gcode_state, self._current_file, self._prepare_percent = "PREPARE", filename, "0"

      try:
          # The slicer waits to see command "project_file" echoed back
          # before it starts the FTP upload.
          ack = {
              "command": "project_file",
              "sequence_id": sequence_id,
              "param": "Metadata/plate_1.gcode",
              "subtask_name": filename.replace(".3mf", "") if filename else "",
              "gcode_state": "PREPARE",
              "gcode_file": filename,
              "gcode_file_prepare_percent": "0",
              "result": "SUCCESS",
              "msg": 0,
          }
          await self._publish_to_report(writer, {"print": ack}, serial or self.serial)
          logger.info("Sent project_file acknowledgment for %s", filename)
      except OSError as e:
          logger.error("Failed to send print response: %s", e)
  async def _handle_publish(self, header: int, payload: bytes, writer: asyncio.StreamWriter, client_id: str) -> None:
      """Handle an MQTT PUBLISH packet received from a slicer.

      Parses the topic and JSON message, answers ``pushall``/``start``,
      ``get_version`` and ``project_file``/``gcode_file`` with synthetic
      responses, and forwards anything not handled locally to the real
      printer when the bridge is active.

      Args:
          header: MQTT header flags byte; bits 1-2 are read as the QoS level.
          payload: Packet body starting with the 2-byte big-endian topic
              length, then the topic, an optional 2-byte packet id (QoS > 0)
              and the message.
          writer: Stream used to send responses back to this client.
          client_id: Key under which the client's effective serial is
              remembered in ``self._client_serials``.

      Malformed input (truncated payload, invalid UTF-8, invalid JSON, JSON
      that is not an object, or a non-object ``pushing``/``info``/``print``
      section) is logged at debug level and dropped instead of raising.
      """
      try:
          # Parse topic
          idx = 0
          topic_len = (payload[idx] << 8) | payload[idx + 1]
          idx += 2
          topic = payload[idx : idx + topic_len].decode("utf-8")
          idx += topic_len

          # Check for packet ID (QoS > 0)
          qos = (header & 0x06) >> 1
          if qos > 0:
              # Skip the 2-byte packet identifier; it is not used here.
              idx += 2

          # Parse message
          message = payload[idx:].decode("utf-8")
          logger.info("MQTT publish to %s: %s...", topic, message[:100])

          # Only handle publishes on *some* device/.../request topic. The
          # serial is taken from the topic rather than compared against
          # self.serial: the client is already authenticated via the access
          # code, and Orca/BambuStudio may have a cached serial that differs
          # from the VP's computed self.serial (#927). Use the topic's serial
          # for all responses so they land on the topic the slicer subscribed
          # to.
          if not topic.startswith("device/") or "/request" not in topic:
              return

          client_serial = self._extract_serial_from_topic(topic) or self.serial
          if client_serial and client_serial != self._client_serials.get(client_id):
              if client_serial != self.serial:
                  logger.info(
                      "%sMQTT client publishing with serial %s (VP serial is %s) — adapting responses",
                      self._log_prefix,
                      client_serial,
                      self.serial,
                  )
              self._client_serials[client_id] = client_serial

          try:
              # Some slicer builds (observed with OrcaSlicer on Linux, #927)
              # include the C-string null terminator in the MQTT payload
              # length, so the decoded message ends with \x00. Real brokers
              # pass the bytes through; strict json.loads raises "Extra data"
              # and every pushall/get_version/project_file silently dropped.
              data = json.loads(message.rstrip("\x00 \r\n\t"))
          except json.JSONDecodeError as e:
              logger.debug(
                  "MQTT publish JSON decode failed: %s (payload=%r)",
                  e,
                  message[:200],
              )
              return

          # json.loads may legitimately return a list, string, number, bool or
          # None. The dispatch below needs a JSON object; anything else would
          # raise TypeError/AttributeError, which the outer handler does not
          # catch, so reject it here.
          if not isinstance(data, dict):
              logger.debug(
                  "MQTT publish ignored: JSON payload is %s, not an object",
                  type(data).__name__,
              )
              return
          for section in ("pushing", "info", "print"):
              if section in data and not isinstance(data[section], dict):
                  logger.debug("MQTT publish ignored: %r section is not an object", section)
                  return

          # The synthetic flow below is the original (pre-bridge) behaviour and is
          # what the proven-working FTP "Send" depends on. Do NOT replace any
          # synthetic response with a forward — only ADD forwarding alongside,
          # at the bottom, for commands the synthetic flow doesn't handle
          # (AMS write / xcam / system / etc., which need to actually reach
          # the real printer).
          handled_locally = False

          # Handle pushing command (status request)
          if "pushing" in data:
              pushing_data = data["pushing"]
              command = pushing_data.get("command", "")
              logger.info("MQTT pushing command: %s", command)
              if command == "pushall":
                  logger.info("Sending status report in response to pushall")
                  await self._send_status_report(writer, serial=client_serial)
                  handled_locally = True
              elif command == "start":
                  logger.info("Starting status push stream")
                  await self._send_status_report(writer, serial=client_serial)
                  handled_locally = True

          # Handle info commands (get_version, etc.)
          if "info" in data:
              info_data = data["info"]
              command = info_data.get("command", "")
              sequence_id = info_data.get("sequence_id", "0")
              logger.info("MQTT info command: %s", command)
              if command == "get_version":
                  await self._send_version_response(writer, sequence_id, serial=client_serial)
                  handled_locally = True

          # Handle print commands
          if "print" in data:
              print_data = data["print"]
              command = print_data.get("command", "")
              filename = print_data.get("subtask_name", "")
              sequence_id = print_data.get("sequence_id", "0")
              logger.info("MQTT print command: %s for %s", command, filename)
              if command in ("project_file", "gcode_file"):
                  # File lives on Bambuddy, not the printer — synthetic only.
                  file_3mf = print_data.get("file", filename)
                  await self._send_print_response(writer, sequence_id, file_3mf, serial=client_serial)
                  if self.on_print_command:
                      await self._notify_print_command(filename, print_data)
                  handled_locally = True

          # Forward anything the synthetic flow didn't handle to the real
          # printer. AMS load / dry / xcam / system / extrusion_cali_get etc.
          if not handled_locally and self._bridge is not None and self._bridge.is_active:
              self._bridge.forward_to_printer(data)
      except (IndexError, ValueError, OSError) as e:
          # IndexError: payload too short for the topic length prefix.
          # ValueError: includes UnicodeDecodeError from the decode calls.
          logger.debug("MQTT PUBLISH error: %s", e)
  921. async def _notify_print_command(self, filename: str, data: dict) -> None:
  922. """Notify callback of print command."""
  923. if self.on_print_command:
  924. try:
  925. result = self.on_print_command(filename, data)
  926. if asyncio.iscoroutine(result):
  927. await result
  928. except Exception as e:
  929. logger.error("Print command callback error: %s", e)