mqtt_server.py 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839
  1. """MQTT broker for virtual printer.
  2. Implements an MQTT broker that accepts connections from slicers,
  3. authenticates with the configured access code, and logs print commands.
  4. """
  5. import asyncio
  6. import json
  7. import logging
  8. import ssl
  9. from collections.abc import Callable
  10. from pathlib import Path
  11. logger = logging.getLogger(__name__)
  12. # Default MQTT port for Bambu printers (MQTT over TLS)
  13. MQTT_PORT = 8883
  14. # Model code → product_name for version response (must match what slicer expects)
  15. MODEL_PRODUCT_NAMES = {
  16. "3DPrinter-X1-Carbon": "X1 Carbon",
  17. "3DPrinter-X1": "X1",
  18. "C13": "X1E",
  19. "C11": "P1P",
  20. "C12": "P1S",
  21. "N7": "P2S",
  22. "N2S": "A1",
  23. "N1": "A1 mini",
  24. "O1D": "H2D",
  25. "O1C": "H2C",
  26. "O1S": "H2S",
  27. }
  28. class VirtualPrinterMQTTServer:
  29. """MQTT broker that accepts connections from slicers.
  30. This is a minimal MQTT broker implementation that:
  31. - Accepts TLS connections on port 8883
  32. - Authenticates with username 'bblp' and the configured access code
  33. - Receives print commands on device/{serial}/request
  34. - Can publish status on device/{serial}/report
  35. """
  36. def __init__(
  37. self,
  38. serial: str,
  39. access_code: str,
  40. cert_path: Path,
  41. key_path: Path,
  42. port: int = MQTT_PORT,
  43. on_print_command: Callable[[str, dict], None] | None = None,
  44. ):
  45. """Initialize the MQTT server.
  46. Args:
  47. serial: Virtual printer serial number
  48. access_code: Password for authentication
  49. cert_path: Path to TLS certificate
  50. key_path: Path to TLS private key
  51. port: Port to listen on (default 8883)
  52. on_print_command: Callback when print command received (filename, data)
  53. """
  54. self.serial = serial
  55. self.access_code = access_code
  56. self.cert_path = cert_path
  57. self.key_path = key_path
  58. self.port = port
  59. self.on_print_command = on_print_command
  60. self._running = False
  61. self._broker = None
  62. self._broker_task = None
  63. async def start(self) -> None:
  64. """Start the MQTT broker."""
  65. if self._running:
  66. return
  67. # Try to import amqtt
  68. try:
  69. from amqtt.broker import Broker
  70. except ImportError:
  71. logger.error("amqtt not installed. Run: pip install amqtt")
  72. return
  73. logger.info("Starting virtual printer MQTT broker on port %s", self.port)
  74. # Build broker configuration
  75. config = {
  76. "listeners": {
  77. "default": {
  78. "type": "tcp",
  79. "bind": f"0.0.0.0:{self.port}",
  80. "ssl": "on",
  81. "certfile": str(self.cert_path),
  82. "keyfile": str(self.key_path),
  83. },
  84. },
  85. "auth": {
  86. "allow-anonymous": False,
  87. "plugins": ["auth_custom"],
  88. },
  89. "topic-check": {
  90. "enabled": False, # Allow any topic
  91. },
  92. }
  93. try:
  94. self._running = True
  95. # Create and start broker
  96. self._broker = Broker(config)
  97. # Register custom auth plugin
  98. self._broker.plugins_manager.plugins_handlers["auth_custom"] = self._authenticate
  99. # Start the broker
  100. await self._broker.start()
  101. logger.info("MQTT broker started on port %s", self.port)
  102. # Keep running
  103. while self._running:
  104. await asyncio.sleep(1)
  105. except OSError as e:
  106. if e.errno == 98: # Address already in use
  107. logger.error("MQTT port %s is already in use", self.port)
  108. else:
  109. logger.error("MQTT broker error: %s", e)
  110. except asyncio.CancelledError:
  111. logger.debug("MQTT broker task cancelled")
  112. except Exception as e:
  113. logger.error("MQTT broker error: %s", e)
  114. finally:
  115. await self.stop()
  116. async def _authenticate(self, session) -> bool:
  117. """Authenticate MQTT connection.
  118. Args:
  119. session: MQTT session with username/password
  120. Returns:
  121. True if authentication successful
  122. """
  123. username = getattr(session, "username", None)
  124. password = getattr(session, "password", None)
  125. # Bambu slicers use 'bblp' as username and access code as password
  126. if username == "bblp" and password == self.access_code:
  127. logger.debug("MQTT client authenticated from %s", session.remote_address)
  128. return True
  129. logger.warning("MQTT auth failed for user '%s' from %s", username, session.remote_address)
  130. return False
  131. async def stop(self) -> None:
  132. """Stop the MQTT broker."""
  133. logger.info("Stopping MQTT broker")
  134. self._running = False
  135. if self._broker:
  136. try:
  137. await self._broker.shutdown()
  138. except OSError as e:
  139. logger.debug("Error shutting down MQTT broker: %s", e)
  140. self._broker = None
  141. class SimpleMQTTServer:
  142. """Simplified MQTT server using raw sockets.
  143. This is a fallback implementation that handles basic MQTT protocol
  144. without requiring the amqtt library. It's less feature-complete but
  145. more lightweight.
  146. """
  147. def __init__(
  148. self,
  149. serial: str,
  150. access_code: str,
  151. cert_path: Path,
  152. key_path: Path,
  153. port: int = MQTT_PORT,
  154. on_print_command: Callable[[str, dict], None] | None = None,
  155. model: str = "",
  156. bind_address: str = "0.0.0.0", # nosec B104
  157. ):
  158. self.serial = serial
  159. self.access_code = access_code
  160. self.model = model
  161. self.cert_path = cert_path
  162. self.key_path = key_path
  163. self.port = port
  164. self.on_print_command = on_print_command
  165. self.bind_address = bind_address
  166. self._running = False
  167. self._server = None
  168. self._clients: dict[str, asyncio.StreamWriter] = {}
  169. self._status_push_task: asyncio.Task | None = None
  170. self._sequence_id = 0
  171. # Dynamic state for status reports
  172. self._gcode_state = "IDLE"
  173. self._current_file = ""
  174. self._prepare_percent = "0"
  175. async def start(self) -> None:
  176. """Start the MQTT server."""
  177. if self._running:
  178. return
  179. logger.info("Starting simple MQTT server on port %s", self.port)
  180. # Create SSL context with Bambu-compatible settings
  181. ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
  182. ssl_context.load_cert_chain(str(self.cert_path), str(self.key_path))
  183. # Match Bambu printer behavior - accept any client
  184. ssl_context.verify_mode = ssl.CERT_NONE
  185. # Allow TLS 1.2 for broader compatibility (some slicers may not support 1.3)
  186. ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2
  187. # Disable hostname checking
  188. ssl_context.check_hostname = False
  189. # Log certificate info
  190. import subprocess
  191. try:
  192. result = subprocess.run(
  193. ["openssl", "x509", "-in", str(self.cert_path), "-noout", "-subject", "-issuer"],
  194. capture_output=True,
  195. text=True,
  196. timeout=5,
  197. )
  198. logger.info("MQTT SSL cert info: %s", result.stdout.strip())
  199. except (OSError, subprocess.SubprocessError):
  200. pass # Certificate info is for debug logging only; not critical
  201. logger.info("MQTT SSL context: TLS 1.2+, cert=%s", self.cert_path)
  202. try:
  203. self._running = True
  204. # Wrapper to log ALL connection attempts including SSL errors
  205. async def connection_handler(reader, writer):
  206. try:
  207. addr = writer.get_extra_info("peername")
  208. ssl_obj = writer.get_extra_info("ssl_object")
  209. if ssl_obj:
  210. logger.info(
  211. f"MQTT TLS connection from {addr} - cipher={ssl_obj.cipher()}, version={ssl_obj.version()}"
  212. )
  213. else:
  214. logger.info("MQTT connection from %s (no TLS?)", addr)
  215. await self._handle_client(reader, writer)
  216. except ssl.SSLError as e:
  217. logger.error("MQTT SSL error: %s", e)
  218. except Exception as e:
  219. logger.error("MQTT connection handler error: %s", e)
  220. # Custom protocol factory to log raw connection attempts
  221. logger.info("Setting up MQTT server with SSL error handling...")
  222. # Add SSL handshake error callback
  223. def handle_ssl_error(loop, context):
  224. exception = context.get("exception")
  225. message = context.get("message", "")
  226. if "ssl" in str(exception).lower() or "ssl" in message.lower():
  227. logger.error("SSL error: %s - %s", message, exception)
  228. else:
  229. logger.debug("Asyncio error: %s", message)
  230. asyncio.get_event_loop().set_exception_handler(handle_ssl_error)
  231. self._server = await asyncio.start_server(
  232. connection_handler,
  233. self.bind_address,
  234. self.port,
  235. ssl=ssl_context,
  236. )
  237. logger.info("Simple MQTT server listening on port %s", self.port)
  238. # Start periodic status push task
  239. self._status_push_task = asyncio.create_task(self._periodic_status_push())
  240. async with self._server:
  241. await self._server.serve_forever()
  242. except OSError as e:
  243. if e.errno == 98: # Address already in use
  244. logger.error("MQTT port %s is already in use", self.port)
  245. else:
  246. logger.error("MQTT server error: %s", e)
  247. except asyncio.CancelledError:
  248. logger.debug("MQTT server task cancelled")
  249. except Exception as e:
  250. logger.error("MQTT server error: %s", e)
  251. finally:
  252. await self.stop()
  253. async def stop(self) -> None:
  254. """Stop the MQTT server."""
  255. logger.info("Stopping simple MQTT server")
  256. self._running = False
  257. # Stop periodic status push
  258. if self._status_push_task:
  259. self._status_push_task.cancel()
  260. try:
  261. await self._status_push_task
  262. except asyncio.CancelledError:
  263. pass # Expected when stopping the periodic status push task
  264. self._status_push_task = None
  265. # Close all client connections (iterate over copy to avoid modification during iteration)
  266. for _client_id, writer in list(self._clients.items()):
  267. try:
  268. writer.close()
  269. await writer.wait_closed()
  270. except OSError:
  271. pass # Best-effort client connection cleanup; client may have disconnected
  272. self._clients.clear()
  273. if self._server:
  274. try:
  275. self._server.close()
  276. await self._server.wait_closed()
  277. except OSError:
  278. pass # Best-effort server shutdown; port may already be released
  279. self._server = None
  280. async def _periodic_status_push(self) -> None:
  281. """Send periodic status updates to all connected clients."""
  282. logger.info("Starting periodic status push task")
  283. while self._running:
  284. try:
  285. await asyncio.sleep(1) # Push every 1 second like real printers
  286. # Send status to all connected clients
  287. disconnected = []
  288. for client_id, writer in list(self._clients.items()):
  289. try:
  290. if writer.is_closing():
  291. disconnected.append(client_id)
  292. continue
  293. await self._send_status_report(writer)
  294. except OSError as e:
  295. logger.debug("Failed to push status to %s: %s", client_id, e)
  296. disconnected.append(client_id)
  297. # Remove disconnected clients
  298. for client_id in disconnected:
  299. self._clients.pop(client_id, None)
  300. except asyncio.CancelledError:
  301. break
  302. except Exception as e:
  303. logger.error("Periodic status push error: %s", e)
  304. logger.info("Periodic status push task stopped")
  305. async def _handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
  306. """Handle an MQTT client connection."""
  307. addr = writer.get_extra_info("peername")
  308. client_id = f"{addr[0]}:{addr[1]}" if addr else "unknown"
  309. logger.info("MQTT client connected: %s", client_id)
  310. authenticated = False
  311. try:
  312. while self._running:
  313. # Read MQTT fixed header
  314. try:
  315. header = await asyncio.wait_for(reader.read(1), timeout=60)
  316. except TimeoutError:
  317. break
  318. if not header:
  319. break
  320. packet_type = (header[0] & 0xF0) >> 4
  321. # Read remaining length
  322. remaining_length = await self._read_remaining_length(reader)
  323. if remaining_length is None:
  324. break
  325. # Read payload
  326. payload = await reader.read(remaining_length) if remaining_length > 0 else b""
  327. # Handle packet types
  328. if packet_type == 1: # CONNECT
  329. authenticated = await self._handle_connect(payload, writer)
  330. if not authenticated:
  331. break
  332. # Register client for periodic status pushes
  333. self._clients[client_id] = writer
  334. elif packet_type == 3: # PUBLISH
  335. if authenticated:
  336. await self._handle_publish(header[0], payload, writer)
  337. elif packet_type == 8: # SUBSCRIBE
  338. if authenticated:
  339. await self._handle_subscribe(payload, writer)
  340. elif packet_type == 12: # PINGREQ
  341. # Send PINGRESP
  342. writer.write(bytes([0xD0, 0x00]))
  343. await writer.drain()
  344. elif packet_type == 14: # DISCONNECT
  345. break
  346. except asyncio.CancelledError:
  347. pass # Expected when server is shutting down and cancels client tasks
  348. except Exception as e:
  349. logger.debug("MQTT client error: %s", e)
  350. finally:
  351. logger.debug("MQTT client disconnected: %s", client_id)
  352. if client_id in self._clients:
  353. del self._clients[client_id]
  354. try:
  355. writer.close()
  356. await writer.wait_closed()
  357. except OSError:
  358. pass # Best-effort socket cleanup on client disconnect
  359. async def _read_remaining_length(self, reader: asyncio.StreamReader) -> int | None:
  360. """Read MQTT remaining length (variable byte integer)."""
  361. multiplier = 1
  362. value = 0
  363. for _ in range(4):
  364. try:
  365. byte = await reader.read(1)
  366. if not byte:
  367. return None
  368. encoded = byte[0]
  369. value += (encoded & 127) * multiplier
  370. if (encoded & 128) == 0:
  371. return value
  372. multiplier *= 128
  373. except OSError:
  374. return None
  375. return None
  376. async def _handle_connect(self, payload: bytes, writer: asyncio.StreamWriter) -> bool:
  377. """Handle MQTT CONNECT packet.
  378. Returns True if authentication successful.
  379. """
  380. try:
  381. # Parse CONNECT packet
  382. # Skip protocol name length and name
  383. idx = 0
  384. proto_len = (payload[idx] << 8) | payload[idx + 1]
  385. idx += 2 + proto_len
  386. # Skip protocol level and connect flags
  387. # connect_flags = payload[idx + 1]
  388. idx += 2
  389. # Skip keepalive
  390. idx += 2
  391. # Read client ID
  392. client_id_len = (payload[idx] << 8) | payload[idx + 1]
  393. idx += 2
  394. # client_id = payload[idx : idx + client_id_len].decode("utf-8")
  395. idx += client_id_len
  396. # Read username
  397. username_len = (payload[idx] << 8) | payload[idx + 1]
  398. idx += 2
  399. username = payload[idx : idx + username_len].decode("utf-8")
  400. idx += username_len
  401. # Read password
  402. password_len = (payload[idx] << 8) | payload[idx + 1]
  403. idx += 2
  404. password = payload[idx : idx + password_len].decode("utf-8")
  405. # Authenticate
  406. if username == "bblp" and password == self.access_code:
  407. # Send CONNACK with success
  408. writer.write(bytes([0x20, 0x02, 0x00, 0x00]))
  409. await writer.drain()
  410. logger.info("MQTT client authenticated successfully")
  411. # Send immediate status report after auth - slicer expects this
  412. await self._send_status_report(writer)
  413. return True
  414. else:
  415. # Send CONNACK with auth failure
  416. writer.write(bytes([0x20, 0x02, 0x00, 0x05])) # Not authorized
  417. await writer.drain()
  418. logger.warning("MQTT auth failed for user '%s'", username)
  419. return False
  420. except (IndexError, ValueError) as e:
  421. logger.debug("MQTT CONNECT parse error: %s", e)
  422. # Send CONNACK with error
  423. writer.write(bytes([0x20, 0x02, 0x00, 0x02])) # Protocol error
  424. await writer.drain()
  425. return False
  426. async def _handle_subscribe(self, payload: bytes, writer: asyncio.StreamWriter) -> None:
  427. """Handle MQTT SUBSCRIBE packet."""
  428. try:
  429. # Parse packet ID
  430. packet_id = (payload[0] << 8) | payload[1]
  431. # Parse topic filters (just acknowledge them)
  432. idx = 2
  433. granted_qos = []
  434. while idx < len(payload):
  435. topic_len = (payload[idx] << 8) | payload[idx + 1]
  436. idx += 2
  437. topic = payload[idx : idx + topic_len].decode("utf-8")
  438. idx += topic_len
  439. requested_qos = payload[idx]
  440. idx += 1
  441. logger.info("MQTT subscribe: %s QoS=%s", topic, requested_qos)
  442. granted_qos.append(min(requested_qos, 1)) # Grant up to QoS 1
  443. # Send SUBACK
  444. suback = bytes([0x90, 2 + len(granted_qos), packet_id >> 8, packet_id & 0xFF])
  445. suback += bytes(granted_qos)
  446. writer.write(suback)
  447. await writer.drain()
  448. # Send initial status report after subscribe
  449. await self._send_status_report(writer)
  450. except (IndexError, ValueError, OSError) as e:
  451. logger.debug("MQTT SUBSCRIBE error: %s", e)
  452. async def _send_status_report(self, writer: asyncio.StreamWriter) -> None:
  453. """Send a status report to the slicer after connection."""
  454. try:
  455. # Build status message matching Bambu printer format
  456. self._sequence_id += 1
  457. status = {
  458. "print": {
  459. "sequence_id": str(self._sequence_id),
  460. "command": "push_status",
  461. "msg": 0,
  462. "gcode_state": self._gcode_state,
  463. "gcode_file": self._current_file,
  464. "gcode_file_prepare_percent": self._prepare_percent,
  465. "subtask_name": self._current_file.replace(".3mf", "") if self._current_file else "",
  466. "mc_print_stage": "",
  467. "mc_percent": 0,
  468. "mc_remaining_time": 0,
  469. "wifi_signal": "-44dBm",
  470. "print_error": 0,
  471. "print_type": "",
  472. "bed_temper": 25.0,
  473. "bed_target_temper": 0.0,
  474. "nozzle_temper": 25.0,
  475. "nozzle_target_temper": 0.0,
  476. "chamber_temper": 25.0,
  477. "cooling_fan_speed": "0",
  478. "big_fan1_speed": "0",
  479. "big_fan2_speed": "0",
  480. "heatbreak_fan_speed": "0",
  481. "spd_lvl": 1,
  482. "spd_mag": 100,
  483. "stg": [],
  484. "stg_cur": 0,
  485. "layer_num": 0,
  486. "total_layer_num": 0,
  487. "home_flag": 256, # Bit 8 = SD card present (HAS_SDCARD_NORMAL)
  488. "hw_switch_state": 0,
  489. "online": {"ahb": False, "rfid": False, "version": 7},
  490. "ams_status": 0,
  491. "sdcard": True,
  492. "storage": {"free": 1000000000, "total": 32000000000},
  493. "upgrade_state": {
  494. "sequence_id": 0,
  495. "progress": "",
  496. "status": "",
  497. "consistency_request": False,
  498. "dis_state": 0,
  499. "err_code": 0,
  500. "force_upgrade": False,
  501. "message": "",
  502. "module": "",
  503. "new_version_state": 2,
  504. "new_ver_list": [],
  505. "ota_new_version_number": "",
  506. "ahb_new_version_number": "",
  507. },
  508. "ipcam": {
  509. "ipcam_dev": "1",
  510. "ipcam_record": "enable",
  511. "timelapse": "disable",
  512. "resolution": "1080p",
  513. "mode_bits": 0,
  514. },
  515. "xcam": {
  516. "allow_skip_parts": False,
  517. "buildplate_marker_detector": True,
  518. "first_layer_inspector": True,
  519. "halt_print_sensitivity": "medium",
  520. "print_halt": True,
  521. "printing_monitor": True,
  522. "spaghetti_detector": True,
  523. },
  524. "lights_report": [{"node": "chamber_light", "mode": "on"}],
  525. "nozzle_diameter": "0.4",
  526. "nozzle_type": "hardened_steel",
  527. }
  528. }
  529. await self._publish_to_report(writer, status, self.serial)
  530. except OSError as e:
  531. logger.error("Failed to send status report: %s", e)
  532. async def _send_version_response(self, writer: asyncio.StreamWriter, sequence_id: str) -> None:
  533. """Send version info response to the slicer."""
  534. try:
  535. product_name = MODEL_PRODUCT_NAMES.get(self.model, self.model or "X1 Carbon")
  536. serial = self.serial
  537. # Build version response matching OrcaSlicer expectations
  538. # Required fields per module: name, product_name, sw_ver, sw_new_ver, sn, hw_ver, flag
  539. version_info = {
  540. "info": {
  541. "command": "get_version",
  542. "sequence_id": sequence_id,
  543. "module": [
  544. {
  545. "name": "ota",
  546. "product_name": product_name,
  547. "sw_ver": "01.07.00.00",
  548. "sw_new_ver": "",
  549. "hw_ver": "OTA",
  550. "sn": serial,
  551. "flag": 0,
  552. },
  553. {
  554. "name": "esp32",
  555. "product_name": product_name,
  556. "sw_ver": "01.07.22.25",
  557. "sw_new_ver": "",
  558. "hw_ver": "AP05",
  559. "sn": serial,
  560. "flag": 0,
  561. },
  562. {
  563. "name": "rv1126",
  564. "product_name": product_name,
  565. "sw_ver": "00.00.27.38",
  566. "sw_new_ver": "",
  567. "hw_ver": "AP05",
  568. "sn": serial,
  569. "flag": 0,
  570. },
  571. {
  572. "name": "th",
  573. "product_name": product_name,
  574. "sw_ver": "00.00.04.00",
  575. "sw_new_ver": "",
  576. "hw_ver": "TH07",
  577. "sn": serial,
  578. "flag": 0,
  579. },
  580. {
  581. "name": "mc",
  582. "product_name": product_name,
  583. "sw_ver": "00.00.10.00",
  584. "sw_new_ver": "",
  585. "hw_ver": "MC07",
  586. "sn": serial,
  587. "flag": 0,
  588. },
  589. ],
  590. }
  591. }
  592. await self._publish_to_report(writer, version_info, serial)
  593. logger.info("Sent version response (product_name=%s)", product_name)
  594. except OSError as e:
  595. logger.error("Failed to send version response: %s", e)
  596. def set_gcode_state(self, state: str, filename: str = "", prepare_percent: str = "0") -> None:
  597. """Update the gcode state reported to connected slicers.
  598. Called by the manager to reflect FTP upload progress/completion.
  599. """
  600. self._gcode_state = state
  601. self._current_file = filename
  602. self._prepare_percent = prepare_percent
  603. async def _publish_to_report(self, writer: asyncio.StreamWriter, payload: dict, serial: str = "") -> None:
  604. """Publish a message on the device report topic."""
  605. topic = f"device/{serial or self.serial}/report"
  606. message = json.dumps(payload)
  607. topic_bytes = topic.encode("utf-8")
  608. message_bytes = message.encode("utf-8")
  609. remaining = 2 + len(topic_bytes) + len(message_bytes)
  610. packet = bytes([0x30]) # PUBLISH, QoS 0
  611. while remaining > 0:
  612. byte = remaining % 128
  613. remaining //= 128
  614. if remaining > 0:
  615. byte |= 0x80
  616. packet += bytes([byte])
  617. packet += bytes([len(topic_bytes) >> 8, len(topic_bytes) & 0xFF])
  618. packet += topic_bytes
  619. packet += message_bytes
  620. writer.write(packet)
  621. # Timeout the drain to prevent blocking the event loop if the
  622. # MQTT client stops reading (e.g. slicer busy with FTP upload).
  623. try:
  624. await asyncio.wait_for(writer.drain(), timeout=5)
  625. except TimeoutError:
  626. logger.debug("MQTT drain timeout for %s — client may be busy", topic)
  627. async def _send_print_response(self, writer: asyncio.StreamWriter, sequence_id: str, filename: str) -> None:
  628. """Send project_file acknowledgment matching real Bambu printer behavior."""
  629. # Update state so periodic status pushes reflect preparation
  630. self._gcode_state = "PREPARE"
  631. self._current_file = filename
  632. self._prepare_percent = "0"
  633. try:
  634. # Send command acknowledgment — slicer expects to see
  635. # command: "project_file" echoed back before starting FTP upload
  636. subtask_name = filename.replace(".3mf", "") if filename else ""
  637. response = {
  638. "print": {
  639. "command": "project_file",
  640. "sequence_id": sequence_id,
  641. "param": "Metadata/plate_1.gcode",
  642. "subtask_name": subtask_name,
  643. "gcode_state": "PREPARE",
  644. "gcode_file": filename,
  645. "gcode_file_prepare_percent": "0",
  646. "result": "SUCCESS",
  647. "msg": 0,
  648. }
  649. }
  650. await self._publish_to_report(writer, response)
  651. logger.info("Sent project_file acknowledgment for %s", filename)
  652. except OSError as e:
  653. logger.error("Failed to send print response: %s", e)
  654. async def _handle_publish(self, header: int, payload: bytes, writer: asyncio.StreamWriter) -> None:
  655. """Handle MQTT PUBLISH packet."""
  656. try:
  657. # Parse topic
  658. idx = 0
  659. topic_len = (payload[idx] << 8) | payload[idx + 1]
  660. idx += 2
  661. topic = payload[idx : idx + topic_len].decode("utf-8")
  662. idx += topic_len
  663. # Check for packet ID (QoS > 0)
  664. qos = (header & 0x06) >> 1
  665. if qos > 0:
  666. # packet_id = (payload[idx] << 8) | payload[idx + 1]
  667. idx += 2
  668. # Parse message
  669. message = payload[idx:].decode("utf-8")
  670. logger.info("MQTT publish to %s: %s...", topic, message[:100])
  671. # Handle commands on device request topic
  672. if f"device/{self.serial}/request" in topic:
  673. try:
  674. data = json.loads(message)
  675. # Handle pushing command (status request)
  676. if "pushing" in data:
  677. pushing_data = data["pushing"]
  678. command = pushing_data.get("command", "")
  679. logger.info("MQTT pushing command: %s", command)
  680. if command == "pushall":
  681. # Slicer is requesting full status - send response
  682. logger.info("Sending status report in response to pushall")
  683. await self._send_status_report(writer)
  684. elif command == "start":
  685. # Slicer wants periodic status updates - send one now
  686. logger.info("Starting status push stream")
  687. await self._send_status_report(writer)
  688. # Handle info commands (get_version, etc.)
  689. if "info" in data:
  690. info_data = data["info"]
  691. command = info_data.get("command", "")
  692. sequence_id = info_data.get("sequence_id", "0")
  693. logger.info("MQTT info command: %s", command)
  694. if command == "get_version":
  695. await self._send_version_response(writer, sequence_id)
  696. # Handle print commands
  697. if "print" in data:
  698. print_data = data["print"]
  699. command = print_data.get("command", "")
  700. filename = print_data.get("subtask_name", "")
  701. sequence_id = print_data.get("sequence_id", "0")
  702. logger.info("MQTT print command: %s for %s", command, filename)
  703. if command == "project_file":
  704. # Respond with PREPARE status so slicer proceeds with FTP upload
  705. file_3mf = print_data.get("file", filename)
  706. await self._send_print_response(writer, sequence_id, file_3mf)
  707. if self.on_print_command:
  708. await self._notify_print_command(filename, print_data)
  709. except json.JSONDecodeError:
  710. pass # Non-JSON payloads on request topic are safely ignored
  711. except (IndexError, ValueError, OSError) as e:
  712. logger.debug("MQTT PUBLISH error: %s", e)
  713. async def _notify_print_command(self, filename: str, data: dict) -> None:
  714. """Notify callback of print command."""
  715. if self.on_print_command:
  716. try:
  717. result = self.on_print_command(filename, data)
  718. if asyncio.iscoroutine(result):
  719. await result
  720. except Exception as e:
  721. logger.error("Print command callback error: %s", e)