mqtt_server.py 35 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903
  1. """MQTT broker for virtual printer.
  2. Implements an MQTT broker that accepts connections from slicers,
  3. authenticates with the configured access code, and logs print commands.
  4. """
  5. import asyncio
  6. import json
  7. import logging
  8. import ssl
  9. from collections.abc import Callable
  10. from pathlib import Path
  11. logger = logging.getLogger(__name__)
  12. # Default MQTT port for Bambu printers (MQTT over TLS)
  13. MQTT_PORT = 8883
  14. # Model code → product_name for version response (must match what slicer expects)
  15. MODEL_PRODUCT_NAMES = {
  16. "BL-P001": "X1 Carbon",
  17. "BL-P002": "X1",
  18. "C13": "X1E",
  19. "N6": "X2D",
  20. "C11": "P1P",
  21. "C12": "P1S",
  22. "N7": "P2S",
  23. "N2S": "A1",
  24. "N1": "A1 mini",
  25. "O1D": "H2D",
  26. "O1C": "H2C",
  27. "O1C2": "H2C",
  28. "O1S": "H2S",
  29. }
  30. class VirtualPrinterMQTTServer:
  31. """MQTT broker that accepts connections from slicers.
  32. This is a minimal MQTT broker implementation that:
  33. - Accepts TLS connections on port 8883
  34. - Authenticates with username 'bblp' and the configured access code
  35. - Receives print commands on device/{serial}/request
  36. - Can publish status on device/{serial}/report
  37. """
  38. def __init__(
  39. self,
  40. serial: str,
  41. access_code: str,
  42. cert_path: Path,
  43. key_path: Path,
  44. port: int = MQTT_PORT,
  45. on_print_command: Callable[[str, dict], None] | None = None,
  46. ):
  47. """Initialize the MQTT server.
  48. Args:
  49. serial: Virtual printer serial number
  50. access_code: Password for authentication
  51. cert_path: Path to TLS certificate
  52. key_path: Path to TLS private key
  53. port: Port to listen on (default 8883)
  54. on_print_command: Callback when print command received (filename, data)
  55. """
  56. self.serial = serial
  57. self.access_code = access_code
  58. self.cert_path = cert_path
  59. self.key_path = key_path
  60. self.port = port
  61. self.on_print_command = on_print_command
  62. self._running = False
  63. self._broker = None
  64. self._broker_task = None
  65. async def start(self) -> None:
  66. """Start the MQTT broker."""
  67. if self._running:
  68. return
  69. # Try to import amqtt
  70. try:
  71. from amqtt.broker import Broker
  72. except ImportError:
  73. logger.error("amqtt not installed. Run: pip install amqtt")
  74. return
  75. logger.info("Starting virtual printer MQTT broker on port %s", self.port)
  76. # Build broker configuration
  77. config = {
  78. "listeners": {
  79. "default": {
  80. "type": "tcp",
  81. "bind": f"0.0.0.0:{self.port}",
  82. "ssl": "on",
  83. "certfile": str(self.cert_path),
  84. "keyfile": str(self.key_path),
  85. },
  86. },
  87. "auth": {
  88. "allow-anonymous": False,
  89. "plugins": ["auth_custom"],
  90. },
  91. "topic-check": {
  92. "enabled": False, # Allow any topic
  93. },
  94. }
  95. try:
  96. self._running = True
  97. # Create and start broker
  98. self._broker = Broker(config)
  99. # Register custom auth plugin
  100. self._broker.plugins_manager.plugins_handlers["auth_custom"] = self._authenticate
  101. # Start the broker
  102. await self._broker.start()
  103. logger.info("MQTT broker started on port %s", self.port)
  104. # Keep running
  105. while self._running:
  106. await asyncio.sleep(1)
  107. except OSError as e:
  108. if e.errno == 98: # Address already in use
  109. logger.error("MQTT port %s is already in use", self.port)
  110. else:
  111. logger.error("MQTT broker error: %s", e)
  112. except asyncio.CancelledError:
  113. logger.debug("MQTT broker task cancelled")
  114. except Exception as e:
  115. logger.error("MQTT broker error: %s", e)
  116. finally:
  117. await self.stop()
  118. async def _authenticate(self, session) -> bool:
  119. """Authenticate MQTT connection.
  120. Args:
  121. session: MQTT session with username/password
  122. Returns:
  123. True if authentication successful
  124. """
  125. username = getattr(session, "username", None)
  126. password = getattr(session, "password", None)
  127. # Bambu slicers use 'bblp' as username and access code as password
  128. if username == "bblp" and password == self.access_code:
  129. logger.debug("MQTT client authenticated from %s", session.remote_address)
  130. return True
  131. logger.warning("MQTT auth failed for user '%s' from %s", username, session.remote_address)
  132. return False
  133. async def stop(self) -> None:
  134. """Stop the MQTT broker."""
  135. logger.info("Stopping MQTT broker")
  136. self._running = False
  137. if self._broker:
  138. try:
  139. await self._broker.shutdown()
  140. except OSError as e:
  141. logger.debug("Error shutting down MQTT broker: %s", e)
  142. self._broker = None
  143. class SimpleMQTTServer:
  144. """Simplified MQTT server using raw sockets.
  145. This is a fallback implementation that handles basic MQTT protocol
  146. without requiring the amqtt library. It's less feature-complete but
  147. more lightweight.
  148. """
  149. def __init__(
  150. self,
  151. serial: str,
  152. access_code: str,
  153. cert_path: Path,
  154. key_path: Path,
  155. port: int = MQTT_PORT,
  156. on_print_command: Callable[[str, dict], None] | None = None,
  157. model: str = "",
  158. bind_address: str = "0.0.0.0", # nosec B104
  159. vp_name: str = "",
  160. ):
  161. self.serial = serial
  162. self.access_code = access_code
  163. self.model = model
  164. self.cert_path = cert_path
  165. self.key_path = key_path
  166. self.port = port
  167. self.on_print_command = on_print_command
  168. self.bind_address = bind_address
  169. self.vp_name = vp_name
  170. self._log_prefix = f"[{vp_name}] " if vp_name else ""
  171. self._running = False
  172. self._server = None
  173. self._clients: dict[str, asyncio.StreamWriter] = {}
  174. # Per-client "effective serial" — the serial the slicer actually uses in
  175. # device/{serial}/report|request topics. Populated from the first
  176. # SUBSCRIBE/PUBLISH we see on a connection. This lets the VP respond on
  177. # the topic the slicer is listening on even when it disagrees with
  178. # self.serial (e.g. a stale Orca config that was bound to an older VP
  179. # serial, or a printer entry that was re-pointed at the VP IP without
  180. # updating the serial).
  181. self._client_serials: dict[str, str] = {}
  182. self._status_push_task: asyncio.Task | None = None
  183. self._sequence_id = 0
  184. # Dynamic state for status reports
  185. self._gcode_state = "IDLE"
  186. self._current_file = ""
  187. self._prepare_percent = "0"
  188. async def start(self) -> None:
  189. """Start the MQTT server."""
  190. if self._running:
  191. return
  192. logger.info("Starting simple MQTT server on port %s", self.port)
  193. # Create SSL context with Bambu-compatible settings
  194. ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
  195. ssl_context.load_cert_chain(str(self.cert_path), str(self.key_path))
  196. # Match Bambu printer behavior - accept any client
  197. ssl_context.verify_mode = ssl.CERT_NONE
  198. # Allow TLS 1.2 for broader compatibility (some slicers may not support 1.3)
  199. ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2
  200. # Disable hostname checking
  201. ssl_context.check_hostname = False
  202. # Log certificate info
  203. import subprocess
  204. try:
  205. result = subprocess.run(
  206. ["openssl", "x509", "-in", str(self.cert_path), "-noout", "-subject", "-issuer"],
  207. capture_output=True,
  208. text=True,
  209. timeout=5,
  210. )
  211. logger.info("MQTT SSL cert info: %s", result.stdout.strip())
  212. except (OSError, subprocess.SubprocessError):
  213. pass # Certificate info is for debug logging only; not critical
  214. logger.info("MQTT SSL context: TLS 1.2+, cert=%s", self.cert_path)
  215. try:
  216. self._running = True
  217. # Wrapper to log ALL connection attempts including SSL errors
  218. async def connection_handler(reader, writer):
  219. try:
  220. addr = writer.get_extra_info("peername")
  221. ssl_obj = writer.get_extra_info("ssl_object")
  222. if ssl_obj:
  223. logger.info(
  224. f"{self._log_prefix}MQTT TLS connection from {addr} - cipher={ssl_obj.cipher()}, version={ssl_obj.version()}"
  225. )
  226. else:
  227. logger.info("%sMQTT connection from %s (no TLS?)", self._log_prefix, addr)
  228. await self._handle_client(reader, writer)
  229. except ssl.SSLError as e:
  230. logger.error("MQTT SSL error: %s", e)
  231. except Exception as e:
  232. logger.error("MQTT connection handler error: %s", e)
  233. self._server = await asyncio.start_server(
  234. connection_handler,
  235. self.bind_address,
  236. self.port,
  237. ssl=ssl_context,
  238. )
  239. logger.info("Simple MQTT server listening on port %s", self.port)
  240. # Start periodic status push task
  241. self._status_push_task = asyncio.create_task(self._periodic_status_push())
  242. async with self._server:
  243. await self._server.serve_forever()
  244. except OSError as e:
  245. if e.errno == 98: # Address already in use
  246. logger.error("MQTT port %s is already in use", self.port)
  247. else:
  248. logger.error("MQTT server error: %s", e)
  249. except asyncio.CancelledError:
  250. logger.debug("MQTT server task cancelled")
  251. except Exception as e:
  252. logger.error("MQTT server error: %s", e)
  253. finally:
  254. await self.stop()
  255. async def stop(self) -> None:
  256. """Stop the MQTT server."""
  257. logger.info("Stopping simple MQTT server")
  258. self._running = False
  259. # Stop periodic status push
  260. if self._status_push_task:
  261. self._status_push_task.cancel()
  262. try:
  263. await self._status_push_task
  264. except asyncio.CancelledError:
  265. pass # Expected when stopping the periodic status push task
  266. self._status_push_task = None
  267. # Close all client connections (iterate over copy to avoid modification during iteration)
  268. for _client_id, writer in list(self._clients.items()):
  269. try:
  270. writer.close()
  271. await writer.wait_closed()
  272. except OSError:
  273. pass # Best-effort client connection cleanup; client may have disconnected
  274. self._clients.clear()
  275. self._client_serials.clear()
  276. if self._server:
  277. try:
  278. self._server.close()
  279. await self._server.wait_closed()
  280. except OSError:
  281. pass # Best-effort server shutdown; port may already be released
  282. self._server = None
  283. @staticmethod
  284. def _extract_serial_from_topic(topic: str) -> str | None:
  285. """Pull the serial out of a `device/{serial}/report|request` topic.
  286. Returns None if the topic doesn't match that shape — callers fall back
  287. to self.serial in that case.
  288. """
  289. if not topic.startswith("device/"):
  290. return None
  291. rest = topic[len("device/") :]
  292. # Expect "{serial}/report" or "{serial}/request" (possibly with suffixes).
  293. slash = rest.find("/")
  294. if slash <= 0:
  295. return None
  296. return rest[:slash]
  297. async def _periodic_status_push(self) -> None:
  298. """Send periodic status updates to all connected clients."""
  299. logger.info("Starting periodic status push task")
  300. while self._running:
  301. try:
  302. await asyncio.sleep(1) # Push every 1 second like real printers
  303. # Send status to all connected clients
  304. disconnected = []
  305. for client_id, writer in list(self._clients.items()):
  306. try:
  307. if writer.is_closing():
  308. disconnected.append(client_id)
  309. continue
  310. serial = self._client_serials.get(client_id, self.serial)
  311. await self._send_status_report(writer, serial=serial)
  312. except OSError as e:
  313. logger.debug("Failed to push status to %s: %s", client_id, e)
  314. disconnected.append(client_id)
  315. # Remove disconnected clients
  316. for client_id in disconnected:
  317. self._clients.pop(client_id, None)
  318. self._client_serials.pop(client_id, None)
  319. except asyncio.CancelledError:
  320. break
  321. except Exception as e:
  322. logger.error("Periodic status push error: %s", e)
  323. logger.info("Periodic status push task stopped")
  324. async def _handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
  325. """Handle an MQTT client connection."""
  326. addr = writer.get_extra_info("peername")
  327. client_id = f"{addr[0]}:{addr[1]}" if addr else "unknown"
  328. logger.info("%sMQTT client connected: %s", self._log_prefix, client_id)
  329. authenticated = False
  330. try:
  331. while self._running:
  332. # Read MQTT fixed header
  333. try:
  334. header = await asyncio.wait_for(reader.read(1), timeout=60)
  335. except TimeoutError:
  336. break
  337. if not header:
  338. break
  339. packet_type = (header[0] & 0xF0) >> 4
  340. # Read remaining length
  341. remaining_length = await self._read_remaining_length(reader)
  342. if remaining_length is None:
  343. break
  344. # Read payload
  345. payload = await reader.read(remaining_length) if remaining_length > 0 else b""
  346. # Handle packet types
  347. if packet_type == 1: # CONNECT
  348. authenticated = await self._handle_connect(payload, writer)
  349. if not authenticated:
  350. break
  351. # Register client for periodic status pushes; start with
  352. # self.serial as the fallback until we learn the slicer's
  353. # preferred serial from the first SUBSCRIBE/PUBLISH.
  354. self._clients[client_id] = writer
  355. self._client_serials[client_id] = self.serial
  356. elif packet_type == 3: # PUBLISH
  357. if authenticated:
  358. await self._handle_publish(header[0], payload, writer, client_id)
  359. elif packet_type == 8: # SUBSCRIBE
  360. if authenticated:
  361. await self._handle_subscribe(payload, writer, client_id)
  362. elif packet_type == 12: # PINGREQ
  363. # Send PINGRESP
  364. writer.write(bytes([0xD0, 0x00]))
  365. await writer.drain()
  366. elif packet_type == 14: # DISCONNECT
  367. break
  368. except asyncio.CancelledError:
  369. pass # Expected when server is shutting down and cancels client tasks
  370. except Exception as e:
  371. logger.debug("MQTT client error: %s", e)
  372. finally:
  373. logger.debug("MQTT client disconnected: %s", client_id)
  374. self._clients.pop(client_id, None)
  375. self._client_serials.pop(client_id, None)
  376. try:
  377. writer.close()
  378. await writer.wait_closed()
  379. except OSError:
  380. pass # Best-effort socket cleanup on client disconnect
  381. async def _read_remaining_length(self, reader: asyncio.StreamReader) -> int | None:
  382. """Read MQTT remaining length (variable byte integer)."""
  383. multiplier = 1
  384. value = 0
  385. for _ in range(4):
  386. try:
  387. byte = await reader.read(1)
  388. if not byte:
  389. return None
  390. encoded = byte[0]
  391. value += (encoded & 127) * multiplier
  392. if (encoded & 128) == 0:
  393. return value
  394. multiplier *= 128
  395. except OSError:
  396. return None
  397. return None
  398. async def _handle_connect(self, payload: bytes, writer: asyncio.StreamWriter) -> bool:
  399. """Handle MQTT CONNECT packet.
  400. Returns True if authentication successful.
  401. """
  402. try:
  403. # Parse CONNECT packet
  404. # Skip protocol name length and name
  405. idx = 0
  406. proto_len = (payload[idx] << 8) | payload[idx + 1]
  407. idx += 2 + proto_len
  408. # Skip protocol level and connect flags
  409. # connect_flags = payload[idx + 1]
  410. idx += 2
  411. # Skip keepalive
  412. idx += 2
  413. # Read client ID
  414. client_id_len = (payload[idx] << 8) | payload[idx + 1]
  415. idx += 2
  416. # client_id = payload[idx : idx + client_id_len].decode("utf-8")
  417. idx += client_id_len
  418. # Read username
  419. username_len = (payload[idx] << 8) | payload[idx + 1]
  420. idx += 2
  421. username = payload[idx : idx + username_len].decode("utf-8")
  422. idx += username_len
  423. # Read password
  424. password_len = (payload[idx] << 8) | payload[idx + 1]
  425. idx += 2
  426. password = payload[idx : idx + password_len].decode("utf-8")
  427. # Authenticate
  428. if username == "bblp" and password == self.access_code:
  429. # Send CONNACK with success
  430. writer.write(bytes([0x20, 0x02, 0x00, 0x00]))
  431. await writer.drain()
  432. logger.info("%sMQTT client authenticated successfully", self._log_prefix)
  433. # Send immediate status report after auth - slicer expects this
  434. await self._send_status_report(writer)
  435. return True
  436. else:
  437. # Send CONNACK with auth failure
  438. writer.write(bytes([0x20, 0x02, 0x00, 0x05])) # Not authorized
  439. await writer.drain()
  440. logger.warning("%sMQTT auth failed for user '%s' (access code mismatch)", self._log_prefix, username)
  441. return False
  442. except (IndexError, ValueError) as e:
  443. logger.debug("MQTT CONNECT parse error: %s", e)
  444. # Send CONNACK with error
  445. writer.write(bytes([0x20, 0x02, 0x00, 0x02])) # Protocol error
  446. await writer.drain()
  447. return False
  448. async def _handle_subscribe(self, payload: bytes, writer: asyncio.StreamWriter, client_id: str) -> None:
  449. """Handle MQTT SUBSCRIBE packet."""
  450. try:
  451. # Parse packet ID
  452. packet_id = (payload[0] << 8) | payload[1]
  453. # Parse topic filters (just acknowledge them)
  454. idx = 2
  455. granted_qos = []
  456. learned_serial: str | None = None
  457. while idx < len(payload):
  458. topic_len = (payload[idx] << 8) | payload[idx + 1]
  459. idx += 2
  460. topic = payload[idx : idx + topic_len].decode("utf-8")
  461. idx += topic_len
  462. requested_qos = payload[idx]
  463. idx += 1
  464. logger.info("%sMQTT subscribe: %s QoS=%s", self._log_prefix, topic, requested_qos)
  465. granted_qos.append(min(requested_qos, 1)) # Grant up to QoS 1
  466. # Remember the serial the slicer is listening on so status/version
  467. # responses go to a topic it actually subscribed to.
  468. if learned_serial is None:
  469. extracted = self._extract_serial_from_topic(topic)
  470. if extracted:
  471. learned_serial = extracted
  472. if learned_serial and learned_serial != self._client_serials.get(client_id):
  473. if learned_serial != self.serial:
  474. logger.info(
  475. "%sMQTT client subscribed with serial %s (VP serial is %s) — adapting responses",
  476. self._log_prefix,
  477. learned_serial,
  478. self.serial,
  479. )
  480. self._client_serials[client_id] = learned_serial
  481. # Send SUBACK
  482. suback = bytes([0x90, 2 + len(granted_qos), packet_id >> 8, packet_id & 0xFF])
  483. suback += bytes(granted_qos)
  484. writer.write(suback)
  485. await writer.drain()
  486. # Send initial status report after subscribe on the client's subscribed topic
  487. await self._send_status_report(writer, serial=self._client_serials.get(client_id, self.serial))
  488. except (IndexError, ValueError, OSError) as e:
  489. logger.debug("MQTT SUBSCRIBE error: %s", e)
  490. async def _send_status_report(self, writer: asyncio.StreamWriter, serial: str | None = None) -> None:
  491. """Send a status report to the slicer after connection."""
  492. try:
  493. # Build status message matching Bambu printer format
  494. self._sequence_id += 1
  495. status = {
  496. "print": {
  497. "sequence_id": str(self._sequence_id),
  498. "command": "push_status",
  499. "msg": 0,
  500. "gcode_state": self._gcode_state,
  501. "gcode_file": self._current_file,
  502. "gcode_file_prepare_percent": self._prepare_percent,
  503. "subtask_name": self._current_file.replace(".3mf", "") if self._current_file else "",
  504. "mc_print_stage": "",
  505. "mc_percent": 0,
  506. "mc_remaining_time": 0,
  507. "wifi_signal": "-44dBm",
  508. "print_error": 0,
  509. "print_type": "",
  510. "bed_temper": 25.0,
  511. "bed_target_temper": 0.0,
  512. "nozzle_temper": 25.0,
  513. "nozzle_target_temper": 0.0,
  514. "chamber_temper": 25.0,
  515. "cooling_fan_speed": "0",
  516. "big_fan1_speed": "0",
  517. "big_fan2_speed": "0",
  518. "heatbreak_fan_speed": "0",
  519. "spd_lvl": 1,
  520. "spd_mag": 100,
  521. "stg": [],
  522. "stg_cur": 0,
  523. "layer_num": 0,
  524. "total_layer_num": 0,
  525. "home_flag": 256, # Bit 8 = SD card present (HAS_SDCARD_NORMAL)
  526. "hw_switch_state": 0,
  527. "online": {"ahb": False, "rfid": False, "version": 7},
  528. "ams_status": 0,
  529. "sdcard": True,
  530. "storage": {"free": 1000000000, "total": 32000000000},
  531. "upgrade_state": {
  532. "sequence_id": 0,
  533. "progress": "",
  534. "status": "",
  535. "consistency_request": False,
  536. "dis_state": 0,
  537. "err_code": 0,
  538. "force_upgrade": False,
  539. "message": "",
  540. "module": "",
  541. "new_version_state": 2,
  542. "new_ver_list": [],
  543. "ota_new_version_number": "",
  544. "ahb_new_version_number": "",
  545. },
  546. "ipcam": {
  547. "ipcam_dev": "1",
  548. "ipcam_record": "enable",
  549. "timelapse": "disable",
  550. "resolution": "1080p",
  551. "mode_bits": 0,
  552. },
  553. "xcam": {
  554. "allow_skip_parts": False,
  555. "buildplate_marker_detector": True,
  556. "first_layer_inspector": True,
  557. "halt_print_sensitivity": "medium",
  558. "print_halt": True,
  559. "printing_monitor": True,
  560. "spaghetti_detector": True,
  561. },
  562. "lights_report": [{"node": "chamber_light", "mode": "on"}],
  563. "nozzle_diameter": "0.4",
  564. "nozzle_type": "hardened_steel",
  565. }
  566. }
  567. await self._publish_to_report(writer, status, serial or self.serial)
  568. except OSError as e:
  569. logger.error("Failed to send status report: %s", e)
  570. async def _send_version_response(
  571. self, writer: asyncio.StreamWriter, sequence_id: str, serial: str | None = None
  572. ) -> None:
  573. """Send version info response to the slicer."""
  574. try:
  575. product_name = MODEL_PRODUCT_NAMES.get(self.model, self.model or "X1 Carbon")
  576. # The serial is embedded inside the module[].sn fields *and* used as
  577. # the report topic. Use the client's effective serial so the slicer
  578. # sees internal/topic consistency even when it differs from self.serial.
  579. serial = serial or self.serial
  580. # Build version response matching OrcaSlicer expectations
  581. # Required fields per module: name, product_name, sw_ver, sw_new_ver, sn, hw_ver, flag
  582. version_info = {
  583. "info": {
  584. "command": "get_version",
  585. "sequence_id": sequence_id,
  586. "module": [
  587. {
  588. "name": "ota",
  589. "product_name": product_name,
  590. "sw_ver": "01.07.00.00",
  591. "sw_new_ver": "",
  592. "hw_ver": "OTA",
  593. "sn": serial,
  594. "flag": 0,
  595. },
  596. {
  597. "name": "esp32",
  598. "product_name": product_name,
  599. "sw_ver": "01.07.22.25",
  600. "sw_new_ver": "",
  601. "hw_ver": "AP05",
  602. "sn": serial,
  603. "flag": 0,
  604. },
  605. {
  606. "name": "rv1126",
  607. "product_name": product_name,
  608. "sw_ver": "00.00.27.38",
  609. "sw_new_ver": "",
  610. "hw_ver": "AP05",
  611. "sn": serial,
  612. "flag": 0,
  613. },
  614. {
  615. "name": "th",
  616. "product_name": product_name,
  617. "sw_ver": "00.00.04.00",
  618. "sw_new_ver": "",
  619. "hw_ver": "TH07",
  620. "sn": serial,
  621. "flag": 0,
  622. },
  623. {
  624. "name": "mc",
  625. "product_name": product_name,
  626. "sw_ver": "00.00.10.00",
  627. "sw_new_ver": "",
  628. "hw_ver": "MC07",
  629. "sn": serial,
  630. "flag": 0,
  631. },
  632. ],
  633. }
  634. }
  635. await self._publish_to_report(writer, version_info, serial)
  636. logger.info("Sent version response (product_name=%s)", product_name)
  637. except OSError as e:
  638. logger.error("Failed to send version response: %s", e)
  639. def set_gcode_state(self, state: str, filename: str = "", prepare_percent: str = "0") -> None:
  640. """Update the gcode state reported to connected slicers.
  641. Called by the manager to reflect FTP upload progress/completion.
  642. """
  643. self._gcode_state = state
  644. self._current_file = filename
  645. self._prepare_percent = prepare_percent
  646. async def _publish_to_report(self, writer: asyncio.StreamWriter, payload: dict, serial: str = "") -> None:
  647. """Publish a message on the device report topic."""
  648. topic = f"device/{serial or self.serial}/report"
  649. message = json.dumps(payload)
  650. topic_bytes = topic.encode("utf-8")
  651. message_bytes = message.encode("utf-8")
  652. remaining = 2 + len(topic_bytes) + len(message_bytes)
  653. packet = bytes([0x30]) # PUBLISH, QoS 0
  654. while remaining > 0:
  655. byte = remaining % 128
  656. remaining //= 128
  657. if remaining > 0:
  658. byte |= 0x80
  659. packet += bytes([byte])
  660. packet += bytes([len(topic_bytes) >> 8, len(topic_bytes) & 0xFF])
  661. packet += topic_bytes
  662. packet += message_bytes
  663. writer.write(packet)
  664. # Timeout the drain to prevent blocking the event loop if the
  665. # MQTT client stops reading (e.g. slicer busy with FTP upload).
  666. try:
  667. await asyncio.wait_for(writer.drain(), timeout=5)
  668. except TimeoutError:
  669. logger.debug("MQTT drain timeout for %s — client may be busy", topic)
  670. async def _send_print_response(
  671. self, writer: asyncio.StreamWriter, sequence_id: str, filename: str, serial: str | None = None
  672. ) -> None:
  673. """Send project_file acknowledgment matching real Bambu printer behavior."""
  674. # Update state so periodic status pushes reflect preparation
  675. self._gcode_state = "PREPARE"
  676. self._current_file = filename
  677. self._prepare_percent = "0"
  678. try:
  679. # Send command acknowledgment — slicer expects to see
  680. # command: "project_file" echoed back before starting FTP upload
  681. subtask_name = filename.replace(".3mf", "") if filename else ""
  682. response = {
  683. "print": {
  684. "command": "project_file",
  685. "sequence_id": sequence_id,
  686. "param": "Metadata/plate_1.gcode",
  687. "subtask_name": subtask_name,
  688. "gcode_state": "PREPARE",
  689. "gcode_file": filename,
  690. "gcode_file_prepare_percent": "0",
  691. "result": "SUCCESS",
  692. "msg": 0,
  693. }
  694. }
  695. await self._publish_to_report(writer, response, serial or self.serial)
  696. logger.info("Sent project_file acknowledgment for %s", filename)
  697. except OSError as e:
  698. logger.error("Failed to send print response: %s", e)
  699. async def _handle_publish(self, header: int, payload: bytes, writer: asyncio.StreamWriter, client_id: str) -> None:
  700. """Handle MQTT PUBLISH packet."""
  701. try:
  702. # Parse topic
  703. idx = 0
  704. topic_len = (payload[idx] << 8) | payload[idx + 1]
  705. idx += 2
  706. topic = payload[idx : idx + topic_len].decode("utf-8")
  707. idx += topic_len
  708. # Check for packet ID (QoS > 0)
  709. qos = (header & 0x06) >> 1
  710. if qos > 0:
  711. # packet_id = (payload[idx] << 8) | payload[idx + 1]
  712. idx += 2
  713. # Parse message
  714. message = payload[idx:].decode("utf-8")
  715. logger.info("MQTT publish to %s: %s...", topic, message[:100])
  716. # Only handle publishes on *some* device/.../request topic. The
  717. # serial is taken from the topic rather than compared against
  718. # self.serial: the client is already authenticated via the access
  719. # code, and Orca/BambuStudio may have a cached serial that differs
  720. # from the VP's computed self.serial (#927). Use the topic's serial
  721. # for all responses so they land on the topic the slicer subscribed
  722. # to.
  723. if not topic.startswith("device/") or "/request" not in topic:
  724. return
  725. client_serial = self._extract_serial_from_topic(topic) or self.serial
  726. if client_serial and client_serial != self._client_serials.get(client_id):
  727. if client_serial != self.serial:
  728. logger.info(
  729. "%sMQTT client publishing with serial %s (VP serial is %s) — adapting responses",
  730. self._log_prefix,
  731. client_serial,
  732. self.serial,
  733. )
  734. self._client_serials[client_id] = client_serial
  735. try:
  736. data = json.loads(message)
  737. except json.JSONDecodeError:
  738. return # Non-JSON payloads on request topic are safely ignored
  739. # Handle pushing command (status request)
  740. if "pushing" in data:
  741. pushing_data = data["pushing"]
  742. command = pushing_data.get("command", "")
  743. logger.info("MQTT pushing command: %s", command)
  744. if command == "pushall":
  745. # Slicer is requesting full status - send response
  746. logger.info("Sending status report in response to pushall")
  747. await self._send_status_report(writer, serial=client_serial)
  748. elif command == "start":
  749. # Slicer wants periodic status updates - send one now
  750. logger.info("Starting status push stream")
  751. await self._send_status_report(writer, serial=client_serial)
  752. # Handle info commands (get_version, etc.)
  753. if "info" in data:
  754. info_data = data["info"]
  755. command = info_data.get("command", "")
  756. sequence_id = info_data.get("sequence_id", "0")
  757. logger.info("MQTT info command: %s", command)
  758. if command == "get_version":
  759. await self._send_version_response(writer, sequence_id, serial=client_serial)
  760. # Handle print commands
  761. if "print" in data:
  762. print_data = data["print"]
  763. command = print_data.get("command", "")
  764. filename = print_data.get("subtask_name", "")
  765. sequence_id = print_data.get("sequence_id", "0")
  766. logger.info("MQTT print command: %s for %s", command, filename)
  767. if command == "project_file":
  768. # Respond with PREPARE status so slicer proceeds with FTP upload
  769. file_3mf = print_data.get("file", filename)
  770. await self._send_print_response(writer, sequence_id, file_3mf, serial=client_serial)
  771. if self.on_print_command:
  772. await self._notify_print_command(filename, print_data)
  773. except (IndexError, ValueError, OSError) as e:
  774. logger.debug("MQTT PUBLISH error: %s", e)
  775. async def _notify_print_command(self, filename: str, data: dict) -> None:
  776. """Notify callback of print command."""
  777. if self.on_print_command:
  778. try:
  779. result = self.on_print_command(filename, data)
  780. if asyncio.iscoroutine(result):
  781. await result
  782. except Exception as e:
  783. logger.error("Print command callback error: %s", e)