mqtt_bridge.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371
  1. """MQTT bridge for non-proxy virtual printers.
  2. Mirrors the target printer's state to slicers connected to a virtual printer
  3. without opening a second MQTT session on the printer (reuses Bambuddy's
  4. existing subscription — firmware inflight budget unaffected, see PR #1164).
  5. Architecture (cached-as-base, not a separate fan-out stream):
  6. - **push_status** snapshots from the printer are CACHED here. The VP's
  7. `SimpleMQTTServer._send_status_report` consults that cache and sends
  8. a near-byte-identical copy of the real push to the slicer (with
  9. sequence_id / gcode_state / etc. overridden). Single source of truth
  10. keeps BambuStudio's Send pre-flight happy.
  11. - **info.get_version** responses are also cached so the synthetic version
  12. response can include the real AMS module list (n3f/n3s/ams entries).
  13. Without this BambuStudio's Prepare tab labels every AMS as "unknown".
  14. - **Other command responses** (extrusion_cali_get, AMS write acks,
  15. xcam responses, …) are fanned out raw to the slicer — they carry
  16. sequence_ids the slicer is waiting on; the slicer matches and ignores
  17. unrelated ones.
  18. Identity rewriting at cache time:
  19. - `upgrade_state.sn` (and any other nested dict's `sn` matching the real
  20. serial) → VP serial
  21. - `net.info[*].ip` little-endian uint32 → VP bind IP. BambuStudio reads
  22. this as the FTP destination IP. Without this the slicer FTPs straight
  23. to the real printer and bypasses Bambuddy.
  24. - `ipcam.rtsp_url` is left unchanged: BambuStudio overrides the URL host
  25. with the device IP it bound to (the VP), so the slicer hits the VP's
  26. own RTSPS proxy on port 322.
  27. """
  28. from __future__ import annotations
  29. import asyncio
  30. import copy
  31. import json
  32. import logging
  33. from typing import TYPE_CHECKING
  34. if TYPE_CHECKING:
  35. from backend.app.services.bambu_mqtt import BambuMQTTClient
  36. from backend.app.services.printer_manager import PrinterManager
  37. from backend.app.services.virtual_printer.mqtt_server import SimpleMQTTServer
  38. logger = logging.getLogger(__name__)
  39. REFRESH_INTERVAL_SECONDS = 30.0
  40. # Top-level push_status fields that Bambu firmware sends in FULL pushall
  41. # responses (on `pushall` request / printer reconnect) but typically OMITS
  42. # from 1 Hz incremental push_status updates. Without preserving these
  43. # fields across incremental updates, the bridge cache would lose AMS info
  44. # (and friends) between pushalls — slicers reading the cache would see a
  45. # stripped-down state and the fix would only re-appear on a manual printer
  46. # power-cycle (#1371). Mirrors the same set Bambuddy itself preserves in
  47. # bambu_mqtt.py:2686-2711 for its own internal raw_data, with a few more
  48. # entries that the slicer cares about (net, ipcam, lights_report).
  49. _SLICER_VISIBLE_STICKY_KEYS: tuple[str, ...] = (
  50. "ams",
  51. "vt_tray",
  52. "ams_extruder_map",
  53. "mapping",
  54. "net",
  55. "ipcam",
  56. "lights_report",
  57. )
  58. def _ip_to_uint32_le(ip_str: str) -> int:
  59. """Encode dotted-quad IPv4 as little-endian uint32 (Bambu MQTT's `net.info[].ip` shape)."""
  60. parts = [int(x) for x in ip_str.split(".")]
  61. if len(parts) != 4 or any(p < 0 or p > 255 for p in parts):
  62. raise ValueError(f"invalid IPv4: {ip_str!r}")
  63. return parts[0] | (parts[1] << 8) | (parts[2] << 16) | (parts[3] << 24)
  64. class MQTTBridge:
  65. """Per-VP MQTT fan-out between a real printer and slicers connected to a VP."""
  66. def __init__(
  67. self,
  68. *,
  69. vp_id: int,
  70. vp_name: str,
  71. vp_serial: str,
  72. target_printer_id: int,
  73. mqtt_server: SimpleMQTTServer,
  74. printer_manager: PrinterManager,
  75. ):
  76. self.vp_id = vp_id
  77. self.vp_name = vp_name
  78. self.vp_serial = vp_serial
  79. self.target_printer_id = target_printer_id
  80. self._mqtt_server = mqtt_server
  81. self._printer_manager = printer_manager
  82. self._target_client: BambuMQTTClient | None = None
  83. self._target_serial: str | None = None
  84. self._target_ip_uint32_le: int | None = None
  85. self._vp_ip_uint32_le: int | None = None
  86. self._loop: asyncio.AbstractEventLoop | None = None
  87. self._refresh_task: asyncio.Task | None = None
  88. self._stopping = False
  89. self._latest_print_state: dict | None = None
  90. self._latest_version_modules: list | None = None
  91. @property
  92. def is_active(self) -> bool:
  93. """True iff a target client is bound and currently connected."""
  94. client = self._target_client
  95. return bool(client is not None and getattr(client, "state", None) and client.state.connected)
  96. async def start(self) -> None:
  97. """Bind to the target printer (if connected) and start the refresh loop."""
  98. self._loop = asyncio.get_running_loop()
  99. self._stopping = False
  100. self._resolve_client()
  101. self._refresh_task = asyncio.create_task(self._refresh_loop())
  102. async def stop(self) -> None:
  103. """Detach from the target printer and stop the refresh loop."""
  104. self._stopping = True
  105. if self._refresh_task is not None:
  106. self._refresh_task.cancel()
  107. try:
  108. await self._refresh_task
  109. except asyncio.CancelledError:
  110. pass
  111. self._refresh_task = None
  112. self._unbind_client()
  113. self._loop = None
  114. async def _refresh_loop(self) -> None:
  115. """Re-resolve the target client periodically — paho clients can be replaced.
  116. BambuMQTTClient is destroyed and recreated on PrinterManager.connect_printer
  117. (e.g. printer config update). Without periodic refresh the bridge would lose
  118. fan-out after such a churn until the VP itself restarts.
  119. """
  120. try:
  121. while not self._stopping:
  122. await asyncio.sleep(REFRESH_INTERVAL_SECONDS)
  123. self._resolve_client()
  124. except asyncio.CancelledError:
  125. raise
  126. except Exception:
  127. logger.exception("[%s] MQTT bridge refresh loop crashed", self.vp_name)
  128. def _resolve_client(self) -> None:
  129. """Look up the current client for target_printer_id and rebind if it changed."""
  130. try:
  131. current = self._printer_manager.get_client(self.target_printer_id)
  132. except Exception:
  133. logger.exception("[%s] MQTT bridge: get_client failed", self.vp_name)
  134. return
  135. if current is self._target_client:
  136. return
  137. # Client identity changed — unregister from the old, register on the new.
  138. self._unbind_client()
  139. if current is None:
  140. return
  141. try:
  142. current.register_raw_message_handler(self._on_printer_raw)
  143. except Exception:
  144. logger.exception("[%s] MQTT bridge: register_raw_message_handler failed", self.vp_name)
  145. return
  146. self._target_client = current
  147. self._target_serial = getattr(current, "serial_number", None)
  148. # Cache printer IP and VP bind IP encoded as little-endian uint32, so we
  149. # can rewrite `net.info[*].ip` in cached push_status. BambuStudio reads
  150. # that field for the FTP destination IP — without rewriting, the slicer
  151. # bypasses the VP and FTPs straight to the real printer.
  152. target_ip = getattr(current, "ip_address", None)
  153. vp_ip = getattr(self._mqtt_server, "bind_address", None)
  154. if target_ip and vp_ip and vp_ip not in ("0.0.0.0", "", None): # nosec B104
  155. try:
  156. self._target_ip_uint32_le = _ip_to_uint32_le(target_ip)
  157. self._vp_ip_uint32_le = _ip_to_uint32_le(vp_ip)
  158. except ValueError:
  159. self._target_ip_uint32_le = None
  160. self._vp_ip_uint32_le = None
  161. logger.info(
  162. "[%s] MQTT bridge bound to printer %s (serial=%s)",
  163. self.vp_name,
  164. self.target_printer_id,
  165. self._target_serial,
  166. )
  167. # Trigger a fresh get_version + pushall against the printer so the bridge
  168. # cache populates immediately. Bambuddy itself queries these on connect,
  169. # but that fires before the bridge attaches as a raw-message consumer,
  170. # so without this nudge the cache stays empty until the next periodic
  171. # query (which can be minutes away).
  172. request_fn = getattr(current, "_request_version", None)
  173. if callable(request_fn):
  174. try:
  175. request_fn()
  176. except Exception:
  177. logger.exception("[%s] MQTT bridge: _request_version failed", self.vp_name)
  178. request_status_fn = getattr(current, "request_status_update", None)
  179. if callable(request_status_fn):
  180. try:
  181. request_status_fn()
  182. except Exception:
  183. logger.exception("[%s] MQTT bridge: request_status_update failed", self.vp_name)
  184. def _unbind_client(self) -> None:
  185. if self._target_client is None:
  186. return
  187. try:
  188. self._target_client.unregister_raw_message_handler(self._on_printer_raw)
  189. except Exception:
  190. logger.exception("[%s] MQTT bridge: unregister_raw_message_handler failed", self.vp_name)
  191. logger.info("[%s] MQTT bridge unbound from printer %s", self.vp_name, self.target_printer_id)
  192. self._target_client = None
  193. self._target_serial = None
  194. def _on_printer_raw(self, topic: str, payload: bytes) -> None:
  195. """Paho-thread callback — cache the latest push_status for synthetic replay.
  196. Instead of fanning out a second stream of MQTT messages to the slicer
  197. (which trips BambuStudio's Send pre-flight consistency checks), we cache
  198. the latest real printer push_status here. The VP's existing 1 Hz
  199. synthetic push (which is what Send is built around) consults this cache
  200. and replaces its stub fields with real values when available.
  201. """
  202. if self._stopping:
  203. return
  204. target_serial = self._target_serial
  205. if not target_serial:
  206. return
  207. prefix = f"device/{target_serial}/"
  208. if not topic.startswith(prefix):
  209. return
  210. suffix = topic[len(prefix) :]
  211. if not suffix.startswith("report"):
  212. return
  213. try:
  214. data = json.loads(payload)
  215. except json.JSONDecodeError:
  216. return
  217. # Race-free by construction: `json.loads` returns a fresh dict tree per
  218. # call so paho-thread mutations below cannot collide with prior cached
  219. # state held by the asyncio thread. `_send_status_report`'s shallow
  220. # `dict(cached)` is also safe because nothing else writes to the cached
  221. # tree after assignment. The defensive deep-copy on store below removes
  222. # any future risk if a maintainer later re-enters the cached dict to
  223. # mutate it.
  224. # push_status snapshots → cache the print dict for the periodic 1 Hz
  225. # cached-as-base delivery. We do NOT fan these out separately (the
  226. # 1 Hz cached-as-base IS the slicer-facing push_status stream).
  227. print_data = data.get("print")
  228. if isinstance(print_data, dict) and print_data.get("command") == "push_status":
  229. for value in print_data.values():
  230. if isinstance(value, dict) and value.get("sn") == target_serial:
  231. value["sn"] = self.vp_serial
  232. # Note: `ipcam.rtsp_url` carries the real printer's IP. We pass it
  233. # through unchanged — the slicer uses it to fetch the live camera
  234. # stream directly from the printer. On the same LAN this works as
  235. # long as the slicer's stored access code matches the printer's
  236. # (i.e. configure the VP with the same access code as its target).
  237. # Rewrite real printer IP → VP bind IP in `net.info[*].ip` so the
  238. # slicer's FTP destination resolves to the VP, not the real printer.
  239. if self._target_ip_uint32_le is not None and self._vp_ip_uint32_le is not None:
  240. net = print_data.get("net")
  241. if isinstance(net, dict):
  242. info = net.get("info")
  243. if isinstance(info, list):
  244. for entry in info:
  245. if isinstance(entry, dict) and entry.get("ip") == self._target_ip_uint32_le:
  246. entry["ip"] = self._vp_ip_uint32_le
  247. # Defensive deep copy on store so the cache is fully decoupled from
  248. # the freshly-parsed tree and from any reader's reference.
  249. new_state = copy.deepcopy(print_data)
  250. # Bambu firmware sends two kinds of push_status: full pushall
  251. # responses (on `pushall` requests / printer reconnect) which
  252. # include AMS, vt_tray, net, etc. — and ~1 Hz incremental
  253. # updates with just the fields that changed (typically temps,
  254. # fan, wifi). Without preserving sticky fields from the previous
  255. # cache, the first incremental push after a pushall would wipe
  256. # AMS info from the bridge cache, and slicers reading the cache
  257. # between pushalls would see a stripped-down printer state with
  258. # no AMS visible until the next pushall — typically only when
  259. # the user power-cycles the printer (#1371). Mirror the same
  260. # preservation pattern Bambuddy uses for its own internal state
  261. # in bambu_mqtt.py (see _SLICER_VISIBLE_STICKY_KEYS below).
  262. prev = self._latest_print_state
  263. if prev is not None:
  264. for sticky_key in _SLICER_VISIBLE_STICKY_KEYS:
  265. if sticky_key not in new_state and sticky_key in prev:
  266. new_state[sticky_key] = prev[sticky_key]
  267. self._latest_print_state = new_state
  268. return
  269. # info.get_version responses → cache the module list so the synthetic
  270. # version response can include the real AMS modules.
  271. info_data = data.get("info")
  272. if isinstance(info_data, dict) and info_data.get("command") == "get_version":
  273. modules = info_data.get("module")
  274. if isinstance(modules, list):
  275. rewritten: list = []
  276. for module in modules:
  277. if isinstance(module, dict):
  278. module = dict(module)
  279. if module.get("sn") == target_serial:
  280. module["sn"] = self.vp_serial
  281. rewritten.append(module)
  282. self._latest_version_modules = rewritten
  283. # Don't fan out get_version — the slicer's request (when it issues
  284. # one) is intercepted locally and answered from the cached modules.
  285. return
  286. # Everything else (extrusion_cali_get response, AMS write acks, xcam
  287. # responses, …): fan out to the slicer. These are responses to commands
  288. # the slicer (or Bambuddy) issued; the slicer matches by sequence_id and
  289. # ignores responses to commands it didn't send. Without this, slicer-
  290. # initiated queries like extrusion_cali_get hang forever and BambuStudio
  291. # blocks Send waiting for the response.
  292. loop = self._loop
  293. if loop is None:
  294. return
  295. target_bytes = target_serial.encode("ascii")
  296. if target_bytes in payload:
  297. payload = payload.replace(target_bytes, self.vp_serial.encode("ascii"))
  298. vp_topic = f"device/{self.vp_serial}/{suffix}"
  299. try:
  300. asyncio.run_coroutine_threadsafe(
  301. self._mqtt_server.push_raw_to_clients(vp_topic, payload),
  302. loop,
  303. )
  304. except RuntimeError:
  305. pass
  306. def get_latest_print_state(self) -> dict | None:
  307. """Return the most recent real printer push_status `print` dict, or None."""
  308. return self._latest_print_state
  309. def get_latest_version_modules(self) -> list | None:
  310. """Return the most recent real printer get_version `module` list, or None."""
  311. return self._latest_version_modules
  312. def forward_to_printer(self, payload: dict) -> bool:
  313. """Publish a slicer-originated command to the real printer's request topic.
  314. Returns False if no printer client is currently bound.
  315. """
  316. client = self._target_client
  317. target_serial = self._target_serial
  318. if client is None or target_serial is None:
  319. logger.debug(
  320. "[%s] forward_to_printer dropped (printer %s not bound): %s",
  321. self.vp_name,
  322. self.target_printer_id,
  323. list(payload.keys()),
  324. )
  325. return False
  326. topic = f"device/{target_serial}/request"
  327. try:
  328. return client.publish_raw(topic, json.dumps(payload), qos=1)
  329. except Exception:
  330. logger.exception("[%s] forward_to_printer publish failed", self.vp_name)
  331. return False