"""MQTT broker for virtual printer.

Implements an MQTT broker that accepts connections from slicers,
authenticates with the configured access code, and logs print commands.
"""
import asyncio
import errno
import hmac
import json
import logging
import ssl
from collections.abc import Callable
from pathlib import Path
  11. logger = logging.getLogger(__name__)
  12. # Default MQTT port for Bambu printers (MQTT over TLS)
  13. MQTT_PORT = 8883
  14. class VirtualPrinterMQTTServer:
  15. """MQTT broker that accepts connections from slicers.
  16. This is a minimal MQTT broker implementation that:
  17. - Accepts TLS connections on port 8883
  18. - Authenticates with username 'bblp' and the configured access code
  19. - Receives print commands on device/{serial}/request
  20. - Can publish status on device/{serial}/report
  21. """
  22. def __init__(
  23. self,
  24. serial: str,
  25. access_code: str,
  26. cert_path: Path,
  27. key_path: Path,
  28. port: int = MQTT_PORT,
  29. on_print_command: Callable[[str, dict], None] | None = None,
  30. ):
  31. """Initialize the MQTT server.
  32. Args:
  33. serial: Virtual printer serial number
  34. access_code: Password for authentication
  35. cert_path: Path to TLS certificate
  36. key_path: Path to TLS private key
  37. port: Port to listen on (default 8883)
  38. on_print_command: Callback when print command received (filename, data)
  39. """
  40. self.serial = serial
  41. self.access_code = access_code
  42. self.cert_path = cert_path
  43. self.key_path = key_path
  44. self.port = port
  45. self.on_print_command = on_print_command
  46. self._running = False
  47. self._broker = None
  48. self._broker_task = None
  49. async def start(self) -> None:
  50. """Start the MQTT broker."""
  51. if self._running:
  52. return
  53. # Try to import amqtt
  54. try:
  55. from amqtt.broker import Broker
  56. except ImportError:
  57. logger.error("amqtt not installed. Run: pip install amqtt")
  58. return
  59. logger.info("Starting virtual printer MQTT broker on port %s", self.port)
  60. # Build broker configuration
  61. config = {
  62. "listeners": {
  63. "default": {
  64. "type": "tcp",
  65. "bind": f"0.0.0.0:{self.port}",
  66. "ssl": "on",
  67. "certfile": str(self.cert_path),
  68. "keyfile": str(self.key_path),
  69. },
  70. },
  71. "auth": {
  72. "allow-anonymous": False,
  73. "plugins": ["auth_custom"],
  74. },
  75. "topic-check": {
  76. "enabled": False, # Allow any topic
  77. },
  78. }
  79. try:
  80. self._running = True
  81. # Create and start broker
  82. self._broker = Broker(config)
  83. # Register custom auth plugin
  84. self._broker.plugins_manager.plugins_handlers["auth_custom"] = self._authenticate
  85. # Start the broker
  86. await self._broker.start()
  87. logger.info("MQTT broker started on port %s", self.port)
  88. # Keep running
  89. while self._running:
  90. await asyncio.sleep(1)
  91. except OSError as e:
  92. if e.errno == 98: # Address already in use
  93. logger.error("MQTT port %s is already in use", self.port)
  94. else:
  95. logger.error("MQTT broker error: %s", e)
  96. except asyncio.CancelledError:
  97. logger.debug("MQTT broker task cancelled")
  98. except Exception as e:
  99. logger.error("MQTT broker error: %s", e)
  100. finally:
  101. await self.stop()
  102. async def _authenticate(self, session) -> bool:
  103. """Authenticate MQTT connection.
  104. Args:
  105. session: MQTT session with username/password
  106. Returns:
  107. True if authentication successful
  108. """
  109. username = getattr(session, "username", None)
  110. password = getattr(session, "password", None)
  111. # Bambu slicers use 'bblp' as username and access code as password
  112. if username == "bblp" and password == self.access_code:
  113. logger.debug("MQTT client authenticated from %s", session.remote_address)
  114. return True
  115. logger.warning("MQTT auth failed for user '%s' from %s", username, session.remote_address)
  116. return False
  117. async def stop(self) -> None:
  118. """Stop the MQTT broker."""
  119. logger.info("Stopping MQTT broker")
  120. self._running = False
  121. if self._broker:
  122. try:
  123. await self._broker.shutdown()
  124. except OSError as e:
  125. logger.debug("Error shutting down MQTT broker: %s", e)
  126. self._broker = None
  127. class SimpleMQTTServer:
  128. """Simplified MQTT server using raw sockets.
  129. This is a fallback implementation that handles basic MQTT protocol
  130. without requiring the amqtt library. It's less feature-complete but
  131. more lightweight.
  132. """
  133. def __init__(
  134. self,
  135. serial: str,
  136. access_code: str,
  137. cert_path: Path,
  138. key_path: Path,
  139. port: int = MQTT_PORT,
  140. on_print_command: Callable[[str, dict], None] | None = None,
  141. ):
  142. self.serial = serial
  143. self.access_code = access_code
  144. self.cert_path = cert_path
  145. self.key_path = key_path
  146. self.port = port
  147. self.on_print_command = on_print_command
  148. self._running = False
  149. self._server = None
  150. self._clients: dict[str, asyncio.StreamWriter] = {}
  151. self._status_push_task: asyncio.Task | None = None
  152. self._sequence_id = 0
  153. async def start(self) -> None:
  154. """Start the MQTT server."""
  155. if self._running:
  156. return
  157. logger.info("Starting simple MQTT server on port %s", self.port)
  158. # Create SSL context with Bambu-compatible settings
  159. ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
  160. ssl_context.load_cert_chain(str(self.cert_path), str(self.key_path))
  161. # Match Bambu printer behavior - accept any client
  162. ssl_context.verify_mode = ssl.CERT_NONE
  163. # Allow TLS 1.2 for broader compatibility (some slicers may not support 1.3)
  164. ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2
  165. # Disable hostname checking
  166. ssl_context.check_hostname = False
  167. # Log certificate info
  168. import subprocess
  169. try:
  170. result = subprocess.run(
  171. ["openssl", "x509", "-in", str(self.cert_path), "-noout", "-subject", "-issuer"],
  172. capture_output=True,
  173. text=True,
  174. timeout=5,
  175. )
  176. logger.info("MQTT SSL cert info: %s", result.stdout.strip())
  177. except (OSError, subprocess.SubprocessError):
  178. pass
  179. logger.info("MQTT SSL context: TLS 1.2+, cert=%s", self.cert_path)
  180. try:
  181. self._running = True
  182. # Wrapper to log ALL connection attempts including SSL errors
  183. async def connection_handler(reader, writer):
  184. try:
  185. addr = writer.get_extra_info("peername")
  186. ssl_obj = writer.get_extra_info("ssl_object")
  187. if ssl_obj:
  188. logger.info(
  189. f"MQTT TLS connection from {addr} - cipher={ssl_obj.cipher()}, version={ssl_obj.version()}"
  190. )
  191. else:
  192. logger.info("MQTT connection from %s (no TLS?)", addr)
  193. await self._handle_client(reader, writer)
  194. except ssl.SSLError as e:
  195. logger.error("MQTT SSL error: %s", e)
  196. except Exception as e:
  197. logger.error("MQTT connection handler error: %s", e)
  198. # Custom protocol factory to log raw connection attempts
  199. logger.info("Setting up MQTT server with SSL error handling...")
  200. # Add SSL handshake error callback
  201. def handle_ssl_error(loop, context):
  202. exception = context.get("exception")
  203. message = context.get("message", "")
  204. if "ssl" in str(exception).lower() or "ssl" in message.lower():
  205. logger.error("SSL error: %s - %s", message, exception)
  206. else:
  207. logger.debug("Asyncio error: %s", message)
  208. asyncio.get_event_loop().set_exception_handler(handle_ssl_error)
  209. self._server = await asyncio.start_server(
  210. connection_handler,
  211. "0.0.0.0",
  212. self.port,
  213. ssl=ssl_context,
  214. )
  215. logger.info("Simple MQTT server listening on port %s", self.port)
  216. # Start periodic status push task
  217. self._status_push_task = asyncio.create_task(self._periodic_status_push())
  218. async with self._server:
  219. await self._server.serve_forever()
  220. except OSError as e:
  221. if e.errno == 98: # Address already in use
  222. logger.error("MQTT port %s is already in use", self.port)
  223. else:
  224. logger.error("MQTT server error: %s", e)
  225. except asyncio.CancelledError:
  226. logger.debug("MQTT server task cancelled")
  227. except Exception as e:
  228. logger.error("MQTT server error: %s", e)
  229. finally:
  230. await self.stop()
  231. async def stop(self) -> None:
  232. """Stop the MQTT server."""
  233. logger.info("Stopping simple MQTT server")
  234. self._running = False
  235. # Stop periodic status push
  236. if self._status_push_task:
  237. self._status_push_task.cancel()
  238. try:
  239. await self._status_push_task
  240. except asyncio.CancelledError:
  241. pass
  242. self._status_push_task = None
  243. # Close all client connections (iterate over copy to avoid modification during iteration)
  244. for _client_id, writer in list(self._clients.items()):
  245. try:
  246. writer.close()
  247. await writer.wait_closed()
  248. except OSError:
  249. pass
  250. self._clients.clear()
  251. if self._server:
  252. try:
  253. self._server.close()
  254. await self._server.wait_closed()
  255. except OSError:
  256. pass
  257. self._server = None
  258. async def _periodic_status_push(self) -> None:
  259. """Send periodic status updates to all connected clients."""
  260. logger.info("Starting periodic status push task")
  261. while self._running:
  262. try:
  263. await asyncio.sleep(1) # Push every 1 second like real printers
  264. # Send status to all connected clients
  265. disconnected = []
  266. for client_id, writer in list(self._clients.items()):
  267. try:
  268. if writer.is_closing():
  269. disconnected.append(client_id)
  270. continue
  271. await self._send_status_report(writer)
  272. except OSError as e:
  273. logger.debug("Failed to push status to %s: %s", client_id, e)
  274. disconnected.append(client_id)
  275. # Remove disconnected clients
  276. for client_id in disconnected:
  277. self._clients.pop(client_id, None)
  278. except asyncio.CancelledError:
  279. break
  280. except Exception as e:
  281. logger.error("Periodic status push error: %s", e)
  282. logger.info("Periodic status push task stopped")
  283. async def _handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
  284. """Handle an MQTT client connection."""
  285. addr = writer.get_extra_info("peername")
  286. client_id = f"{addr[0]}:{addr[1]}" if addr else "unknown"
  287. logger.info("MQTT client connected: %s", client_id)
  288. authenticated = False
  289. try:
  290. while self._running:
  291. # Read MQTT fixed header
  292. try:
  293. header = await asyncio.wait_for(reader.read(1), timeout=60)
  294. except TimeoutError:
  295. break
  296. if not header:
  297. break
  298. packet_type = (header[0] & 0xF0) >> 4
  299. # Read remaining length
  300. remaining_length = await self._read_remaining_length(reader)
  301. if remaining_length is None:
  302. break
  303. # Read payload
  304. payload = await reader.read(remaining_length) if remaining_length > 0 else b""
  305. # Handle packet types
  306. if packet_type == 1: # CONNECT
  307. authenticated = await self._handle_connect(payload, writer)
  308. if not authenticated:
  309. break
  310. # Register client for periodic status pushes
  311. self._clients[client_id] = writer
  312. elif packet_type == 3: # PUBLISH
  313. if authenticated:
  314. await self._handle_publish(header[0], payload, writer)
  315. elif packet_type == 8: # SUBSCRIBE
  316. if authenticated:
  317. await self._handle_subscribe(payload, writer)
  318. elif packet_type == 12: # PINGREQ
  319. # Send PINGRESP
  320. writer.write(bytes([0xD0, 0x00]))
  321. await writer.drain()
  322. elif packet_type == 14: # DISCONNECT
  323. break
  324. except asyncio.CancelledError:
  325. pass
  326. except Exception as e:
  327. logger.debug("MQTT client error: %s", e)
  328. finally:
  329. logger.debug("MQTT client disconnected: %s", client_id)
  330. if client_id in self._clients:
  331. del self._clients[client_id]
  332. try:
  333. writer.close()
  334. await writer.wait_closed()
  335. except OSError:
  336. pass
  337. async def _read_remaining_length(self, reader: asyncio.StreamReader) -> int | None:
  338. """Read MQTT remaining length (variable byte integer)."""
  339. multiplier = 1
  340. value = 0
  341. for _ in range(4):
  342. try:
  343. byte = await reader.read(1)
  344. if not byte:
  345. return None
  346. encoded = byte[0]
  347. value += (encoded & 127) * multiplier
  348. if (encoded & 128) == 0:
  349. return value
  350. multiplier *= 128
  351. except OSError:
  352. return None
  353. return None
  354. async def _handle_connect(self, payload: bytes, writer: asyncio.StreamWriter) -> bool:
  355. """Handle MQTT CONNECT packet.
  356. Returns True if authentication successful.
  357. """
  358. try:
  359. # Parse CONNECT packet
  360. # Skip protocol name length and name
  361. idx = 0
  362. proto_len = (payload[idx] << 8) | payload[idx + 1]
  363. idx += 2 + proto_len
  364. # Skip protocol level and connect flags
  365. # connect_flags = payload[idx + 1]
  366. idx += 2
  367. # Skip keepalive
  368. idx += 2
  369. # Read client ID
  370. client_id_len = (payload[idx] << 8) | payload[idx + 1]
  371. idx += 2
  372. # client_id = payload[idx : idx + client_id_len].decode("utf-8")
  373. idx += client_id_len
  374. # Read username
  375. username_len = (payload[idx] << 8) | payload[idx + 1]
  376. idx += 2
  377. username = payload[idx : idx + username_len].decode("utf-8")
  378. idx += username_len
  379. # Read password
  380. password_len = (payload[idx] << 8) | payload[idx + 1]
  381. idx += 2
  382. password = payload[idx : idx + password_len].decode("utf-8")
  383. # Authenticate
  384. if username == "bblp" and password == self.access_code:
  385. # Send CONNACK with success
  386. writer.write(bytes([0x20, 0x02, 0x00, 0x00]))
  387. await writer.drain()
  388. logger.info("MQTT client authenticated successfully")
  389. # Send immediate status report after auth - slicer expects this
  390. await self._send_status_report(writer)
  391. return True
  392. else:
  393. # Send CONNACK with auth failure
  394. writer.write(bytes([0x20, 0x02, 0x00, 0x05])) # Not authorized
  395. await writer.drain()
  396. logger.warning("MQTT auth failed for user '%s'", username)
  397. return False
  398. except (IndexError, ValueError) as e:
  399. logger.debug("MQTT CONNECT parse error: %s", e)
  400. # Send CONNACK with error
  401. writer.write(bytes([0x20, 0x02, 0x00, 0x02])) # Protocol error
  402. await writer.drain()
  403. return False
  404. async def _handle_subscribe(self, payload: bytes, writer: asyncio.StreamWriter) -> None:
  405. """Handle MQTT SUBSCRIBE packet."""
  406. try:
  407. # Parse packet ID
  408. packet_id = (payload[0] << 8) | payload[1]
  409. # Parse topic filters (just acknowledge them)
  410. idx = 2
  411. granted_qos = []
  412. while idx < len(payload):
  413. topic_len = (payload[idx] << 8) | payload[idx + 1]
  414. idx += 2
  415. topic = payload[idx : idx + topic_len].decode("utf-8")
  416. idx += topic_len
  417. requested_qos = payload[idx]
  418. idx += 1
  419. logger.info("MQTT subscribe: %s QoS=%s", topic, requested_qos)
  420. granted_qos.append(min(requested_qos, 1)) # Grant up to QoS 1
  421. # Send SUBACK
  422. suback = bytes([0x90, 2 + len(granted_qos), packet_id >> 8, packet_id & 0xFF])
  423. suback += bytes(granted_qos)
  424. writer.write(suback)
  425. await writer.drain()
  426. # Send initial status report after subscribe
  427. await self._send_status_report(writer)
  428. except (IndexError, ValueError, OSError) as e:
  429. logger.debug("MQTT SUBSCRIBE error: %s", e)
  430. async def _send_status_report(self, writer: asyncio.StreamWriter) -> None:
  431. """Send a status report to the slicer after connection."""
  432. try:
  433. # Build status message matching Bambu printer format
  434. self._sequence_id += 1
  435. status = {
  436. "print": {
  437. "sequence_id": str(self._sequence_id),
  438. "command": "push_status",
  439. "msg": 0,
  440. "gcode_state": "IDLE",
  441. "gcode_file": "",
  442. "gcode_file_prepare_percent": "0",
  443. "subtask_name": "",
  444. "mc_print_stage": "",
  445. "mc_percent": 0,
  446. "mc_remaining_time": 0,
  447. "wifi_signal": "-44dBm",
  448. "print_error": 0,
  449. "print_type": "",
  450. "bed_temper": 25.0,
  451. "bed_target_temper": 0.0,
  452. "nozzle_temper": 25.0,
  453. "nozzle_target_temper": 0.0,
  454. "chamber_temper": 25.0,
  455. "cooling_fan_speed": "0",
  456. "big_fan1_speed": "0",
  457. "big_fan2_speed": "0",
  458. "heatbreak_fan_speed": "0",
  459. "spd_lvl": 1,
  460. "spd_mag": 100,
  461. "stg": [],
  462. "stg_cur": 0,
  463. "layer_num": 0,
  464. "total_layer_num": 0,
  465. "home_flag": 256, # Bit 8 = SD card present (HAS_SDCARD_NORMAL)
  466. "hw_switch_state": 0,
  467. "online": {"ahb": False, "rfid": False, "version": 7},
  468. "ams_status": 0,
  469. "sdcard": True,
  470. "storage": {"free": 1000000000, "total": 32000000000},
  471. "upgrade_state": {
  472. "sequence_id": 0,
  473. "progress": "",
  474. "status": "",
  475. "consistency_request": False,
  476. "dis_state": 0,
  477. "err_code": 0,
  478. "force_upgrade": False,
  479. "message": "",
  480. "module": "",
  481. "new_version_state": 2,
  482. "new_ver_list": [],
  483. "ota_new_version_number": "",
  484. "ahb_new_version_number": "",
  485. },
  486. "ipcam": {
  487. "ipcam_dev": "1",
  488. "ipcam_record": "enable",
  489. "timelapse": "disable",
  490. "resolution": "1080p",
  491. "mode_bits": 0,
  492. },
  493. "xcam": {
  494. "allow_skip_parts": False,
  495. "buildplate_marker_detector": True,
  496. "first_layer_inspector": True,
  497. "halt_print_sensitivity": "medium",
  498. "print_halt": True,
  499. "printing_monitor": True,
  500. "spaghetti_detector": True,
  501. },
  502. "lights_report": [{"node": "chamber_light", "mode": "on"}],
  503. "nozzle_diameter": "0.4",
  504. "nozzle_type": "hardened_steel",
  505. }
  506. }
  507. topic = f"device/{self.serial}/report"
  508. message = json.dumps(status)
  509. # Build MQTT PUBLISH packet
  510. topic_bytes = topic.encode("utf-8")
  511. message_bytes = message.encode("utf-8")
  512. # Calculate remaining length
  513. remaining = 2 + len(topic_bytes) + len(message_bytes)
  514. # Build packet
  515. packet = bytes([0x30]) # PUBLISH, QoS 0
  516. # Encode remaining length
  517. while remaining > 0:
  518. byte = remaining % 128
  519. remaining //= 128
  520. if remaining > 0:
  521. byte |= 0x80
  522. packet += bytes([byte])
  523. # Topic length and topic
  524. packet += bytes([len(topic_bytes) >> 8, len(topic_bytes) & 0xFF])
  525. packet += topic_bytes
  526. # Message payload
  527. packet += message_bytes
  528. writer.write(packet)
  529. await writer.drain()
  530. logger.info("Sent initial status report on %s", topic)
  531. except OSError as e:
  532. logger.error("Failed to send status report: %s", e)
  533. async def _send_version_response(self, writer: asyncio.StreamWriter, sequence_id: str) -> None:
  534. """Send version info response to the slicer."""
  535. try:
  536. # Build version response matching OrcaSlicer expectations
  537. # Required fields per module: name, product_name, sw_ver, sw_new_ver, sn, hw_ver, flag
  538. version_info = {
  539. "info": {
  540. "command": "get_version",
  541. "sequence_id": sequence_id,
  542. "module": [
  543. {
  544. "name": "ota",
  545. "product_name": "X1 Carbon",
  546. "sw_ver": "01.07.00.00",
  547. "sw_new_ver": "",
  548. "hw_ver": "OTA",
  549. "sn": self.serial,
  550. "flag": 0,
  551. },
  552. {
  553. "name": "esp32",
  554. "product_name": "X1 Carbon",
  555. "sw_ver": "01.07.22.25",
  556. "sw_new_ver": "",
  557. "hw_ver": "AP05",
  558. "sn": self.serial,
  559. "flag": 0,
  560. },
  561. {
  562. "name": "rv1126",
  563. "product_name": "X1 Carbon",
  564. "sw_ver": "00.00.27.38",
  565. "sw_new_ver": "",
  566. "hw_ver": "AP05",
  567. "sn": self.serial,
  568. "flag": 0,
  569. },
  570. {
  571. "name": "th",
  572. "product_name": "X1 Carbon",
  573. "sw_ver": "00.00.04.00",
  574. "sw_new_ver": "",
  575. "hw_ver": "TH07",
  576. "sn": self.serial,
  577. "flag": 0,
  578. },
  579. {
  580. "name": "mc",
  581. "product_name": "X1 Carbon",
  582. "sw_ver": "00.00.10.00",
  583. "sw_new_ver": "",
  584. "hw_ver": "MC07",
  585. "sn": self.serial,
  586. "flag": 0,
  587. },
  588. ],
  589. }
  590. }
  591. topic = f"device/{self.serial}/report"
  592. message = json.dumps(version_info)
  593. # Build MQTT PUBLISH packet
  594. topic_bytes = topic.encode("utf-8")
  595. message_bytes = message.encode("utf-8")
  596. # Calculate remaining length
  597. remaining = 2 + len(topic_bytes) + len(message_bytes)
  598. # Build packet
  599. packet = bytes([0x30]) # PUBLISH, QoS 0
  600. # Encode remaining length
  601. while remaining > 0:
  602. byte = remaining % 128
  603. remaining //= 128
  604. if remaining > 0:
  605. byte |= 0x80
  606. packet += bytes([byte])
  607. # Topic length and topic
  608. packet += bytes([len(topic_bytes) >> 8, len(topic_bytes) & 0xFF])
  609. packet += topic_bytes
  610. # Message payload
  611. packet += message_bytes
  612. writer.write(packet)
  613. await writer.drain()
  614. logger.info("Sent version response on %s", topic)
  615. except OSError as e:
  616. logger.error("Failed to send version response: %s", e)
  617. async def _handle_publish(self, header: int, payload: bytes, writer: asyncio.StreamWriter) -> None:
  618. """Handle MQTT PUBLISH packet."""
  619. try:
  620. # Parse topic
  621. idx = 0
  622. topic_len = (payload[idx] << 8) | payload[idx + 1]
  623. idx += 2
  624. topic = payload[idx : idx + topic_len].decode("utf-8")
  625. idx += topic_len
  626. # Check for packet ID (QoS > 0)
  627. qos = (header & 0x06) >> 1
  628. if qos > 0:
  629. # packet_id = (payload[idx] << 8) | payload[idx + 1]
  630. idx += 2
  631. # Parse message
  632. message = payload[idx:].decode("utf-8")
  633. logger.info("MQTT publish to %s: %s...", topic, message[:100])
  634. # Handle commands on device request topic
  635. if f"device/{self.serial}/request" in topic:
  636. try:
  637. data = json.loads(message)
  638. # Handle pushing command (status request)
  639. if "pushing" in data:
  640. pushing_data = data["pushing"]
  641. command = pushing_data.get("command", "")
  642. logger.info("MQTT pushing command: %s", command)
  643. if command == "pushall":
  644. # Slicer is requesting full status - send response
  645. logger.info("Sending status report in response to pushall")
  646. await self._send_status_report(writer)
  647. elif command == "start":
  648. # Slicer wants periodic status updates - send one now
  649. logger.info("Starting status push stream")
  650. await self._send_status_report(writer)
  651. # Handle info commands (get_version, etc.)
  652. if "info" in data:
  653. info_data = data["info"]
  654. command = info_data.get("command", "")
  655. sequence_id = info_data.get("sequence_id", "0")
  656. logger.info("MQTT info command: %s", command)
  657. if command == "get_version":
  658. await self._send_version_response(writer, sequence_id)
  659. # Handle print commands
  660. if "print" in data:
  661. print_data = data["print"]
  662. command = print_data.get("command", "")
  663. filename = print_data.get("subtask_name", "")
  664. logger.info("MQTT print command: %s for %s", command, filename)
  665. if self.on_print_command and command == "project_file":
  666. await self._notify_print_command(filename, print_data)
  667. except json.JSONDecodeError:
  668. pass
  669. except (IndexError, ValueError, OSError) as e:
  670. logger.debug("MQTT PUBLISH error: %s", e)
  671. async def _notify_print_command(self, filename: str, data: dict) -> None:
  672. """Notify callback of print command."""
  673. if self.on_print_command:
  674. try:
  675. result = self.on_print_command(filename, data)
  676. if asyncio.iscoroutine(result):
  677. await result
  678. except Exception as e:
  679. logger.error("Print command callback error: %s", e)