mqtt_server.py 35 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902
  1. """MQTT broker for virtual printer.
  2. Implements an MQTT broker that accepts connections from slicers,
  3. authenticates with the configured access code, and logs print commands.
  4. """
  5. import asyncio
  6. import json
  7. import logging
  8. import ssl
  9. from collections.abc import Callable
  10. from pathlib import Path
  11. logger = logging.getLogger(__name__)
  12. # Default MQTT port for Bambu printers (MQTT over TLS)
  13. MQTT_PORT = 8883
  14. # Model code → product_name for version response (must match what slicer expects)
  15. MODEL_PRODUCT_NAMES = {
  16. "BL-P001": "X1 Carbon",
  17. "BL-P002": "X1",
  18. "C13": "X1E",
  19. "C11": "P1P",
  20. "C12": "P1S",
  21. "N7": "P2S",
  22. "N2S": "A1",
  23. "N1": "A1 mini",
  24. "O1D": "H2D",
  25. "O1C": "H2C",
  26. "O1C2": "H2C",
  27. "O1S": "H2S",
  28. }
  29. class VirtualPrinterMQTTServer:
  30. """MQTT broker that accepts connections from slicers.
  31. This is a minimal MQTT broker implementation that:
  32. - Accepts TLS connections on port 8883
  33. - Authenticates with username 'bblp' and the configured access code
  34. - Receives print commands on device/{serial}/request
  35. - Can publish status on device/{serial}/report
  36. """
  37. def __init__(
  38. self,
  39. serial: str,
  40. access_code: str,
  41. cert_path: Path,
  42. key_path: Path,
  43. port: int = MQTT_PORT,
  44. on_print_command: Callable[[str, dict], None] | None = None,
  45. ):
  46. """Initialize the MQTT server.
  47. Args:
  48. serial: Virtual printer serial number
  49. access_code: Password for authentication
  50. cert_path: Path to TLS certificate
  51. key_path: Path to TLS private key
  52. port: Port to listen on (default 8883)
  53. on_print_command: Callback when print command received (filename, data)
  54. """
  55. self.serial = serial
  56. self.access_code = access_code
  57. self.cert_path = cert_path
  58. self.key_path = key_path
  59. self.port = port
  60. self.on_print_command = on_print_command
  61. self._running = False
  62. self._broker = None
  63. self._broker_task = None
  64. async def start(self) -> None:
  65. """Start the MQTT broker."""
  66. if self._running:
  67. return
  68. # Try to import amqtt
  69. try:
  70. from amqtt.broker import Broker
  71. except ImportError:
  72. logger.error("amqtt not installed. Run: pip install amqtt")
  73. return
  74. logger.info("Starting virtual printer MQTT broker on port %s", self.port)
  75. # Build broker configuration
  76. config = {
  77. "listeners": {
  78. "default": {
  79. "type": "tcp",
  80. "bind": f"0.0.0.0:{self.port}",
  81. "ssl": "on",
  82. "certfile": str(self.cert_path),
  83. "keyfile": str(self.key_path),
  84. },
  85. },
  86. "auth": {
  87. "allow-anonymous": False,
  88. "plugins": ["auth_custom"],
  89. },
  90. "topic-check": {
  91. "enabled": False, # Allow any topic
  92. },
  93. }
  94. try:
  95. self._running = True
  96. # Create and start broker
  97. self._broker = Broker(config)
  98. # Register custom auth plugin
  99. self._broker.plugins_manager.plugins_handlers["auth_custom"] = self._authenticate
  100. # Start the broker
  101. await self._broker.start()
  102. logger.info("MQTT broker started on port %s", self.port)
  103. # Keep running
  104. while self._running:
  105. await asyncio.sleep(1)
  106. except OSError as e:
  107. if e.errno == 98: # Address already in use
  108. logger.error("MQTT port %s is already in use", self.port)
  109. else:
  110. logger.error("MQTT broker error: %s", e)
  111. except asyncio.CancelledError:
  112. logger.debug("MQTT broker task cancelled")
  113. except Exception as e:
  114. logger.error("MQTT broker error: %s", e)
  115. finally:
  116. await self.stop()
  117. async def _authenticate(self, session) -> bool:
  118. """Authenticate MQTT connection.
  119. Args:
  120. session: MQTT session with username/password
  121. Returns:
  122. True if authentication successful
  123. """
  124. username = getattr(session, "username", None)
  125. password = getattr(session, "password", None)
  126. # Bambu slicers use 'bblp' as username and access code as password
  127. if username == "bblp" and password == self.access_code:
  128. logger.debug("MQTT client authenticated from %s", session.remote_address)
  129. return True
  130. logger.warning("MQTT auth failed for user '%s' from %s", username, session.remote_address)
  131. return False
  132. async def stop(self) -> None:
  133. """Stop the MQTT broker."""
  134. logger.info("Stopping MQTT broker")
  135. self._running = False
  136. if self._broker:
  137. try:
  138. await self._broker.shutdown()
  139. except OSError as e:
  140. logger.debug("Error shutting down MQTT broker: %s", e)
  141. self._broker = None
  142. class SimpleMQTTServer:
  143. """Simplified MQTT server using raw sockets.
  144. This is a fallback implementation that handles basic MQTT protocol
  145. without requiring the amqtt library. It's less feature-complete but
  146. more lightweight.
  147. """
  148. def __init__(
  149. self,
  150. serial: str,
  151. access_code: str,
  152. cert_path: Path,
  153. key_path: Path,
  154. port: int = MQTT_PORT,
  155. on_print_command: Callable[[str, dict], None] | None = None,
  156. model: str = "",
  157. bind_address: str = "0.0.0.0", # nosec B104
  158. vp_name: str = "",
  159. ):
  160. self.serial = serial
  161. self.access_code = access_code
  162. self.model = model
  163. self.cert_path = cert_path
  164. self.key_path = key_path
  165. self.port = port
  166. self.on_print_command = on_print_command
  167. self.bind_address = bind_address
  168. self.vp_name = vp_name
  169. self._log_prefix = f"[{vp_name}] " if vp_name else ""
  170. self._running = False
  171. self._server = None
  172. self._clients: dict[str, asyncio.StreamWriter] = {}
  173. # Per-client "effective serial" — the serial the slicer actually uses in
  174. # device/{serial}/report|request topics. Populated from the first
  175. # SUBSCRIBE/PUBLISH we see on a connection. This lets the VP respond on
  176. # the topic the slicer is listening on even when it disagrees with
  177. # self.serial (e.g. a stale Orca config that was bound to an older VP
  178. # serial, or a printer entry that was re-pointed at the VP IP without
  179. # updating the serial).
  180. self._client_serials: dict[str, str] = {}
  181. self._status_push_task: asyncio.Task | None = None
  182. self._sequence_id = 0
  183. # Dynamic state for status reports
  184. self._gcode_state = "IDLE"
  185. self._current_file = ""
  186. self._prepare_percent = "0"
  187. async def start(self) -> None:
  188. """Start the MQTT server."""
  189. if self._running:
  190. return
  191. logger.info("Starting simple MQTT server on port %s", self.port)
  192. # Create SSL context with Bambu-compatible settings
  193. ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
  194. ssl_context.load_cert_chain(str(self.cert_path), str(self.key_path))
  195. # Match Bambu printer behavior - accept any client
  196. ssl_context.verify_mode = ssl.CERT_NONE
  197. # Allow TLS 1.2 for broader compatibility (some slicers may not support 1.3)
  198. ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2
  199. # Disable hostname checking
  200. ssl_context.check_hostname = False
  201. # Log certificate info
  202. import subprocess
  203. try:
  204. result = subprocess.run(
  205. ["openssl", "x509", "-in", str(self.cert_path), "-noout", "-subject", "-issuer"],
  206. capture_output=True,
  207. text=True,
  208. timeout=5,
  209. )
  210. logger.info("MQTT SSL cert info: %s", result.stdout.strip())
  211. except (OSError, subprocess.SubprocessError):
  212. pass # Certificate info is for debug logging only; not critical
  213. logger.info("MQTT SSL context: TLS 1.2+, cert=%s", self.cert_path)
  214. try:
  215. self._running = True
  216. # Wrapper to log ALL connection attempts including SSL errors
  217. async def connection_handler(reader, writer):
  218. try:
  219. addr = writer.get_extra_info("peername")
  220. ssl_obj = writer.get_extra_info("ssl_object")
  221. if ssl_obj:
  222. logger.info(
  223. f"{self._log_prefix}MQTT TLS connection from {addr} - cipher={ssl_obj.cipher()}, version={ssl_obj.version()}"
  224. )
  225. else:
  226. logger.info("%sMQTT connection from %s (no TLS?)", self._log_prefix, addr)
  227. await self._handle_client(reader, writer)
  228. except ssl.SSLError as e:
  229. logger.error("MQTT SSL error: %s", e)
  230. except Exception as e:
  231. logger.error("MQTT connection handler error: %s", e)
  232. self._server = await asyncio.start_server(
  233. connection_handler,
  234. self.bind_address,
  235. self.port,
  236. ssl=ssl_context,
  237. )
  238. logger.info("Simple MQTT server listening on port %s", self.port)
  239. # Start periodic status push task
  240. self._status_push_task = asyncio.create_task(self._periodic_status_push())
  241. async with self._server:
  242. await self._server.serve_forever()
  243. except OSError as e:
  244. if e.errno == 98: # Address already in use
  245. logger.error("MQTT port %s is already in use", self.port)
  246. else:
  247. logger.error("MQTT server error: %s", e)
  248. except asyncio.CancelledError:
  249. logger.debug("MQTT server task cancelled")
  250. except Exception as e:
  251. logger.error("MQTT server error: %s", e)
  252. finally:
  253. await self.stop()
  254. async def stop(self) -> None:
  255. """Stop the MQTT server."""
  256. logger.info("Stopping simple MQTT server")
  257. self._running = False
  258. # Stop periodic status push
  259. if self._status_push_task:
  260. self._status_push_task.cancel()
  261. try:
  262. await self._status_push_task
  263. except asyncio.CancelledError:
  264. pass # Expected when stopping the periodic status push task
  265. self._status_push_task = None
  266. # Close all client connections (iterate over copy to avoid modification during iteration)
  267. for _client_id, writer in list(self._clients.items()):
  268. try:
  269. writer.close()
  270. await writer.wait_closed()
  271. except OSError:
  272. pass # Best-effort client connection cleanup; client may have disconnected
  273. self._clients.clear()
  274. self._client_serials.clear()
  275. if self._server:
  276. try:
  277. self._server.close()
  278. await self._server.wait_closed()
  279. except OSError:
  280. pass # Best-effort server shutdown; port may already be released
  281. self._server = None
  282. @staticmethod
  283. def _extract_serial_from_topic(topic: str) -> str | None:
  284. """Pull the serial out of a `device/{serial}/report|request` topic.
  285. Returns None if the topic doesn't match that shape — callers fall back
  286. to self.serial in that case.
  287. """
  288. if not topic.startswith("device/"):
  289. return None
  290. rest = topic[len("device/") :]
  291. # Expect "{serial}/report" or "{serial}/request" (possibly with suffixes).
  292. slash = rest.find("/")
  293. if slash <= 0:
  294. return None
  295. return rest[:slash]
  296. async def _periodic_status_push(self) -> None:
  297. """Send periodic status updates to all connected clients."""
  298. logger.info("Starting periodic status push task")
  299. while self._running:
  300. try:
  301. await asyncio.sleep(1) # Push every 1 second like real printers
  302. # Send status to all connected clients
  303. disconnected = []
  304. for client_id, writer in list(self._clients.items()):
  305. try:
  306. if writer.is_closing():
  307. disconnected.append(client_id)
  308. continue
  309. serial = self._client_serials.get(client_id, self.serial)
  310. await self._send_status_report(writer, serial=serial)
  311. except OSError as e:
  312. logger.debug("Failed to push status to %s: %s", client_id, e)
  313. disconnected.append(client_id)
  314. # Remove disconnected clients
  315. for client_id in disconnected:
  316. self._clients.pop(client_id, None)
  317. self._client_serials.pop(client_id, None)
  318. except asyncio.CancelledError:
  319. break
  320. except Exception as e:
  321. logger.error("Periodic status push error: %s", e)
  322. logger.info("Periodic status push task stopped")
  323. async def _handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
  324. """Handle an MQTT client connection."""
  325. addr = writer.get_extra_info("peername")
  326. client_id = f"{addr[0]}:{addr[1]}" if addr else "unknown"
  327. logger.info("%sMQTT client connected: %s", self._log_prefix, client_id)
  328. authenticated = False
  329. try:
  330. while self._running:
  331. # Read MQTT fixed header
  332. try:
  333. header = await asyncio.wait_for(reader.read(1), timeout=60)
  334. except TimeoutError:
  335. break
  336. if not header:
  337. break
  338. packet_type = (header[0] & 0xF0) >> 4
  339. # Read remaining length
  340. remaining_length = await self._read_remaining_length(reader)
  341. if remaining_length is None:
  342. break
  343. # Read payload
  344. payload = await reader.read(remaining_length) if remaining_length > 0 else b""
  345. # Handle packet types
  346. if packet_type == 1: # CONNECT
  347. authenticated = await self._handle_connect(payload, writer)
  348. if not authenticated:
  349. break
  350. # Register client for periodic status pushes; start with
  351. # self.serial as the fallback until we learn the slicer's
  352. # preferred serial from the first SUBSCRIBE/PUBLISH.
  353. self._clients[client_id] = writer
  354. self._client_serials[client_id] = self.serial
  355. elif packet_type == 3: # PUBLISH
  356. if authenticated:
  357. await self._handle_publish(header[0], payload, writer, client_id)
  358. elif packet_type == 8: # SUBSCRIBE
  359. if authenticated:
  360. await self._handle_subscribe(payload, writer, client_id)
  361. elif packet_type == 12: # PINGREQ
  362. # Send PINGRESP
  363. writer.write(bytes([0xD0, 0x00]))
  364. await writer.drain()
  365. elif packet_type == 14: # DISCONNECT
  366. break
  367. except asyncio.CancelledError:
  368. pass # Expected when server is shutting down and cancels client tasks
  369. except Exception as e:
  370. logger.debug("MQTT client error: %s", e)
  371. finally:
  372. logger.debug("MQTT client disconnected: %s", client_id)
  373. self._clients.pop(client_id, None)
  374. self._client_serials.pop(client_id, None)
  375. try:
  376. writer.close()
  377. await writer.wait_closed()
  378. except OSError:
  379. pass # Best-effort socket cleanup on client disconnect
  380. async def _read_remaining_length(self, reader: asyncio.StreamReader) -> int | None:
  381. """Read MQTT remaining length (variable byte integer)."""
  382. multiplier = 1
  383. value = 0
  384. for _ in range(4):
  385. try:
  386. byte = await reader.read(1)
  387. if not byte:
  388. return None
  389. encoded = byte[0]
  390. value += (encoded & 127) * multiplier
  391. if (encoded & 128) == 0:
  392. return value
  393. multiplier *= 128
  394. except OSError:
  395. return None
  396. return None
  397. async def _handle_connect(self, payload: bytes, writer: asyncio.StreamWriter) -> bool:
  398. """Handle MQTT CONNECT packet.
  399. Returns True if authentication successful.
  400. """
  401. try:
  402. # Parse CONNECT packet
  403. # Skip protocol name length and name
  404. idx = 0
  405. proto_len = (payload[idx] << 8) | payload[idx + 1]
  406. idx += 2 + proto_len
  407. # Skip protocol level and connect flags
  408. # connect_flags = payload[idx + 1]
  409. idx += 2
  410. # Skip keepalive
  411. idx += 2
  412. # Read client ID
  413. client_id_len = (payload[idx] << 8) | payload[idx + 1]
  414. idx += 2
  415. # client_id = payload[idx : idx + client_id_len].decode("utf-8")
  416. idx += client_id_len
  417. # Read username
  418. username_len = (payload[idx] << 8) | payload[idx + 1]
  419. idx += 2
  420. username = payload[idx : idx + username_len].decode("utf-8")
  421. idx += username_len
  422. # Read password
  423. password_len = (payload[idx] << 8) | payload[idx + 1]
  424. idx += 2
  425. password = payload[idx : idx + password_len].decode("utf-8")
  426. # Authenticate
  427. if username == "bblp" and password == self.access_code:
  428. # Send CONNACK with success
  429. writer.write(bytes([0x20, 0x02, 0x00, 0x00]))
  430. await writer.drain()
  431. logger.info("%sMQTT client authenticated successfully", self._log_prefix)
  432. # Send immediate status report after auth - slicer expects this
  433. await self._send_status_report(writer)
  434. return True
  435. else:
  436. # Send CONNACK with auth failure
  437. writer.write(bytes([0x20, 0x02, 0x00, 0x05])) # Not authorized
  438. await writer.drain()
  439. logger.warning("%sMQTT auth failed for user '%s' (access code mismatch)", self._log_prefix, username)
  440. return False
  441. except (IndexError, ValueError) as e:
  442. logger.debug("MQTT CONNECT parse error: %s", e)
  443. # Send CONNACK with error
  444. writer.write(bytes([0x20, 0x02, 0x00, 0x02])) # Protocol error
  445. await writer.drain()
  446. return False
  447. async def _handle_subscribe(self, payload: bytes, writer: asyncio.StreamWriter, client_id: str) -> None:
  448. """Handle MQTT SUBSCRIBE packet."""
  449. try:
  450. # Parse packet ID
  451. packet_id = (payload[0] << 8) | payload[1]
  452. # Parse topic filters (just acknowledge them)
  453. idx = 2
  454. granted_qos = []
  455. learned_serial: str | None = None
  456. while idx < len(payload):
  457. topic_len = (payload[idx] << 8) | payload[idx + 1]
  458. idx += 2
  459. topic = payload[idx : idx + topic_len].decode("utf-8")
  460. idx += topic_len
  461. requested_qos = payload[idx]
  462. idx += 1
  463. logger.info("%sMQTT subscribe: %s QoS=%s", self._log_prefix, topic, requested_qos)
  464. granted_qos.append(min(requested_qos, 1)) # Grant up to QoS 1
  465. # Remember the serial the slicer is listening on so status/version
  466. # responses go to a topic it actually subscribed to.
  467. if learned_serial is None:
  468. extracted = self._extract_serial_from_topic(topic)
  469. if extracted:
  470. learned_serial = extracted
  471. if learned_serial and learned_serial != self._client_serials.get(client_id):
  472. if learned_serial != self.serial:
  473. logger.info(
  474. "%sMQTT client subscribed with serial %s (VP serial is %s) — adapting responses",
  475. self._log_prefix,
  476. learned_serial,
  477. self.serial,
  478. )
  479. self._client_serials[client_id] = learned_serial
  480. # Send SUBACK
  481. suback = bytes([0x90, 2 + len(granted_qos), packet_id >> 8, packet_id & 0xFF])
  482. suback += bytes(granted_qos)
  483. writer.write(suback)
  484. await writer.drain()
  485. # Send initial status report after subscribe on the client's subscribed topic
  486. await self._send_status_report(writer, serial=self._client_serials.get(client_id, self.serial))
  487. except (IndexError, ValueError, OSError) as e:
  488. logger.debug("MQTT SUBSCRIBE error: %s", e)
  489. async def _send_status_report(self, writer: asyncio.StreamWriter, serial: str | None = None) -> None:
  490. """Send a status report to the slicer after connection."""
  491. try:
  492. # Build status message matching Bambu printer format
  493. self._sequence_id += 1
  494. status = {
  495. "print": {
  496. "sequence_id": str(self._sequence_id),
  497. "command": "push_status",
  498. "msg": 0,
  499. "gcode_state": self._gcode_state,
  500. "gcode_file": self._current_file,
  501. "gcode_file_prepare_percent": self._prepare_percent,
  502. "subtask_name": self._current_file.replace(".3mf", "") if self._current_file else "",
  503. "mc_print_stage": "",
  504. "mc_percent": 0,
  505. "mc_remaining_time": 0,
  506. "wifi_signal": "-44dBm",
  507. "print_error": 0,
  508. "print_type": "",
  509. "bed_temper": 25.0,
  510. "bed_target_temper": 0.0,
  511. "nozzle_temper": 25.0,
  512. "nozzle_target_temper": 0.0,
  513. "chamber_temper": 25.0,
  514. "cooling_fan_speed": "0",
  515. "big_fan1_speed": "0",
  516. "big_fan2_speed": "0",
  517. "heatbreak_fan_speed": "0",
  518. "spd_lvl": 1,
  519. "spd_mag": 100,
  520. "stg": [],
  521. "stg_cur": 0,
  522. "layer_num": 0,
  523. "total_layer_num": 0,
  524. "home_flag": 256, # Bit 8 = SD card present (HAS_SDCARD_NORMAL)
  525. "hw_switch_state": 0,
  526. "online": {"ahb": False, "rfid": False, "version": 7},
  527. "ams_status": 0,
  528. "sdcard": True,
  529. "storage": {"free": 1000000000, "total": 32000000000},
  530. "upgrade_state": {
  531. "sequence_id": 0,
  532. "progress": "",
  533. "status": "",
  534. "consistency_request": False,
  535. "dis_state": 0,
  536. "err_code": 0,
  537. "force_upgrade": False,
  538. "message": "",
  539. "module": "",
  540. "new_version_state": 2,
  541. "new_ver_list": [],
  542. "ota_new_version_number": "",
  543. "ahb_new_version_number": "",
  544. },
  545. "ipcam": {
  546. "ipcam_dev": "1",
  547. "ipcam_record": "enable",
  548. "timelapse": "disable",
  549. "resolution": "1080p",
  550. "mode_bits": 0,
  551. },
  552. "xcam": {
  553. "allow_skip_parts": False,
  554. "buildplate_marker_detector": True,
  555. "first_layer_inspector": True,
  556. "halt_print_sensitivity": "medium",
  557. "print_halt": True,
  558. "printing_monitor": True,
  559. "spaghetti_detector": True,
  560. },
  561. "lights_report": [{"node": "chamber_light", "mode": "on"}],
  562. "nozzle_diameter": "0.4",
  563. "nozzle_type": "hardened_steel",
  564. }
  565. }
  566. await self._publish_to_report(writer, status, serial or self.serial)
  567. except OSError as e:
  568. logger.error("Failed to send status report: %s", e)
  569. async def _send_version_response(
  570. self, writer: asyncio.StreamWriter, sequence_id: str, serial: str | None = None
  571. ) -> None:
  572. """Send version info response to the slicer."""
  573. try:
  574. product_name = MODEL_PRODUCT_NAMES.get(self.model, self.model or "X1 Carbon")
  575. # The serial is embedded inside the module[].sn fields *and* used as
  576. # the report topic. Use the client's effective serial so the slicer
  577. # sees internal/topic consistency even when it differs from self.serial.
  578. serial = serial or self.serial
  579. # Build version response matching OrcaSlicer expectations
  580. # Required fields per module: name, product_name, sw_ver, sw_new_ver, sn, hw_ver, flag
  581. version_info = {
  582. "info": {
  583. "command": "get_version",
  584. "sequence_id": sequence_id,
  585. "module": [
  586. {
  587. "name": "ota",
  588. "product_name": product_name,
  589. "sw_ver": "01.07.00.00",
  590. "sw_new_ver": "",
  591. "hw_ver": "OTA",
  592. "sn": serial,
  593. "flag": 0,
  594. },
  595. {
  596. "name": "esp32",
  597. "product_name": product_name,
  598. "sw_ver": "01.07.22.25",
  599. "sw_new_ver": "",
  600. "hw_ver": "AP05",
  601. "sn": serial,
  602. "flag": 0,
  603. },
  604. {
  605. "name": "rv1126",
  606. "product_name": product_name,
  607. "sw_ver": "00.00.27.38",
  608. "sw_new_ver": "",
  609. "hw_ver": "AP05",
  610. "sn": serial,
  611. "flag": 0,
  612. },
  613. {
  614. "name": "th",
  615. "product_name": product_name,
  616. "sw_ver": "00.00.04.00",
  617. "sw_new_ver": "",
  618. "hw_ver": "TH07",
  619. "sn": serial,
  620. "flag": 0,
  621. },
  622. {
  623. "name": "mc",
  624. "product_name": product_name,
  625. "sw_ver": "00.00.10.00",
  626. "sw_new_ver": "",
  627. "hw_ver": "MC07",
  628. "sn": serial,
  629. "flag": 0,
  630. },
  631. ],
  632. }
  633. }
  634. await self._publish_to_report(writer, version_info, serial)
  635. logger.info("Sent version response (product_name=%s)", product_name)
  636. except OSError as e:
  637. logger.error("Failed to send version response: %s", e)
  638. def set_gcode_state(self, state: str, filename: str = "", prepare_percent: str = "0") -> None:
  639. """Update the gcode state reported to connected slicers.
  640. Called by the manager to reflect FTP upload progress/completion.
  641. """
  642. self._gcode_state = state
  643. self._current_file = filename
  644. self._prepare_percent = prepare_percent
  645. async def _publish_to_report(self, writer: asyncio.StreamWriter, payload: dict, serial: str = "") -> None:
  646. """Publish a message on the device report topic."""
  647. topic = f"device/{serial or self.serial}/report"
  648. message = json.dumps(payload)
  649. topic_bytes = topic.encode("utf-8")
  650. message_bytes = message.encode("utf-8")
  651. remaining = 2 + len(topic_bytes) + len(message_bytes)
  652. packet = bytes([0x30]) # PUBLISH, QoS 0
  653. while remaining > 0:
  654. byte = remaining % 128
  655. remaining //= 128
  656. if remaining > 0:
  657. byte |= 0x80
  658. packet += bytes([byte])
  659. packet += bytes([len(topic_bytes) >> 8, len(topic_bytes) & 0xFF])
  660. packet += topic_bytes
  661. packet += message_bytes
  662. writer.write(packet)
  663. # Timeout the drain to prevent blocking the event loop if the
  664. # MQTT client stops reading (e.g. slicer busy with FTP upload).
  665. try:
  666. await asyncio.wait_for(writer.drain(), timeout=5)
  667. except TimeoutError:
  668. logger.debug("MQTT drain timeout for %s — client may be busy", topic)
  669. async def _send_print_response(
  670. self, writer: asyncio.StreamWriter, sequence_id: str, filename: str, serial: str | None = None
  671. ) -> None:
  672. """Send project_file acknowledgment matching real Bambu printer behavior."""
  673. # Update state so periodic status pushes reflect preparation
  674. self._gcode_state = "PREPARE"
  675. self._current_file = filename
  676. self._prepare_percent = "0"
  677. try:
  678. # Send command acknowledgment — slicer expects to see
  679. # command: "project_file" echoed back before starting FTP upload
  680. subtask_name = filename.replace(".3mf", "") if filename else ""
  681. response = {
  682. "print": {
  683. "command": "project_file",
  684. "sequence_id": sequence_id,
  685. "param": "Metadata/plate_1.gcode",
  686. "subtask_name": subtask_name,
  687. "gcode_state": "PREPARE",
  688. "gcode_file": filename,
  689. "gcode_file_prepare_percent": "0",
  690. "result": "SUCCESS",
  691. "msg": 0,
  692. }
  693. }
  694. await self._publish_to_report(writer, response, serial or self.serial)
  695. logger.info("Sent project_file acknowledgment for %s", filename)
  696. except OSError as e:
  697. logger.error("Failed to send print response: %s", e)
  698. async def _handle_publish(self, header: int, payload: bytes, writer: asyncio.StreamWriter, client_id: str) -> None:
  699. """Handle MQTT PUBLISH packet."""
  700. try:
  701. # Parse topic
  702. idx = 0
  703. topic_len = (payload[idx] << 8) | payload[idx + 1]
  704. idx += 2
  705. topic = payload[idx : idx + topic_len].decode("utf-8")
  706. idx += topic_len
  707. # Check for packet ID (QoS > 0)
  708. qos = (header & 0x06) >> 1
  709. if qos > 0:
  710. # packet_id = (payload[idx] << 8) | payload[idx + 1]
  711. idx += 2
  712. # Parse message
  713. message = payload[idx:].decode("utf-8")
  714. logger.info("MQTT publish to %s: %s...", topic, message[:100])
  715. # Only handle publishes on *some* device/.../request topic. The
  716. # serial is taken from the topic rather than compared against
  717. # self.serial: the client is already authenticated via the access
  718. # code, and Orca/BambuStudio may have a cached serial that differs
  719. # from the VP's computed self.serial (#927). Use the topic's serial
  720. # for all responses so they land on the topic the slicer subscribed
  721. # to.
  722. if not topic.startswith("device/") or "/request" not in topic:
  723. return
  724. client_serial = self._extract_serial_from_topic(topic) or self.serial
  725. if client_serial and client_serial != self._client_serials.get(client_id):
  726. if client_serial != self.serial:
  727. logger.info(
  728. "%sMQTT client publishing with serial %s (VP serial is %s) — adapting responses",
  729. self._log_prefix,
  730. client_serial,
  731. self.serial,
  732. )
  733. self._client_serials[client_id] = client_serial
  734. try:
  735. data = json.loads(message)
  736. except json.JSONDecodeError:
  737. return # Non-JSON payloads on request topic are safely ignored
  738. # Handle pushing command (status request)
  739. if "pushing" in data:
  740. pushing_data = data["pushing"]
  741. command = pushing_data.get("command", "")
  742. logger.info("MQTT pushing command: %s", command)
  743. if command == "pushall":
  744. # Slicer is requesting full status - send response
  745. logger.info("Sending status report in response to pushall")
  746. await self._send_status_report(writer, serial=client_serial)
  747. elif command == "start":
  748. # Slicer wants periodic status updates - send one now
  749. logger.info("Starting status push stream")
  750. await self._send_status_report(writer, serial=client_serial)
  751. # Handle info commands (get_version, etc.)
  752. if "info" in data:
  753. info_data = data["info"]
  754. command = info_data.get("command", "")
  755. sequence_id = info_data.get("sequence_id", "0")
  756. logger.info("MQTT info command: %s", command)
  757. if command == "get_version":
  758. await self._send_version_response(writer, sequence_id, serial=client_serial)
  759. # Handle print commands
  760. if "print" in data:
  761. print_data = data["print"]
  762. command = print_data.get("command", "")
  763. filename = print_data.get("subtask_name", "")
  764. sequence_id = print_data.get("sequence_id", "0")
  765. logger.info("MQTT print command: %s for %s", command, filename)
  766. if command == "project_file":
  767. # Respond with PREPARE status so slicer proceeds with FTP upload
  768. file_3mf = print_data.get("file", filename)
  769. await self._send_print_response(writer, sequence_id, file_3mf, serial=client_serial)
  770. if self.on_print_command:
  771. await self._notify_print_command(filename, print_data)
  772. except (IndexError, ValueError, OSError) as e:
  773. logger.debug("MQTT PUBLISH error: %s", e)
  774. async def _notify_print_command(self, filename: str, data: dict) -> None:
  775. """Notify callback of print command."""
  776. if self.on_print_command:
  777. try:
  778. result = self.on_print_command(filename, data)
  779. if asyncio.iscoroutine(result):
  780. await result
  781. except Exception as e:
  782. logger.error("Print command callback error: %s", e)