mqtt_bridge.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489
  1. """MQTT bridge for non-proxy virtual printers.
  2. Mirrors the target printer's state to slicers connected to a virtual printer
  3. without opening a second MQTT session on the printer (reuses Bambuddy's
  4. existing subscription — firmware inflight budget unaffected, see PR #1164).
  5. Architecture (cached-as-base, not a separate fan-out stream):
  6. - **push_status** snapshots from the printer are CACHED here. The VP's
  7. `SimpleMQTTServer._send_status_report` consults that cache and sends
  8. a near-byte-identical copy of the real push to the slicer (with
  9. sequence_id / gcode_state / etc. overridden). Single source of truth
  10. keeps BambuStudio's Send pre-flight happy.
  11. - **info.get_version** responses are also cached so the synthetic version
  12. response can include the real AMS module list (n3f/n3s/ams entries).
  13. Without this BambuStudio's Prepare tab labels every AMS as "unknown".
  14. - **Other command responses** (extrusion_cali_get, AMS write acks,
  15. xcam responses, …) are fanned out raw to the slicer — they carry
  16. sequence_ids the slicer is waiting on; the slicer matches and ignores
  17. unrelated ones.
  18. Identity rewriting at cache time:
  19. - `upgrade_state.sn` (and any other nested dict's `sn` matching the real
  20. serial) → VP serial
  21. - `net.info[*].ip` little-endian uint32 → VP bind IP. BambuStudio reads
  22. this as the FTP destination IP. Without this the slicer FTPs straight
  23. to the real printer and bypasses Bambuddy.
  24. - `ipcam.rtsp_url` is left unchanged: BambuStudio overrides the URL host
  25. with the device IP it bound to (the VP), so the slicer hits the VP's
  26. own RTSPS proxy on port 322.
  27. """
  28. from __future__ import annotations
  29. import asyncio
  30. import copy
  31. import json
  32. import logging
  33. from typing import TYPE_CHECKING
  34. if TYPE_CHECKING:
  35. from backend.app.services.bambu_mqtt import BambuMQTTClient
  36. from backend.app.services.printer_manager import PrinterManager
  37. from backend.app.services.virtual_printer.mqtt_server import SimpleMQTTServer
  38. logger = logging.getLogger(__name__)
  39. REFRESH_INTERVAL_SECONDS = 30.0
  40. # Top-level push_status fields that Bambu firmware sends in FULL pushall
  41. # responses (on `pushall` request / printer reconnect) but typically OMITS
  42. # from 1 Hz incremental push_status updates. Without preserving these
  43. # fields across incremental updates, the bridge cache would lose AMS info
  44. # (and friends) between pushalls — slicers reading the cache would see a
  45. # stripped-down state and the fix would only re-appear on a manual printer
  46. # power-cycle (#1371). Mirrors the same set Bambuddy itself preserves in
  47. # bambu_mqtt.py:2686-2711 for its own internal raw_data, with a few more
  48. # entries that the slicer cares about (net, ipcam, lights_report).
  49. _SLICER_VISIBLE_STICKY_KEYS: tuple[str, ...] = (
  50. "ams",
  51. "vt_tray",
  52. "ams_extruder_map",
  53. "mapping",
  54. "net",
  55. "ipcam",
  56. "lights_report",
  57. )
  58. def _ip_to_uint32_le(ip_str: str) -> int:
  59. """Encode dotted-quad IPv4 as little-endian uint32 (Bambu MQTT's `net.info[].ip` shape)."""
  60. parts = [int(x) for x in ip_str.split(".")]
  61. if len(parts) != 4 or any(p < 0 or p > 255 for p in parts):
  62. raise ValueError(f"invalid IPv4: {ip_str!r}")
  63. return parts[0] | (parts[1] << 8) | (parts[2] << 16) | (parts[3] << 24)
  64. def _merge_ams_dict(prev_ams: dict, new_ams: dict) -> dict:
  65. """Merge a new ``ams`` blob from an incremental push onto the previous one.
  66. Bambu firmware sends three shapes for the ``ams`` field on push_status:
  67. 1. Full pushall (after a printer reconnect or explicit pushall request):
  68. ``{ams: [{id, tray: [{id, tray_type, ...}, ...]}, ...], ams_status, ams_exist_bits, ...}``
  69. — every unit + every tray populated.
  70. 2. Status-only incremental: ``{ams_status: 1}`` or ``{humidity: 30}`` —
  71. no ``ams`` array at all. Bambuddy logs these as "AMS partial update
  72. (no tray data)" (#784 vintage).
  73. 3. Tray-targeted incremental during a print: ``{ams: [{id: 0, tray:
  74. [{id: 0, state: 11}]}]}`` — only the units / trays whose state
  75. changed.
  76. Replacing the cached ``ams`` wholesale on shapes (2) and (3) is what
  77. made the slicer "lose" AMS between pushalls and trip the symptom in
  78. #1387: the slicer would see a stripped ``ams_status``-only blob and
  79. fall back to its "no AMS" default render. This merge mirrors the
  80. deep-merge logic in ``bambu_mqtt.py::_handle_ams_data`` at the bridge
  81. layer so the slicer-facing cache always carries the latest known
  82. coherent state.
  83. Strategy:
  84. - Shallow-merge top-level scalars: keys in ``new`` win; keys only
  85. in ``prev`` are preserved.
  86. - For the ``ams`` array (list of units): match by ``id``. Units
  87. only in ``prev`` survive. Units in ``new`` overlay onto their
  88. ``prev`` counterpart; same recursion applies to each unit's
  89. ``tray`` array by tray ``id``.
  90. """
  91. merged = dict(prev_ams)
  92. for k, v in new_ams.items():
  93. if k != "ams":
  94. merged[k] = v
  95. prev_units = prev_ams.get("ams") if isinstance(prev_ams.get("ams"), list) else []
  96. new_units = new_ams.get("ams") if isinstance(new_ams.get("ams"), list) else None
  97. if new_units is None:
  98. # Shape (2): no ``ams`` array in the incremental — keep prev's units.
  99. if prev_units:
  100. merged["ams"] = prev_units
  101. return merged
  102. prev_by_id = {u.get("id"): u for u in prev_units if isinstance(u, dict) and u.get("id") is not None}
  103. merged_units: list = []
  104. seen_ids: set = set()
  105. for new_unit in new_units:
  106. if not isinstance(new_unit, dict):
  107. merged_units.append(new_unit)
  108. continue
  109. uid = new_unit.get("id")
  110. prev_unit = prev_by_id.get(uid) if uid is not None else None
  111. if prev_unit is None:
  112. merged_units.append(new_unit)
  113. if uid is not None:
  114. seen_ids.add(uid)
  115. continue
  116. # Shallow-merge unit fields; preserve prev's trays not present in new.
  117. merged_unit = dict(prev_unit)
  118. for k, v in new_unit.items():
  119. if k != "tray":
  120. merged_unit[k] = v
  121. new_trays = new_unit.get("tray") if isinstance(new_unit.get("tray"), list) else None
  122. if new_trays is None:
  123. # Unit-level partial — keep prev's tray list intact.
  124. pass
  125. else:
  126. prev_trays = prev_unit.get("tray") if isinstance(prev_unit.get("tray"), list) else []
  127. prev_trays_by_id = {t.get("id"): t for t in prev_trays if isinstance(t, dict) and t.get("id") is not None}
  128. merged_trays: list = []
  129. seen_tray_ids: set = set()
  130. for new_tray in new_trays:
  131. if not isinstance(new_tray, dict):
  132. merged_trays.append(new_tray)
  133. continue
  134. tid = new_tray.get("id")
  135. prev_tray = prev_trays_by_id.get(tid) if tid is not None else None
  136. if prev_tray is None:
  137. merged_trays.append(new_tray)
  138. else:
  139. merged_tray = dict(prev_tray)
  140. merged_tray.update(new_tray)
  141. merged_trays.append(merged_tray)
  142. if tid is not None:
  143. seen_tray_ids.add(tid)
  144. # Preserve prev trays not mentioned in the incremental.
  145. for tid, prev_tray in prev_trays_by_id.items():
  146. if tid not in seen_tray_ids:
  147. merged_trays.append(prev_tray)
  148. merged_unit["tray"] = merged_trays
  149. merged_units.append(merged_unit)
  150. if uid is not None:
  151. seen_ids.add(uid)
  152. # Preserve prev units not mentioned in the incremental.
  153. for uid, prev_unit in prev_by_id.items():
  154. if uid not in seen_ids:
  155. merged_units.append(prev_unit)
  156. merged["ams"] = merged_units
  157. return merged
  158. class MQTTBridge:
  159. """Per-VP MQTT fan-out between a real printer and slicers connected to a VP."""
  160. def __init__(
  161. self,
  162. *,
  163. vp_id: int,
  164. vp_name: str,
  165. vp_serial: str,
  166. target_printer_id: int,
  167. mqtt_server: SimpleMQTTServer,
  168. printer_manager: PrinterManager,
  169. ):
  170. self.vp_id = vp_id
  171. self.vp_name = vp_name
  172. self.vp_serial = vp_serial
  173. self.target_printer_id = target_printer_id
  174. self._mqtt_server = mqtt_server
  175. self._printer_manager = printer_manager
  176. self._target_client: BambuMQTTClient | None = None
  177. self._target_serial: str | None = None
  178. self._target_ip_uint32_le: int | None = None
  179. self._vp_ip_uint32_le: int | None = None
  180. self._loop: asyncio.AbstractEventLoop | None = None
  181. self._refresh_task: asyncio.Task | None = None
  182. self._stopping = False
  183. self._latest_print_state: dict | None = None
  184. self._latest_version_modules: list | None = None
  185. @property
  186. def is_active(self) -> bool:
  187. """True iff a target client is bound and currently connected."""
  188. client = self._target_client
  189. return bool(client is not None and getattr(client, "state", None) and client.state.connected)
  190. async def start(self) -> None:
  191. """Bind to the target printer (if connected) and start the refresh loop."""
  192. self._loop = asyncio.get_running_loop()
  193. self._stopping = False
  194. self._resolve_client()
  195. self._refresh_task = asyncio.create_task(self._refresh_loop())
  196. async def stop(self) -> None:
  197. """Detach from the target printer and stop the refresh loop."""
  198. self._stopping = True
  199. if self._refresh_task is not None:
  200. self._refresh_task.cancel()
  201. try:
  202. await self._refresh_task
  203. except asyncio.CancelledError:
  204. pass
  205. self._refresh_task = None
  206. self._unbind_client()
  207. self._loop = None
  208. async def _refresh_loop(self) -> None:
  209. """Re-resolve the target client periodically — paho clients can be replaced.
  210. BambuMQTTClient is destroyed and recreated on PrinterManager.connect_printer
  211. (e.g. printer config update). Without periodic refresh the bridge would lose
  212. fan-out after such a churn until the VP itself restarts.
  213. """
  214. try:
  215. while not self._stopping:
  216. await asyncio.sleep(REFRESH_INTERVAL_SECONDS)
  217. self._resolve_client()
  218. except asyncio.CancelledError:
  219. raise
  220. except Exception:
  221. logger.exception("[%s] MQTT bridge refresh loop crashed", self.vp_name)
  222. def _resolve_client(self) -> None:
  223. """Look up the current client for target_printer_id and rebind if it changed."""
  224. try:
  225. current = self._printer_manager.get_client(self.target_printer_id)
  226. except Exception:
  227. logger.exception("[%s] MQTT bridge: get_client failed", self.vp_name)
  228. return
  229. if current is self._target_client:
  230. return
  231. # Client identity changed — unregister from the old, register on the new.
  232. self._unbind_client()
  233. if current is None:
  234. return
  235. try:
  236. current.register_raw_message_handler(self._on_printer_raw)
  237. except Exception:
  238. logger.exception("[%s] MQTT bridge: register_raw_message_handler failed", self.vp_name)
  239. return
  240. self._target_client = current
  241. self._target_serial = getattr(current, "serial_number", None)
  242. # Cache printer IP and VP bind IP encoded as little-endian uint32, so we
  243. # can rewrite `net.info[*].ip` in cached push_status. BambuStudio reads
  244. # that field for the FTP destination IP — without rewriting, the slicer
  245. # bypasses the VP and FTPs straight to the real printer.
  246. target_ip = getattr(current, "ip_address", None)
  247. vp_ip = getattr(self._mqtt_server, "bind_address", None)
  248. if target_ip and vp_ip and vp_ip not in ("0.0.0.0", "", None): # nosec B104
  249. try:
  250. self._target_ip_uint32_le = _ip_to_uint32_le(target_ip)
  251. self._vp_ip_uint32_le = _ip_to_uint32_le(vp_ip)
  252. except ValueError:
  253. self._target_ip_uint32_le = None
  254. self._vp_ip_uint32_le = None
  255. logger.info(
  256. "[%s] MQTT bridge bound to printer %s (serial=%s)",
  257. self.vp_name,
  258. self.target_printer_id,
  259. self._target_serial,
  260. )
  261. # Trigger a fresh get_version + pushall against the printer so the bridge
  262. # cache populates immediately. Bambuddy itself queries these on connect,
  263. # but that fires before the bridge attaches as a raw-message consumer,
  264. # so without this nudge the cache stays empty until the next periodic
  265. # query (which can be minutes away).
  266. request_fn = getattr(current, "_request_version", None)
  267. if callable(request_fn):
  268. try:
  269. request_fn()
  270. except Exception:
  271. logger.exception("[%s] MQTT bridge: _request_version failed", self.vp_name)
  272. request_status_fn = getattr(current, "request_status_update", None)
  273. if callable(request_status_fn):
  274. try:
  275. request_status_fn()
  276. except Exception:
  277. logger.exception("[%s] MQTT bridge: request_status_update failed", self.vp_name)
  278. def _unbind_client(self) -> None:
  279. if self._target_client is None:
  280. return
  281. try:
  282. self._target_client.unregister_raw_message_handler(self._on_printer_raw)
  283. except Exception:
  284. logger.exception("[%s] MQTT bridge: unregister_raw_message_handler failed", self.vp_name)
  285. logger.info("[%s] MQTT bridge unbound from printer %s", self.vp_name, self.target_printer_id)
  286. self._target_client = None
  287. self._target_serial = None
  288. def _on_printer_raw(self, topic: str, payload: bytes) -> None:
  289. """Paho-thread callback — cache the latest push_status for synthetic replay.
  290. Instead of fanning out a second stream of MQTT messages to the slicer
  291. (which trips BambuStudio's Send pre-flight consistency checks), we cache
  292. the latest real printer push_status here. The VP's existing 1 Hz
  293. synthetic push (which is what Send is built around) consults this cache
  294. and replaces its stub fields with real values when available.
  295. """
  296. if self._stopping:
  297. return
  298. target_serial = self._target_serial
  299. if not target_serial:
  300. return
  301. prefix = f"device/{target_serial}/"
  302. if not topic.startswith(prefix):
  303. return
  304. suffix = topic[len(prefix) :]
  305. if not suffix.startswith("report"):
  306. return
  307. try:
  308. data = json.loads(payload)
  309. except json.JSONDecodeError:
  310. return
  311. # Race-free by construction: `json.loads` returns a fresh dict tree per
  312. # call so paho-thread mutations below cannot collide with prior cached
  313. # state held by the asyncio thread. `_send_status_report`'s shallow
  314. # `dict(cached)` is also safe because nothing else writes to the cached
  315. # tree after assignment. The defensive deep-copy on store below removes
  316. # any future risk if a maintainer later re-enters the cached dict to
  317. # mutate it.
  318. # push_status snapshots → cache the print dict for the periodic 1 Hz
  319. # cached-as-base delivery. We do NOT fan these out separately (the
  320. # 1 Hz cached-as-base IS the slicer-facing push_status stream).
  321. print_data = data.get("print")
  322. if isinstance(print_data, dict) and print_data.get("command") == "push_status":
  323. for value in print_data.values():
  324. if isinstance(value, dict) and value.get("sn") == target_serial:
  325. value["sn"] = self.vp_serial
  326. # Note: `ipcam.rtsp_url` carries the real printer's IP. We pass it
  327. # through unchanged — the slicer uses it to fetch the live camera
  328. # stream directly from the printer. On the same LAN this works as
  329. # long as the slicer's stored access code matches the printer's
  330. # (i.e. configure the VP with the same access code as its target).
  331. # Rewrite real printer IP → VP bind IP in `net.info[*].ip` so the
  332. # slicer's FTP destination resolves to the VP, not the real printer.
  333. if self._target_ip_uint32_le is not None and self._vp_ip_uint32_le is not None:
  334. net = print_data.get("net")
  335. if isinstance(net, dict):
  336. info = net.get("info")
  337. if isinstance(info, list):
  338. for entry in info:
  339. if isinstance(entry, dict) and entry.get("ip") == self._target_ip_uint32_le:
  340. entry["ip"] = self._vp_ip_uint32_le
  341. # Defensive deep copy on store so the cache is fully decoupled from
  342. # the freshly-parsed tree and from any reader's reference.
  343. new_state = copy.deepcopy(print_data)
  344. # Bambu firmware sends two kinds of push_status: full pushall
  345. # responses (on `pushall` requests / printer reconnect) which
  346. # include AMS, vt_tray, net, etc. — and ~1 Hz incremental
  347. # updates with just the fields that changed (typically temps,
  348. # fan, wifi). Without preserving sticky fields from the previous
  349. # cache, the first incremental push after a pushall would wipe
  350. # AMS info from the bridge cache, and slicers reading the cache
  351. # between pushalls would see a stripped-down printer state with
  352. # no AMS visible until the next pushall — typically only when
  353. # the user power-cycles the printer (#1371). Mirror the same
  354. # preservation pattern Bambuddy uses for its own internal state
  355. # in bambu_mqtt.py (see _SLICER_VISIBLE_STICKY_KEYS below).
  356. prev = self._latest_print_state
  357. if prev is not None:
  358. for sticky_key in _SLICER_VISIBLE_STICKY_KEYS:
  359. if sticky_key not in new_state:
  360. if sticky_key in prev:
  361. new_state[sticky_key] = prev[sticky_key]
  362. continue
  363. # Key IS in new_state — but firmware sends partial blobs
  364. # (status-only / tray-targeted) under the same key on
  365. # incremental updates, which would overwrite the cached
  366. # full blob and break the slicer's AMS render (#1387).
  367. # For `ams` specifically the deep-merge mirrors what
  368. # Bambuddy already does internally in `_handle_ams_data`.
  369. if (
  370. sticky_key == "ams"
  371. and isinstance(new_state.get("ams"), dict)
  372. and isinstance(prev.get("ams"), dict)
  373. ):
  374. new_state["ams"] = _merge_ams_dict(prev["ams"], new_state["ams"])
  375. self._latest_print_state = new_state
  376. return
  377. # info.get_version responses → cache the module list so the synthetic
  378. # version response can include the real AMS modules.
  379. info_data = data.get("info")
  380. if isinstance(info_data, dict) and info_data.get("command") == "get_version":
  381. modules = info_data.get("module")
  382. if isinstance(modules, list):
  383. rewritten: list = []
  384. for module in modules:
  385. if isinstance(module, dict):
  386. module = dict(module)
  387. if module.get("sn") == target_serial:
  388. module["sn"] = self.vp_serial
  389. rewritten.append(module)
  390. self._latest_version_modules = rewritten
  391. # Don't fan out get_version — the slicer's request (when it issues
  392. # one) is intercepted locally and answered from the cached modules.
  393. return
  394. # Everything else (extrusion_cali_get response, AMS write acks, xcam
  395. # responses, …): fan out to the slicer. These are responses to commands
  396. # the slicer (or Bambuddy) issued; the slicer matches by sequence_id and
  397. # ignores responses to commands it didn't send. Without this, slicer-
  398. # initiated queries like extrusion_cali_get hang forever and BambuStudio
  399. # blocks Send waiting for the response.
  400. loop = self._loop
  401. if loop is None:
  402. return
  403. target_bytes = target_serial.encode("ascii")
  404. if target_bytes in payload:
  405. payload = payload.replace(target_bytes, self.vp_serial.encode("ascii"))
  406. vp_topic = f"device/{self.vp_serial}/{suffix}"
  407. try:
  408. asyncio.run_coroutine_threadsafe(
  409. self._mqtt_server.push_raw_to_clients(vp_topic, payload),
  410. loop,
  411. )
  412. except RuntimeError:
  413. pass
  414. def get_latest_print_state(self) -> dict | None:
  415. """Return the most recent real printer push_status `print` dict, or None."""
  416. return self._latest_print_state
  417. def get_latest_version_modules(self) -> list | None:
  418. """Return the most recent real printer get_version `module` list, or None."""
  419. return self._latest_version_modules
  420. def forward_to_printer(self, payload: dict) -> bool:
  421. """Publish a slicer-originated command to the real printer's request topic.
  422. Returns False if no printer client is currently bound.
  423. """
  424. client = self._target_client
  425. target_serial = self._target_serial
  426. if client is None or target_serial is None:
  427. logger.debug(
  428. "[%s] forward_to_printer dropped (printer %s not bound): %s",
  429. self.vp_name,
  430. self.target_printer_id,
  431. list(payload.keys()),
  432. )
  433. return False
  434. topic = f"device/{target_serial}/request"
  435. try:
  436. return client.publish_raw(topic, json.dumps(payload), qos=1)
  437. except Exception:
  438. logger.exception("[%s] forward_to_printer publish failed", self.vp_name)
  439. return False