mqtt_server.py 54 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262
  1. """MQTT broker for virtual printer.
  2. Implements an MQTT broker that accepts connections from slicers,
  3. authenticates with the configured access code, and logs print commands.
  4. """
  5. import asyncio
  6. import copy
  7. import hmac
  8. import json
  9. import logging
  10. import ssl
  11. from collections.abc import Callable
  12. from pathlib import Path
  13. from typing import TYPE_CHECKING
  14. if TYPE_CHECKING:
  15. from backend.app.services.virtual_printer.mqtt_bridge import MQTTBridge
  16. logger = logging.getLogger(__name__)
  17. # Default MQTT port for Bambu printers (MQTT over TLS)
  18. MQTT_PORT = 8883
  19. # Per-IP MQTT auth rate-limit. 5 failures within 60 s blocks further attempts
  20. # for the remainder of the window. Bambu printers themselves don't rate-limit,
  21. # but they're not exposed past the LAN edge; Bambuddy's VPs sometimes are
  22. # (Tailscale, port-forwarded), so an 8-char access code without any
  23. # brute-force friction is too weak. The window auto-recovers — no manual
  24. # unblock — so a legitimate user who fat-fingered their access code 5 times
  25. # only waits up to 60 s.
  26. _AUTH_RATE_LIMIT_MAX_ATTEMPTS = 5
  27. _AUTH_RATE_LIMIT_WINDOW_SECONDS = 60.0
  28. # Pending-request map bound. Each entry maps a slicer command's
  29. # sequence_id to its originating client_id so the bridge response can be
  30. # routed back to just that client. Bounded so a slicer that issues
  31. # commands without ever consuming responses can't leak memory.
  32. _PENDING_REQUEST_MAX_ENTRIES = 256
  33. # Model code → product_name for version response (must match what slicer expects)
  34. MODEL_PRODUCT_NAMES = {
  35. "BL-P001": "X1 Carbon",
  36. "BL-P002": "X1",
  37. "C13": "X1E",
  38. "N6": "X2D",
  39. "C11": "P1P",
  40. "C12": "P1S",
  41. "N7": "P2S",
  42. "N2S": "A1",
  43. "N1": "A1 mini",
  44. "O1D": "H2D",
  45. "O1C": "H2C",
  46. "O1C2": "H2C",
  47. "O1S": "H2S",
  48. }
  49. class VirtualPrinterMQTTServer:
  50. """MQTT broker that accepts connections from slicers.
  51. This is a minimal MQTT broker implementation that:
  52. - Accepts TLS connections on port 8883
  53. - Authenticates with username 'bblp' and the configured access code
  54. - Receives print commands on device/{serial}/request
  55. - Can publish status on device/{serial}/report
  56. """
  57. def __init__(
  58. self,
  59. serial: str,
  60. access_code: str,
  61. cert_path: Path,
  62. key_path: Path,
  63. port: int = MQTT_PORT,
  64. on_print_command: Callable[[str, dict], None] | None = None,
  65. ):
  66. """Initialize the MQTT server.
  67. Args:
  68. serial: Virtual printer serial number
  69. access_code: Password for authentication
  70. cert_path: Path to TLS certificate
  71. key_path: Path to TLS private key
  72. port: Port to listen on (default 8883)
  73. on_print_command: Callback when print command received (filename, data)
  74. """
  75. self.serial = serial
  76. self.access_code = access_code
  77. self.cert_path = cert_path
  78. self.key_path = key_path
  79. self.port = port
  80. self.on_print_command = on_print_command
  81. self._running = False
  82. self._broker = None
  83. self._broker_task = None
  84. async def start(self) -> None:
  85. """Start the MQTT broker."""
  86. if self._running:
  87. return
  88. # Try to import amqtt
  89. try:
  90. from amqtt.broker import Broker
  91. except ImportError:
  92. logger.error("amqtt not installed. Run: pip install amqtt")
  93. return
  94. logger.info("Starting virtual printer MQTT broker on port %s", self.port)
  95. # Build broker configuration
  96. config = {
  97. "listeners": {
  98. "default": {
  99. "type": "tcp",
  100. "bind": f"0.0.0.0:{self.port}",
  101. "ssl": "on",
  102. "certfile": str(self.cert_path),
  103. "keyfile": str(self.key_path),
  104. },
  105. },
  106. "auth": {
  107. "allow-anonymous": False,
  108. "plugins": ["auth_custom"],
  109. },
  110. "topic-check": {
  111. "enabled": False, # Allow any topic
  112. },
  113. }
  114. try:
  115. self._running = True
  116. # Create and start broker
  117. self._broker = Broker(config)
  118. # Register custom auth plugin
  119. self._broker.plugins_manager.plugins_handlers["auth_custom"] = self._authenticate
  120. # Start the broker
  121. await self._broker.start()
  122. logger.info("MQTT broker started on port %s", self.port)
  123. # Keep running
  124. while self._running:
  125. await asyncio.sleep(1)
  126. except OSError as e:
  127. if e.errno == 98: # Address already in use
  128. logger.error("MQTT port %s is already in use", self.port)
  129. else:
  130. logger.error("MQTT broker error: %s", e)
  131. except asyncio.CancelledError:
  132. logger.debug("MQTT broker task cancelled")
  133. except Exception as e:
  134. logger.error("MQTT broker error: %s", e)
  135. finally:
  136. await self.stop()
  137. async def _authenticate(self, session) -> bool:
  138. """Authenticate MQTT connection.
  139. Args:
  140. session: MQTT session with username/password
  141. Returns:
  142. True if authentication successful
  143. """
  144. username = getattr(session, "username", None)
  145. password = getattr(session, "password", None)
  146. # Bambu slicers use 'bblp' as username and access code as password
  147. if username == "bblp" and password == self.access_code:
  148. logger.debug("MQTT client authenticated from %s", session.remote_address)
  149. return True
  150. logger.warning("MQTT auth failed for user '%s' from %s", username, session.remote_address)
  151. return False
  152. async def stop(self) -> None:
  153. """Stop the MQTT broker."""
  154. logger.info("Stopping MQTT broker")
  155. self._running = False
  156. if self._broker:
  157. try:
  158. await self._broker.shutdown()
  159. except OSError as e:
  160. logger.debug("Error shutting down MQTT broker: %s", e)
  161. self._broker = None
  162. class SimpleMQTTServer:
  163. """Simplified MQTT server using raw sockets.
  164. This is a fallback implementation that handles basic MQTT protocol
  165. without requiring the amqtt library. It's less feature-complete but
  166. more lightweight.
  167. """
  168. def __init__(
  169. self,
  170. serial: str,
  171. access_code: str,
  172. cert_path: Path,
  173. key_path: Path,
  174. port: int = MQTT_PORT,
  175. on_print_command: Callable[[str, dict], None] | None = None,
  176. model: str = "",
  177. bind_address: str = "0.0.0.0", # nosec B104
  178. vp_name: str = "",
  179. ):
  180. self.serial = serial
  181. self.access_code = access_code
  182. self.model = model
  183. self.cert_path = cert_path
  184. self.key_path = key_path
  185. self.port = port
  186. self.on_print_command = on_print_command
  187. self.bind_address = bind_address
  188. self.vp_name = vp_name
  189. self._log_prefix = f"[{vp_name}] " if vp_name else ""
  190. self._running = False
  191. # Set after the socket is bound — see ftp_server.py for rationale.
  192. self.ready = asyncio.Event()
  193. self._server = None
  194. self._clients: dict[str, asyncio.StreamWriter] = {}
  195. # Per-client "effective serial" — the serial the slicer actually uses in
  196. # device/{serial}/report|request topics. Populated from the first
  197. # SUBSCRIBE/PUBLISH we see on a connection. This lets the VP respond on
  198. # the topic the slicer is listening on even when it disagrees with
  199. # self.serial (e.g. a stale Orca config that was bound to an older VP
  200. # serial, or a printer entry that was re-pointed at the VP IP without
  201. # updating the serial).
  202. self._client_serials: dict[str, str] = {}
  203. self._status_push_task: asyncio.Task | None = None
  204. self._sequence_id = 0
  205. # Dynamic state for status reports
  206. self._gcode_state = "IDLE"
  207. self._current_file = ""
  208. self._prepare_percent = "0"
  209. # MQTT bridge for non-proxy modes — set by VirtualPrinterInstance after start().
  210. # When the bridge is_active, real printer pushes are fanned out to slicers and
  211. # the synthetic 1s push is suspended. When the target printer goes offline the
  212. # synthetic fallback resumes automatically.
  213. self._bridge: MQTTBridge | None = None
  214. # Per-source-IP failed-auth tracker for rate-limiting / lockout.
  215. # Maps IP → list[monotonic timestamp] of recent failures within the
  216. # window. Pruned on every check so it doesn't grow unbounded.
  217. self._auth_failures: dict[str, list[float]] = {}
  218. # Maps sequence_id → originating client_id for slicer-initiated
  219. # commands forwarded to the real printer. Used in
  220. # ``push_raw_to_clients`` to route the printer's response only
  221. # back to the requesting slicer instead of fanning out to all
  222. # connected clients (which leaks slicer A's responses to slicer
  223. # B in multi-slicer setups). FIFO-bounded; if a response never
  224. # arrives the entry ages out instead of leaking.
  225. self._pending_requests: dict[str, str] = {}
  226. async def start(self) -> None:
  227. """Start the MQTT server."""
  228. if self._running:
  229. return
  230. logger.info("Starting simple MQTT server on port %s", self.port)
  231. # Create SSL context with Bambu-compatible settings
  232. ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
  233. ssl_context.load_cert_chain(str(self.cert_path), str(self.key_path))
  234. # Match Bambu printer behavior - accept any client
  235. ssl_context.verify_mode = ssl.CERT_NONE
  236. # Allow TLS 1.2 for broader compatibility (some slicers may not support 1.3)
  237. ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2
  238. # Match real Bambu printer cipher behaviour: include the plain-RSA
  239. # AES-GCM suites the slicer expects. On hardened distros
  240. # (Fedora / RHEL with `update-crypto-policies`, hardened Alpine builds)
  241. # OpenSSL's `DEFAULT` list strips these suites, leaving no overlap
  242. # with the slicer's MQTT-over-TLS ClientHello — handshake fails
  243. # immediately and the slicer reports a connect error before any MQTT
  244. # CONNECT can be sent (#1610 audit). Same shape as the #620 fix.
  245. ssl_context.set_ciphers("DEFAULT:AES256-GCM-SHA384:AES128-GCM-SHA256")
  246. # Disable hostname checking
  247. ssl_context.check_hostname = False
  248. # Log certificate info
  249. import subprocess
  250. try:
  251. result = subprocess.run(
  252. ["openssl", "x509", "-in", str(self.cert_path), "-noout", "-subject", "-issuer"],
  253. capture_output=True,
  254. text=True,
  255. timeout=5,
  256. )
  257. logger.info("MQTT SSL cert info: %s", result.stdout.strip())
  258. except (OSError, subprocess.SubprocessError):
  259. pass # Certificate info is for debug logging only; not critical
  260. logger.info("MQTT SSL context: TLS 1.2+, cert=%s", self.cert_path)
  261. try:
  262. self._running = True
  263. # Wrapper to log ALL connection attempts including SSL errors
  264. async def connection_handler(reader, writer):
  265. try:
  266. addr = writer.get_extra_info("peername")
  267. ssl_obj = writer.get_extra_info("ssl_object")
  268. if ssl_obj:
  269. logger.info(
  270. f"{self._log_prefix}MQTT TLS connection from {addr} - cipher={ssl_obj.cipher()}, version={ssl_obj.version()}"
  271. )
  272. else:
  273. logger.info("%sMQTT connection from %s (no TLS?)", self._log_prefix, addr)
  274. await self._handle_client(reader, writer)
  275. except ssl.SSLError as e:
  276. logger.error("MQTT SSL error: %s", e)
  277. except Exception as e:
  278. logger.error("MQTT connection handler error: %s", e)
  279. self._server = await asyncio.start_server(
  280. connection_handler,
  281. self.bind_address,
  282. self.port,
  283. ssl=ssl_context,
  284. )
  285. self.ready.set()
  286. logger.info("Simple MQTT server listening on port %s", self.port)
  287. # Start periodic status push task
  288. self._status_push_task = asyncio.create_task(self._periodic_status_push())
  289. async with self._server:
  290. await self._server.serve_forever()
  291. except OSError as e:
  292. if e.errno == 98: # Address already in use
  293. logger.error("MQTT port %s is already in use", self.port)
  294. else:
  295. logger.error("MQTT server error: %s", e)
  296. except asyncio.CancelledError:
  297. logger.debug("MQTT server task cancelled")
  298. except Exception as e:
  299. logger.error("MQTT server error: %s", e)
  300. finally:
  301. await self.stop()
  302. async def stop(self) -> None:
  303. """Stop the MQTT server."""
  304. logger.info("Stopping simple MQTT server")
  305. self._running = False
  306. self.ready.clear()
  307. # Stop periodic status push
  308. if self._status_push_task:
  309. self._status_push_task.cancel()
  310. try:
  311. await self._status_push_task
  312. except asyncio.CancelledError:
  313. pass # Expected when stopping the periodic status push task
  314. self._status_push_task = None
  315. # Close all client connections (iterate over copy to avoid modification during iteration)
  316. for _client_id, writer in list(self._clients.items()):
  317. try:
  318. writer.close()
  319. await writer.wait_closed()
  320. except OSError:
  321. pass # Best-effort client connection cleanup; client may have disconnected
  322. self._clients.clear()
  323. self._client_serials.clear()
  324. if self._server:
  325. try:
  326. self._server.close()
  327. await self._server.wait_closed()
  328. except OSError:
  329. pass # Best-effort server shutdown; port may already be released
  330. self._server = None
  331. @staticmethod
  332. def _extract_serial_from_topic(topic: str) -> str | None:
  333. """Pull the serial out of a `device/{serial}/report|request` topic.
  334. Returns None if the topic doesn't match that shape — callers fall back
  335. to self.serial in that case.
  336. """
  337. if not topic.startswith("device/"):
  338. return None
  339. rest = topic[len("device/") :]
  340. # Expect "{serial}/report" or "{serial}/request" (possibly with suffixes).
  341. slash = rest.find("/")
  342. if slash <= 0:
  343. return None
  344. return rest[:slash]
  345. def set_bridge(self, bridge: "MQTTBridge | None") -> None:
  346. """Attach (or detach) the MQTT bridge that mirrors the target printer."""
  347. self._bridge = bridge
  348. async def _periodic_status_push(self) -> None:
  349. """Send periodic status updates to all connected clients (1 Hz, exact pre-bridge behaviour)."""
  350. logger.info("Starting periodic status push task")
  351. # Per-client push counters reset every 60 ticks. Lets us confirm from
  352. # logs whether the 1Hz push is actually reaching a specific slicer
  353. # connection (#1548 keepalive follow-up: keepalive parser shipped but
  354. # OrcaSlicer still disconnects on idle, and the periodic push is
  355. # otherwise silent at INFO level so it can't be observed in the
  356. # support bundle). One log line per minute per active connection —
  357. # nothing when no slicer is attached.
  358. push_counts: dict[str, int] = {}
  359. ticks_since_summary = 0
  360. while self._running:
  361. try:
  362. await asyncio.sleep(1) # Push every 1 second like real printers
  363. ticks_since_summary += 1
  364. disconnected = []
  365. for client_id, writer in list(self._clients.items()):
  366. try:
  367. if writer.is_closing():
  368. disconnected.append(client_id)
  369. continue
  370. serial = self._client_serials.get(client_id, self.serial)
  371. await self._send_status_report(writer, serial=serial)
  372. push_counts[client_id] = push_counts.get(client_id, 0) + 1
  373. except OSError as e:
  374. logger.debug("Failed to push status to %s: %s", client_id, e)
  375. disconnected.append(client_id)
  376. # Remove disconnected clients
  377. for client_id in disconnected:
  378. self._clients.pop(client_id, None)
  379. self._client_serials.pop(client_id, None)
  380. push_counts.pop(client_id, None)
  381. if ticks_since_summary >= 60:
  382. for cid, count in push_counts.items():
  383. logger.info(
  384. "%s1Hz status push: %d pushes/min to %s",
  385. self._log_prefix,
  386. count,
  387. cid,
  388. )
  389. push_counts.clear()
  390. ticks_since_summary = 0
  391. except asyncio.CancelledError:
  392. break
  393. except Exception as e:
  394. logger.error("Periodic status push error: %s", e)
  395. logger.info("Periodic status push task stopped")
  396. async def push_raw_to_clients(self, topic: str, payload: bytes) -> None:
  397. """Publish a pre-serialized MQTT payload on `topic` to connected slicers.
  398. Called by MQTTBridge from the asyncio loop (scheduled via
  399. run_coroutine_threadsafe from paho's network thread).
  400. Routes the response only back to the originating slicer if the
  401. payload's sequence_id was previously recorded via
  402. ``_record_pending_request``. Falls back to fan-out for
  403. printer-initiated pushes (push_status etc.) and for sequence_ids
  404. we never saw (covers a slicer that subscribes mid-flight to a
  405. topic for which an earlier request is still in flight).
  406. """
  407. topic_bytes = topic.encode("utf-8")
  408. # MQTT remaining-length: 2-byte topic length prefix + topic + message body.
  409. remaining = 2 + len(topic_bytes) + len(payload)
  410. packet = bytearray([0x30]) # PUBLISH, QoS 0
  411. while True:
  412. byte = remaining % 128
  413. remaining //= 128
  414. if remaining > 0:
  415. byte |= 0x80
  416. packet.append(byte)
  417. if remaining == 0:
  418. break
  419. packet.extend([len(topic_bytes) >> 8, len(topic_bytes) & 0xFF])
  420. packet.extend(topic_bytes)
  421. packet.extend(payload)
  422. frame = bytes(packet)
  423. target_client_id = self._lookup_pending_request_client(payload)
  424. disconnected = []
  425. for client_id, writer in list(self._clients.items()):
  426. if target_client_id is not None and client_id != target_client_id:
  427. continue
  428. try:
  429. if writer.is_closing():
  430. disconnected.append(client_id)
  431. continue
  432. writer.write(frame)
  433. try:
  434. await asyncio.wait_for(writer.drain(), timeout=5)
  435. except TimeoutError:
  436. logger.debug("MQTT drain timeout pushing bridge frame to %s", client_id)
  437. except OSError as e:
  438. logger.debug("Failed to push bridge frame to %s: %s", client_id, e)
  439. disconnected.append(client_id)
  440. for client_id in disconnected:
  441. self._clients.pop(client_id, None)
  442. self._client_serials.pop(client_id, None)
  443. async def _handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
  444. """Handle an MQTT client connection."""
  445. addr = writer.get_extra_info("peername")
  446. client_id = f"{addr[0]}:{addr[1]}" if addr else "unknown"
  447. logger.info("%sMQTT client connected: %s", self._log_prefix, client_id)
  448. authenticated = False
  449. # Per-packet read timeout. Before CONNECT we default to 60 s so a
  450. # client that opens TCP but never sends anything still gets reaped;
  451. # after CONNECT the value is updated to 1.5× the keepalive the
  452. # client negotiated (MQTT spec §4.4). ``None`` means no timeout,
  453. # which is what spec §3.1.2.10 mandates for keep_alive == 0.
  454. read_timeout: float | None = 60.0
  455. try:
  456. while self._running:
  457. # Read MQTT fixed header
  458. try:
  459. header = await asyncio.wait_for(reader.read(1), timeout=read_timeout)
  460. except TimeoutError:
  461. break
  462. if not header:
  463. break
  464. packet_type = (header[0] & 0xF0) >> 4
  465. # Read remaining length
  466. remaining_length = await self._read_remaining_length(reader)
  467. if remaining_length is None:
  468. break
  469. # Read payload
  470. payload = await reader.read(remaining_length) if remaining_length > 0 else b""
  471. # Handle packet types
  472. if packet_type == 1: # CONNECT
  473. source_ip = addr[0] if addr else "unknown"
  474. if self._is_auth_rate_limited(source_ip):
  475. logger.warning(
  476. "%sMQTT auth rate-limited from %s (>=%d failures in %ds)",
  477. self._log_prefix,
  478. source_ip,
  479. _AUTH_RATE_LIMIT_MAX_ATTEMPTS,
  480. int(_AUTH_RATE_LIMIT_WINDOW_SECONDS),
  481. )
  482. writer.write(bytes([0x20, 0x02, 0x00, 0x05])) # Not authorized
  483. await writer.drain()
  484. break
  485. authenticated, keep_alive = await self._handle_connect(payload, writer)
  486. if not authenticated:
  487. self._record_auth_failure(source_ip)
  488. break
  489. self._clear_auth_failures(source_ip)
  490. # Honour the client's negotiated keepalive (#1548). Before
  491. # this fix, the hardcoded 60 s above would close
  492. # OrcaSlicer's idle connection at the keepalive boundary
  493. # instead of waiting 1.5× as the spec requires — Orca
  494. # sends PINGREQ within its own keepalive interval but
  495. # we'd already have closed the socket.
  496. read_timeout = keep_alive * 1.5 if keep_alive > 0 else None
  497. # Register client for periodic status pushes; start with
  498. # self.serial as the fallback until we learn the slicer's
  499. # preferred serial from the first SUBSCRIBE/PUBLISH.
  500. self._clients[client_id] = writer
  501. self._client_serials[client_id] = self.serial
  502. elif packet_type == 3: # PUBLISH
  503. if authenticated:
  504. await self._handle_publish(header[0], payload, writer, client_id)
  505. elif packet_type == 8: # SUBSCRIBE
  506. if authenticated:
  507. await self._handle_subscribe(payload, writer, client_id)
  508. elif packet_type == 12: # PINGREQ
  509. # Send PINGRESP
  510. writer.write(bytes([0xD0, 0x00]))
  511. await writer.drain()
  512. elif packet_type == 14: # DISCONNECT
  513. break
  514. except asyncio.CancelledError:
  515. pass # Expected when server is shutting down and cancels client tasks
  516. except Exception as e:
  517. # Outer handler — inner handlers already absorb expected parser
  518. # / IO failures at debug. Anything reaching here is unexpected
  519. # and would otherwise silently drop the slicer connection with
  520. # no actionable signal in production logs (defaults are INFO+).
  521. logger.warning("%sMQTT client session error from %s: %s", self._log_prefix, client_id, e)
  522. finally:
  523. logger.debug("MQTT client disconnected: %s", client_id)
  524. self._clients.pop(client_id, None)
  525. self._client_serials.pop(client_id, None)
  526. try:
  527. writer.close()
  528. await writer.wait_closed()
  529. except OSError:
  530. pass # Best-effort socket cleanup on client disconnect
  531. async def _read_remaining_length(self, reader: asyncio.StreamReader) -> int | None:
  532. """Read MQTT remaining length (variable byte integer)."""
  533. multiplier = 1
  534. value = 0
  535. for _ in range(4):
  536. try:
  537. byte = await reader.read(1)
  538. if not byte:
  539. return None
  540. encoded = byte[0]
  541. value += (encoded & 127) * multiplier
  542. if (encoded & 128) == 0:
  543. return value
  544. multiplier *= 128
  545. except OSError:
  546. return None
  547. return None
  548. def _record_pending_request(self, data: dict, client_id: str) -> None:
  549. """Stash sequence_id → client_id for any nested block with a sequence_id.
  550. Slicer commands typically wrap their seq id in ``{"print": {...}}`` or
  551. ``{"info": {...}}`` / ``{"system": {...}}`` etc. Walks top-level dict
  552. values once to find the seq id; if absent (some commands omit it) we
  553. skip — the response will fall through to broadcast which is fine for
  554. unsolicited pushes.
  555. """
  556. for block in data.values():
  557. if isinstance(block, dict):
  558. seq = block.get("sequence_id")
  559. if seq is not None:
  560. key = str(seq)
  561. # Evict oldest entry when over the cap. Python dicts
  562. # preserve insertion order so iter(self._pending_requests)
  563. # yields the oldest key first.
  564. while len(self._pending_requests) >= _PENDING_REQUEST_MAX_ENTRIES:
  565. oldest = next(iter(self._pending_requests))
  566. self._pending_requests.pop(oldest, None)
  567. self._pending_requests[key] = client_id
  568. return
  569. def _lookup_pending_request_client(self, payload: bytes) -> str | None:
  570. """Parse a bridge-forwarded MQTT payload and return the originating
  571. client_id if its sequence_id was recorded.
  572. Returns ``None`` for printer-initiated pushes (no recorded seq id) so
  573. push_raw_to_clients falls back to broadcast — required for push_status
  574. and the other unsolicited pushes that every connected slicer expects.
  575. """
  576. try:
  577. parsed = json.loads(payload)
  578. except (ValueError, TypeError):
  579. return None
  580. if not isinstance(parsed, dict):
  581. return None
  582. for block in parsed.values():
  583. if isinstance(block, dict):
  584. seq = block.get("sequence_id")
  585. if seq is not None:
  586. return self._pending_requests.pop(str(seq), None)
  587. return None
  588. def _is_auth_rate_limited(self, source_ip: str) -> bool:
  589. """Return True if ``source_ip`` has hit the per-IP failure cap.
  590. Prunes timestamps older than the window so the dict doesn't grow
  591. unbounded. Uses ``time.monotonic()`` for a wall-clock-jump-immune
  592. clock that's safe to call from any context (sync or async).
  593. """
  594. import time as _time
  595. now = _time.monotonic()
  596. window_start = now - _AUTH_RATE_LIMIT_WINDOW_SECONDS
  597. recent = [t for t in self._auth_failures.get(source_ip, []) if t >= window_start]
  598. if recent:
  599. self._auth_failures[source_ip] = recent
  600. else:
  601. self._auth_failures.pop(source_ip, None)
  602. return len(recent) >= _AUTH_RATE_LIMIT_MAX_ATTEMPTS
  603. def _record_auth_failure(self, source_ip: str) -> None:
  604. """Append a timestamp for ``source_ip``'s failed auth attempt."""
  605. import time as _time
  606. now = _time.monotonic()
  607. self._auth_failures.setdefault(source_ip, []).append(now)
  608. def _clear_auth_failures(self, source_ip: str) -> None:
  609. """Reset ``source_ip``'s failure history after a successful auth."""
  610. self._auth_failures.pop(source_ip, None)
  async def _handle_connect(self, payload: bytes, writer: asyncio.StreamWriter) -> tuple[bool, int]:
      """Handle an MQTT CONNECT packet and reply with a CONNACK.

      The parser walks the fixed layout the slicers send: protocol name,
      protocol level, connect flags, keepalive, client id, username,
      password. The connect flags are skipped rather than interpreted, so
      the username and password fields are always expected to be present.

      Args:
          payload: CONNECT packet body starting at the protocol-name length
              prefix.
          writer: Stream the CONNACK (and, on success, the first status
              report) is written to.

      Returns ``(authenticated, keep_alive_seconds)`` — the second element
      is the value the client advertised in CONNECT, so the caller's
      read-loop can honour it instead of the hardcoded default. ``0``
      means the client opted out of keepalive (#1548). A rejected or
      unparseable CONNECT returns ``(False, 0)``.
      """
      try:
          # Parse CONNECT packet
          # Skip protocol name length and name
          idx = 0
          proto_len = (payload[idx] << 8) | payload[idx + 1]
          idx += 2 + proto_len

          # Skip protocol level and connect flags
          idx += 2

          # Keepalive (2-byte big-endian, seconds). Honoured by the read
          # loop in `_handle_client` per MQTT spec §3.1.2.10 / §4.4 —
          # before #1548 we ignored this and used a hardcoded 60 s, which
          # closed OrcaSlicer's idle connection at exactly the negotiated
          # keepalive boundary instead of the spec-mandated 1.5×.
          keep_alive = (payload[idx] << 8) | payload[idx + 1]
          idx += 2

          # Skip the length-prefixed client ID; it is not used here.
          client_id_len = (payload[idx] << 8) | payload[idx + 1]
          idx += 2
          idx += client_id_len

          # Read username
          username_len = (payload[idx] << 8) | payload[idx + 1]
          idx += 2
          username = payload[idx : idx + username_len].decode("utf-8")
          idx += username_len

          # Read password
          password_len = (payload[idx] << 8) | payload[idx + 1]
          idx += 2
          password = payload[idx : idx + password_len].decode("utf-8")

          # Authenticate. ``hmac.compare_digest`` is constant-time to keep
          # the auth check from leaking the access code via response timing
          # under network jitter — LAN-only threat is marginal, but it's
          # the standard fix and costs nothing.
          #
          # Compare UTF-8 bytes, not ``str``: with ``str`` arguments
          # ``compare_digest`` raises TypeError as soon as either side
          # contains a non-ASCII character, and the password is
          # client-controlled. That TypeError is not covered by the
          # handler below, so it would escape to the caller instead of
          # producing a clean "not authorized" CONNACK.
          if username == "bblp" and hmac.compare_digest(
              password.encode("utf-8"), self.access_code.encode("utf-8")
          ):
              # Send CONNACK with success
              writer.write(bytes([0x20, 0x02, 0x00, 0x00]))
              await writer.drain()
              logger.info("%sMQTT client authenticated successfully", self._log_prefix)

              # Send immediate status report after auth - slicer expects this
              await self._send_status_report(writer)
              return True, keep_alive
          else:
              # Send CONNACK with auth failure
              writer.write(bytes([0x20, 0x02, 0x00, 0x05]))  # Not authorized
              await writer.drain()
              logger.warning("%sMQTT auth failed for user '%s' (access code mismatch)", self._log_prefix, username)
              return False, 0

      except (IndexError, ValueError) as e:
          # IndexError: packet shorter than its length prefixes claim.
          # ValueError: includes UnicodeDecodeError from the UTF-8 decodes.
          logger.debug("MQTT CONNECT parse error: %s", e)
          # Refuse the connection. Return code 0x02 is "identifier rejected"
          # in MQTT 3.1.1; it is used here as the generic refusal for a
          # malformed CONNECT.
          writer.write(bytes([0x20, 0x02, 0x00, 0x02]))
          await writer.drain()
          return False, 0
  672. async def _handle_subscribe(self, payload: bytes, writer: asyncio.StreamWriter, client_id: str) -> None:
  673. """Handle MQTT SUBSCRIBE packet."""
  674. try:
  675. # Parse packet ID
  676. packet_id = (payload[0] << 8) | payload[1]
  677. # Parse topic filters (just acknowledge them)
  678. idx = 2
  679. granted_qos = []
  680. learned_serial: str | None = None
  681. while idx < len(payload):
  682. topic_len = (payload[idx] << 8) | payload[idx + 1]
  683. idx += 2
  684. topic = payload[idx : idx + topic_len].decode("utf-8")
  685. idx += topic_len
  686. requested_qos = payload[idx]
  687. idx += 1
  688. logger.info("%sMQTT subscribe: %s QoS=%s", self._log_prefix, topic, requested_qos)
  689. granted_qos.append(min(requested_qos, 1)) # Grant up to QoS 1
  690. # Remember the serial the slicer is listening on so status/version
  691. # responses go to a topic it actually subscribed to.
  692. if learned_serial is None:
  693. extracted = self._extract_serial_from_topic(topic)
  694. if extracted:
  695. learned_serial = extracted
  696. if learned_serial and learned_serial != self._client_serials.get(client_id):
  697. if learned_serial != self.serial:
  698. logger.info(
  699. "%sMQTT client subscribed with serial %s (VP serial is %s) — adapting responses",
  700. self._log_prefix,
  701. learned_serial,
  702. self.serial,
  703. )
  704. self._client_serials[client_id] = learned_serial
  705. # Send SUBACK
  706. suback = bytes([0x90, 2 + len(granted_qos), packet_id >> 8, packet_id & 0xFF])
  707. suback += bytes(granted_qos)
  708. writer.write(suback)
  709. await writer.drain()
  710. # Send initial status report after subscribe on the client's subscribed topic
  711. await self._send_status_report(writer, serial=self._client_serials.get(client_id, self.serial))
  712. except (IndexError, ValueError, OSError) as e:
  713. logger.debug("MQTT SUBSCRIBE error: %s", e)
  714. async def _send_status_report(self, writer: asyncio.StreamWriter, serial: str | None = None) -> None:
  715. """Send a status report to the slicer after connection.
  716. When a bridge is active and has cached the real printer's latest
  717. push_status, send a copy of the real push with only the upload-state-
  718. machine fields we own (gcode_state, gcode_file, prepare_percent,
  719. subtask_name) overridden. BambuStudio's Send pre-flight checks the
  720. push_status shape against what it expects from the printer model, and
  721. the synthetic stub introduced fields the real H2D doesn't have (storage,
  722. the wrong chamber_temper shape, etc.) which trip the check.
  723. """
  724. try:
  725. self._sequence_id += 1
  726. cached = self._bridge.get_latest_print_state() if self._bridge is not None else None
  727. if isinstance(cached, dict):
  728. # Real-printer-shaped response. Copy the cache, then replace the
  729. # protocol / upload-state fields with values under our control.
  730. # Deep copy — current mutations are top-level only, but a future
  731. # override that writes into a nested dict (e.g. ``online``,
  732. # ``upgrade_state``, ``ipcam``) would otherwise corrupt the
  733. # bridge cache and be read by every subsequent subscriber until
  734. # the next real-printer push lands. Cost is one allocation per
  735. # status report; the cached dict is already short-lived.
  736. print_block = copy.deepcopy(cached)
  737. print_block["sequence_id"] = str(self._sequence_id)
  738. print_block["command"] = "push_status"
  739. print_block["msg"] = 0
  740. print_block["gcode_state"] = self._gcode_state
  741. print_block["gcode_file"] = self._current_file
  742. print_block["gcode_file_prepare_percent"] = self._prepare_percent
  743. if self._current_file:
  744. print_block["subtask_name"] = self._current_file.replace(".3mf", "")
  745. else:
  746. # Don't override real subtask_name with empty if no upload pending.
  747. print_block.setdefault("subtask_name", "")
  748. # Storage-availability indicators the slicer's "Send" pre-flight reads
  749. # (#1228). P1S/A1-class firmware doesn't always include these in
  750. # push_status (no SD card inserted, older field shapes), and BambuStudio
  751. # rejects the send pre-flight with the generic "storage needs to be
  752. # inserted before send to printer" error before even attempting FTP.
  753. # For VP usage the slicer uploads via FTPS to Bambuddy's filesystem —
  754. # the printer's actual SD/storage state is irrelevant on that path.
  755. # Force "available" indicators so the pre-flight passes regardless of
  756. # what the real printer reports. Restores the 0.2.3.2 synthetic-stub
  757. # behaviour for these fields without losing the live AMS / k-profile /
  758. # camera mirror cached-as-base provides.
  759. print_block["home_flag"] = print_block.get("home_flag", 0) | 0x100 # bit 8 = HAS_SDCARD_NORMAL
  760. print_block["sdcard"] = True
  761. print_block.setdefault("storage", {"free": 1_000_000_000, "total": 32_000_000_000})
  762. # Live-progress fields the slicer's Send pre-flight reads
  763. # (#1558). When the real target printer is mid-print, the
  764. # cached push_status carries the real values for these
  765. # fields and the slicer reads the VP as "busy" — refusing
  766. # Send — even though gcode_state above is forced to IDLE.
  767. # For VP usage the VP isn't actually running the print
  768. # the printer is, so these need to mirror the synthetic
  769. # stub's idle values. Same shape as #1228 (storage) — the
  770. # cached-branch override set just needed extending.
  771. print_block["mc_print_stage"] = ""
  772. print_block["mc_percent"] = 0
  773. print_block["mc_remaining_time"] = 0
  774. print_block["stg"] = []
  775. print_block["stg_cur"] = 0
  776. print_block["layer_num"] = 0
  777. print_block["total_layer_num"] = 0
  778. print_block["print_error"] = 0
  779. status = {"print": print_block}
  780. await self._publish_to_report(writer, status, serial or self.serial)
  781. return
  782. # No bridge / no cache yet — fall back to the synthetic stub.
  783. status = {
  784. "print": {
  785. "sequence_id": str(self._sequence_id),
  786. "command": "push_status",
  787. "msg": 0,
  788. "gcode_state": self._gcode_state,
  789. "gcode_file": self._current_file,
  790. "gcode_file_prepare_percent": self._prepare_percent,
  791. "subtask_name": self._current_file.replace(".3mf", "") if self._current_file else "",
  792. "mc_print_stage": "",
  793. "mc_percent": 0,
  794. "mc_remaining_time": 0,
  795. "wifi_signal": "-44dBm",
  796. "print_error": 0,
  797. "print_type": "",
  798. "bed_temper": 25.0,
  799. "bed_target_temper": 0.0,
  800. "nozzle_temper": 25.0,
  801. "nozzle_target_temper": 0.0,
  802. "chamber_temper": 25.0,
  803. "cooling_fan_speed": "0",
  804. "big_fan1_speed": "0",
  805. "big_fan2_speed": "0",
  806. "heatbreak_fan_speed": "0",
  807. "spd_lvl": 1,
  808. "spd_mag": 100,
  809. "stg": [],
  810. "stg_cur": 0,
  811. "layer_num": 0,
  812. "total_layer_num": 0,
  813. "home_flag": 256, # Bit 8 = SD card present (HAS_SDCARD_NORMAL)
  814. "hw_switch_state": 0,
  815. "online": {"ahb": False, "rfid": False, "version": 7},
  816. "ams_status": 0,
  817. "sdcard": True,
  818. "storage": {"free": 1000000000, "total": 32000000000},
  819. "upgrade_state": {
  820. "sequence_id": 0,
  821. "progress": "",
  822. "status": "",
  823. "consistency_request": False,
  824. "dis_state": 0,
  825. "err_code": 0,
  826. "force_upgrade": False,
  827. "message": "",
  828. "module": "",
  829. "new_version_state": 2,
  830. "new_ver_list": [],
  831. "ota_new_version_number": "",
  832. "ahb_new_version_number": "",
  833. },
  834. "ipcam": {
  835. "ipcam_dev": "1",
  836. "ipcam_record": "enable",
  837. "timelapse": "disable",
  838. "resolution": "1080p",
  839. "mode_bits": 0,
  840. },
  841. "xcam": {
  842. "allow_skip_parts": False,
  843. "buildplate_marker_detector": True,
  844. "first_layer_inspector": True,
  845. "halt_print_sensitivity": "medium",
  846. "print_halt": True,
  847. "printing_monitor": True,
  848. "spaghetti_detector": True,
  849. },
  850. "lights_report": [{"node": "chamber_light", "mode": "on"}],
  851. "nozzle_diameter": "0.4",
  852. "nozzle_type": "hardened_steel",
  853. }
  854. }
  855. await self._publish_to_report(writer, status, serial or self.serial)
  856. except OSError as e:
  857. logger.error("Failed to send status report: %s", e)
  async def _send_version_response(
      self, writer: asyncio.StreamWriter, sequence_id: str, serial: str | None = None
  ) -> None:
      """Answer a ``get_version`` request from the slicer.

      Args:
          writer: Stream of the requesting client.
          sequence_id: Sequence id echoed back in the response.
          serial: Serial used for the report topic and the stub modules'
              ``sn`` fields; ``self.serial`` is used when empty or ``None``.

      ``OSError`` raised while sending is logged and swallowed.
      """
      try:
          product_name = MODEL_PRODUCT_NAMES.get(self.model, self.model or "X1 Carbon")
          # The serial is embedded inside the module[].sn fields *and* used as
          # the report topic. Use the client's effective serial so the slicer
          # sees internal/topic consistency even when it differs from self.serial.
          serial = serial or self.serial

          # Stub firmware table matching OrcaSlicer expectations:
          # (module name, sw_ver, hw_ver).
          # Required fields per module: name, product_name, sw_ver, sw_new_ver, sn, hw_ver, flag
          stub_firmware = (
              ("ota", "01.07.00.00", "OTA"),
              ("esp32", "01.07.22.25", "AP05"),
              ("rv1126", "00.00.27.38", "AP05"),
              ("th", "00.00.04.00", "TH07"),
              ("mc", "00.00.10.00", "MC07"),
          )
          modules = [
              {
                  "name": module_name,
                  "product_name": product_name,
                  "sw_ver": sw_ver,
                  "sw_new_ver": "",
                  "hw_ver": hw_ver,
                  "sn": serial,
                  "flag": 0,
              }
              for module_name, sw_ver, hw_ver in stub_firmware
          ]

          # Overlay real version modules from the bridge cache when available
          # (specifically the AMS modules ams/0, n3f/0, n3s/128 etc. that
          # BambuStudio's Prepare tab uses to identify AMS hardware — without
          # them every AMS unit shows as "unknown" in the Prepare panel).
          # A non-empty cached list replaces the stub list wholesale.
          if self._bridge is not None:
              cached_modules = self._bridge.get_latest_version_modules()
              if isinstance(cached_modules, list) and cached_modules:
                  modules = cached_modules

          version_info = {
              "info": {
                  "command": "get_version",
                  "sequence_id": sequence_id,
                  "module": modules,
              }
          }
          await self._publish_to_report(writer, version_info, serial)
          logger.info("Sent version response (product_name=%s)", product_name)

      except OSError as e:
          logger.error("Failed to send version response: %s", e)
  def set_gcode_state(self, state: str, filename: str = "", prepare_percent: str = "0") -> None:
      """Record the gcode state that status reports expose to slicers.

      Called by the manager to reflect FTP upload progress/completion.

      Args:
          state: Value reported as ``gcode_state``.
          filename: Value reported as ``gcode_file``; defaults to empty.
          prepare_percent: Value reported as ``gcode_file_prepare_percent``;
              defaults to ``"0"``.
      """
      self._gcode_state, self._current_file, self._prepare_percent = (
          state,
          filename,
          prepare_percent,
      )
  942. async def _publish_to_report(self, writer: asyncio.StreamWriter, payload: dict, serial: str = "") -> None:
  943. """Publish a message on the device report topic.
  944. Real Bambu printers wire-format push_status JSON with 4-space indentation
  945. (32254 bytes for an idle H2D push vs 14268 bytes compact). BambuStudio's
  946. Send pre-flight rejects compact JSON — without matching the on-wire
  947. format the slicer never proceeds to FTP upload.
  948. """
  949. topic = f"device/{serial or self.serial}/report"
  950. message = json.dumps(payload, indent=4)
  951. topic_bytes = topic.encode("utf-8")
  952. message_bytes = message.encode("utf-8")
  953. remaining = 2 + len(topic_bytes) + len(message_bytes)
  954. packet = bytes([0x30]) # PUBLISH, QoS 0
  955. while remaining > 0:
  956. byte = remaining % 128
  957. remaining //= 128
  958. if remaining > 0:
  959. byte |= 0x80
  960. packet += bytes([byte])
  961. packet += bytes([len(topic_bytes) >> 8, len(topic_bytes) & 0xFF])
  962. packet += topic_bytes
  963. packet += message_bytes
  964. writer.write(packet)
  965. # Timeout the drain to prevent blocking the event loop if the
  966. # MQTT client stops reading (e.g. slicer busy with FTP upload).
  967. try:
  968. await asyncio.wait_for(writer.drain(), timeout=5)
  969. except TimeoutError:
  970. logger.debug("MQTT drain timeout for %s — client may be busy", topic)
  async def _send_print_response(
      self, writer: asyncio.StreamWriter, sequence_id: str, filename: str, serial: str | None = None
  ) -> None:
      """Acknowledge a ``project_file`` command the way a real Bambu printer does.

      The server state is switched to ``PREPARE`` for ``filename`` before
      anything is sent, so it changes even if the send itself fails.

      Args:
          writer: Stream of the requesting client.
          sequence_id: Sequence id echoed back in the acknowledgment.
          filename: File the slicer is about to upload.
          serial: Serial for the report topic; ``self.serial`` is used when
              empty or ``None``.

      ``OSError`` raised while sending is logged and swallowed.
      """
      # Update state so periodic status pushes reflect preparation
      self._gcode_state, self._current_file, self._prepare_percent = "PREPARE", filename, "0"

      try:
          # Send command acknowledgment — slicer expects to see
          # command: "project_file" echoed back before starting FTP upload
          ack = {
              "command": "project_file",
              "sequence_id": sequence_id,
              "param": "Metadata/plate_1.gcode",
              "subtask_name": filename.replace(".3mf", "") if filename else "",
              "gcode_state": "PREPARE",
              "gcode_file": filename,
              "gcode_file_prepare_percent": "0",
              "result": "SUCCESS",
              "msg": 0,
          }
          await self._publish_to_report(writer, {"print": ack}, serial or self.serial)
          logger.info("Sent project_file acknowledgment for %s", filename)
      except OSError as e:
          logger.error("Failed to send print response: %s", e)
  async def _handle_publish(self, header: int, payload: bytes, writer: asyncio.StreamWriter, client_id: str) -> None:
      """Handle an MQTT PUBLISH packet from a connected client.

      Only publishes on a ``device/.../request`` topic are processed. The
      JSON body is answered locally for ``pushing`` (pushall/start),
      ``info`` (get_version) and ``print`` (project_file/gcode_file)
      commands; anything else is forwarded to the real printer when a
      bridge is active.

      Args:
          header: First byte of the fixed header; only the QoS bits are read.
          payload: PUBLISH body starting at the topic length prefix.
          writer: Stream that local responses are written to.
          client_id: Key for the per-client serial and pending-request
              bookkeeping.

      Messages that are not valid JSON, whose top level is not a JSON
      object, or whose ``pushing`` / ``info`` / ``print`` value is not an
      object are logged at debug level and dropped. ``IndexError``,
      ``ValueError`` and ``OSError`` are logged at debug level and swallowed.
      """
      try:
          # Parse topic
          idx = 0
          topic_len = (payload[idx] << 8) | payload[idx + 1]
          idx += 2
          topic = payload[idx : idx + topic_len].decode("utf-8")
          idx += topic_len

          # QoS > 0 publishes carry a 2-byte packet id before the message;
          # it is skipped and this method sends no acknowledgment for it.
          qos = (header & 0x06) >> 1
          if qos > 0:
              idx += 2

          # Parse message
          message = payload[idx:].decode("utf-8")

          logger.info("MQTT publish to %s: %s...", topic, message[:100])

          # Only handle publishes on *some* device/.../request topic. The
          # serial is taken from the topic rather than compared against
          # self.serial: the client is already authenticated via the access
          # code, and Orca/BambuStudio may have a cached serial that differs
          # from the VP's computed self.serial (#927). Use the topic's serial
          # for all responses so they land on the topic the slicer subscribed
          # to.
          if not topic.startswith("device/") or "/request" not in topic:
              return

          client_serial = self._extract_serial_from_topic(topic) or self.serial
          if client_serial and client_serial != self._client_serials.get(client_id):
              if client_serial != self.serial:
                  logger.info(
                      "%sMQTT client publishing with serial %s (VP serial is %s) — adapting responses",
                      self._log_prefix,
                      client_serial,
                      self.serial,
                  )
              self._client_serials[client_id] = client_serial

          try:
              # Some slicer builds (observed with OrcaSlicer on Linux, #927)
              # include the C-string null terminator in the MQTT payload
              # length, so the decoded message ends with \x00. Real brokers
              # pass the bytes through; strict json.loads raises "Extra data"
              # and every pushall/get_version/project_file silently dropped.
              data = json.loads(message.rstrip("\x00 \r\n\t"))
          except json.JSONDecodeError as e:
              logger.debug(
                  "MQTT publish JSON decode failed: %s (payload=%r)",
                  e,
                  message[:200],
              )
              return

          # json.loads happily returns lists, numbers, strings or null, and
          # a section value may be any JSON type too. The ``in`` / ``.get``
          # calls below would then raise TypeError / AttributeError, which
          # the handler at the bottom does not cover. Drop such messages
          # instead of letting the exception escape to the caller.
          if not isinstance(data, dict):
              logger.debug("MQTT publish ignored: JSON payload is not an object (payload=%r)", message[:200])
              return
          for section in ("pushing", "info", "print"):
              if section in data and not isinstance(data[section], dict):
                  logger.debug(
                      "MQTT publish ignored: '%s' is not an object (payload=%r)",
                      section,
                      message[:200],
                  )
                  return

          # The synthetic flow below is the original (pre-bridge) behaviour and is
          # what the proven-working FTP "Send" depends on. Do NOT replace any
          # synthetic response with a forward — only ADD forwarding alongside,
          # at the bottom, for commands the synthetic flow doesn't handle
          # (AMS write / xcam / system / etc., which need to actually reach
          # the real printer).
          handled_locally = False

          # Handle pushing command (status request)
          if "pushing" in data:
              pushing_data = data["pushing"]
              command = pushing_data.get("command", "")
              logger.info("MQTT pushing command: %s", command)

              if command == "pushall":
                  logger.info("Sending status report in response to pushall")
                  await self._send_status_report(writer, serial=client_serial)
                  handled_locally = True
              elif command == "start":
                  logger.info("Starting status push stream")
                  await self._send_status_report(writer, serial=client_serial)
                  handled_locally = True

          # Handle info commands (get_version, etc.)
          if "info" in data:
              info_data = data["info"]
              command = info_data.get("command", "")
              sequence_id = info_data.get("sequence_id", "0")
              logger.info("MQTT info command: %s", command)

              if command == "get_version":
                  await self._send_version_response(writer, sequence_id, serial=client_serial)
                  handled_locally = True

          # Handle print commands
          if "print" in data:
              print_data = data["print"]
              command = print_data.get("command", "")
              filename = print_data.get("subtask_name", "")
              sequence_id = print_data.get("sequence_id", "0")
              logger.info("MQTT print command: %s for %s", command, filename)

              if command in ("project_file", "gcode_file"):
                  # File lives on Bambuddy, not the printer — synthetic only.
                  file_3mf = print_data.get("file", filename)
                  await self._send_print_response(writer, sequence_id, file_3mf, serial=client_serial)
                  if self.on_print_command:
                      await self._notify_print_command(filename, print_data)
                  handled_locally = True

          # Forward anything the synthetic flow didn't handle to the real
          # printer. AMS load / dry / xcam / system / extrusion_cali_get etc.
          if not handled_locally and self._bridge is not None and self._bridge.is_active:
              # Remember which client originated this command so the
              # printer's response goes back only to them (not fanned
              # out to every connected slicer).
              self._record_pending_request(data, client_id)
              self._bridge.forward_to_printer(data)

      except (IndexError, ValueError, OSError) as e:
          logger.debug("MQTT PUBLISH error: %s", e)
  1103. async def _notify_print_command(self, filename: str, data: dict) -> None:
  1104. """Notify callback of print command."""
  1105. if self.on_print_command:
  1106. try:
  1107. result = self.on_print_command(filename, data)
  1108. if asyncio.iscoroutine(result):
  1109. await result
  1110. except Exception as e:
  1111. logger.error("Print command callback error: %s", e)