mqtt_bridge.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515
  1. """MQTT bridge for non-proxy virtual printers.
  2. Mirrors the target printer's state to slicers connected to a virtual printer
  3. without opening a second MQTT session on the printer (reuses Bambuddy's
  4. existing subscription — firmware inflight budget unaffected, see PR #1164).
  5. Architecture (cached-as-base, not a separate fan-out stream):
  6. - **push_status** snapshots from the printer are CACHED here. The VP's
  7. `SimpleMQTTServer._send_status_report` consults that cache and sends
  8. a near-byte-identical copy of the real push to the slicer (with
  9. sequence_id / gcode_state / etc. overridden). Single source of truth
  10. keeps BambuStudio's Send pre-flight happy.
  11. - **info.get_version** responses are also cached so the synthetic version
  12. response can include the real AMS module list (n3f/n3s/ams entries).
  13. Without this BambuStudio's Prepare tab labels every AMS as "unknown".
  14. - **Other command responses** (extrusion_cali_get, AMS write acks,
  15. xcam responses, …) are fanned out raw to the slicer — they carry
  16. sequence_ids the slicer is waiting on; the slicer matches and ignores
  17. unrelated ones.
  18. Identity rewriting at cache time:
  19. - `upgrade_state.sn` (and any other nested dict's `sn` matching the real
  20. serial) → VP serial
  21. - `net.info[*].ip` little-endian uint32 → VP bind IP. BambuStudio reads
  22. this as the FTP destination IP. Without this the slicer FTPs straight
  23. to the real printer and bypasses Bambuddy.
  24. - `ipcam.rtsp_url` is left unchanged: BambuStudio overrides the URL host
  25. with the device IP it bound to (the VP), so the slicer hits the VP's
  26. own RTSPS proxy on port 322.
  27. """
  28. from __future__ import annotations
  29. import asyncio
  30. import copy
  31. import json
  32. import logging
  33. from typing import TYPE_CHECKING
  34. if TYPE_CHECKING:
  35. from backend.app.services.bambu_mqtt import BambuMQTTClient
  36. from backend.app.services.printer_manager import PrinterManager
  37. from backend.app.services.virtual_printer.mqtt_server import SimpleMQTTServer
  38. logger = logging.getLogger(__name__)
  39. REFRESH_INTERVAL_SECONDS = 30.0
  40. # Top-level push_status fields that Bambu firmware sends in FULL pushall
  41. # responses (on `pushall` request / printer reconnect) but typically OMITS
  42. # from 1 Hz incremental push_status updates. Without preserving these
  43. # fields across incremental updates, the bridge cache would lose AMS info
  44. # (and friends) between pushalls — slicers reading the cache would see a
  45. # stripped-down state and the fix would only re-appear on a manual printer
  46. # power-cycle (#1371). Mirrors the same set Bambuddy itself preserves in
  47. # bambu_mqtt.py:2686-2711 for its own internal raw_data, with a few more
  48. # entries that the slicer cares about (net, ipcam, lights_report).
  49. _SLICER_VISIBLE_STICKY_KEYS: tuple[str, ...] = (
  50. "ams",
  51. "vt_tray",
  52. "ams_extruder_map",
  53. "mapping",
  54. "net",
  55. "ipcam",
  56. "lights_report",
  57. # Pre-flight / Prepare-tab fields that BambuStudio reads off cached
  58. # push_status. Bambu firmware emits them in full pushall but typically
  59. # OMITS them from 1 Hz incremental updates, so without sticky-preservation
  60. # the cache drops them after the very next tick and the slicer's
  61. # "block Send while busy / unknown firmware" branch kicks in. Same shape
  62. # as #1228 (storage indicators) and #1558 (live-progress fields) —
  63. # cached-branch field-shape parity, not a new mechanism.
  64. "upgrade_state", # Send pre-flight reads dis_state / force_upgrade
  65. "xcam", # Prepare-tab reads spaghetti / first-layer / halt sensitivity
  66. "hw_switch_state", # Hardware switch state (Prepare tab)
  67. "nozzle_diameter",
  68. "nozzle_type",
  69. "online", # Module online map (ahb / rfid / version)
  70. "ams_status", # AMS overall status; can be ams_status-only incremental
  71. )
  72. def _ip_to_uint32_le(ip_str: str) -> int:
  73. """Encode dotted-quad IPv4 as little-endian uint32 (Bambu MQTT's `net.info[].ip` shape)."""
  74. parts = [int(x) for x in ip_str.split(".")]
  75. if len(parts) != 4 or any(p < 0 or p > 255 for p in parts):
  76. raise ValueError(f"invalid IPv4: {ip_str!r}")
  77. return parts[0] | (parts[1] << 8) | (parts[2] << 16) | (parts[3] << 24)
  78. def _merge_ams_dict(prev_ams: dict, new_ams: dict) -> dict:
  79. """Merge a new ``ams`` blob from an incremental push onto the previous one.
  80. Bambu firmware sends three shapes for the ``ams`` field on push_status:
  81. 1. Full pushall (after a printer reconnect or explicit pushall request):
  82. ``{ams: [{id, tray: [{id, tray_type, ...}, ...]}, ...], ams_status, ams_exist_bits, ...}``
  83. — every unit + every tray populated.
  84. 2. Status-only incremental: ``{ams_status: 1}`` or ``{humidity: 30}`` —
  85. no ``ams`` array at all. Bambuddy logs these as "AMS partial update
  86. (no tray data)" (#784 vintage).
  87. 3. Tray-targeted incremental during a print: ``{ams: [{id: 0, tray:
  88. [{id: 0, state: 11}]}]}`` — only the units / trays whose state
  89. changed.
  90. Replacing the cached ``ams`` wholesale on shapes (2) and (3) is what
  91. made the slicer "lose" AMS between pushalls and trip the symptom in
  92. #1387: the slicer would see a stripped ``ams_status``-only blob and
  93. fall back to its "no AMS" default render. This merge mirrors the
  94. deep-merge logic in ``bambu_mqtt.py::_handle_ams_data`` at the bridge
  95. layer so the slicer-facing cache always carries the latest known
  96. coherent state.
  97. Strategy:
  98. - Shallow-merge top-level scalars: keys in ``new`` win; keys only
  99. in ``prev`` are preserved.
  100. - For the ``ams`` array (list of units): match by ``id``. Units
  101. only in ``prev`` survive. Units in ``new`` overlay onto their
  102. ``prev`` counterpart; same recursion applies to each unit's
  103. ``tray`` array by tray ``id``.
  104. """
  105. merged = dict(prev_ams)
  106. for k, v in new_ams.items():
  107. if k != "ams":
  108. merged[k] = v
  109. prev_units = prev_ams.get("ams") if isinstance(prev_ams.get("ams"), list) else []
  110. new_units = new_ams.get("ams") if isinstance(new_ams.get("ams"), list) else None
  111. if new_units is None:
  112. # Shape (2): no ``ams`` array in the incremental — keep prev's units.
  113. if prev_units:
  114. merged["ams"] = prev_units
  115. return merged
  116. prev_by_id = {u.get("id"): u for u in prev_units if isinstance(u, dict) and u.get("id") is not None}
  117. merged_units: list = []
  118. seen_ids: set = set()
  119. for new_unit in new_units:
  120. if not isinstance(new_unit, dict):
  121. merged_units.append(new_unit)
  122. continue
  123. uid = new_unit.get("id")
  124. prev_unit = prev_by_id.get(uid) if uid is not None else None
  125. if prev_unit is None:
  126. merged_units.append(new_unit)
  127. if uid is not None:
  128. seen_ids.add(uid)
  129. continue
  130. # Shallow-merge unit fields; preserve prev's trays not present in new.
  131. merged_unit = dict(prev_unit)
  132. for k, v in new_unit.items():
  133. if k != "tray":
  134. merged_unit[k] = v
  135. new_trays = new_unit.get("tray") if isinstance(new_unit.get("tray"), list) else None
  136. if new_trays is None:
  137. # Unit-level partial — keep prev's tray list intact.
  138. pass
  139. else:
  140. prev_trays = prev_unit.get("tray") if isinstance(prev_unit.get("tray"), list) else []
  141. prev_trays_by_id = {t.get("id"): t for t in prev_trays if isinstance(t, dict) and t.get("id") is not None}
  142. merged_trays: list = []
  143. seen_tray_ids: set = set()
  144. for new_tray in new_trays:
  145. if not isinstance(new_tray, dict):
  146. merged_trays.append(new_tray)
  147. continue
  148. tid = new_tray.get("id")
  149. prev_tray = prev_trays_by_id.get(tid) if tid is not None else None
  150. if prev_tray is None:
  151. merged_trays.append(new_tray)
  152. else:
  153. merged_tray = dict(prev_tray)
  154. merged_tray.update(new_tray)
  155. merged_trays.append(merged_tray)
  156. if tid is not None:
  157. seen_tray_ids.add(tid)
  158. # Preserve prev trays not mentioned in the incremental.
  159. for tid, prev_tray in prev_trays_by_id.items():
  160. if tid not in seen_tray_ids:
  161. merged_trays.append(prev_tray)
  162. merged_unit["tray"] = merged_trays
  163. merged_units.append(merged_unit)
  164. if uid is not None:
  165. seen_ids.add(uid)
  166. # Preserve prev units not mentioned in the incremental.
  167. for uid, prev_unit in prev_by_id.items():
  168. if uid not in seen_ids:
  169. merged_units.append(prev_unit)
  170. merged["ams"] = merged_units
  171. return merged
  172. class MQTTBridge:
  173. """Per-VP MQTT fan-out between a real printer and slicers connected to a VP."""
  174. def __init__(
  175. self,
  176. *,
  177. vp_id: int,
  178. vp_name: str,
  179. vp_serial: str,
  180. target_printer_id: int,
  181. mqtt_server: SimpleMQTTServer,
  182. printer_manager: PrinterManager,
  183. ):
  184. self.vp_id = vp_id
  185. self.vp_name = vp_name
  186. self.vp_serial = vp_serial
  187. self.target_printer_id = target_printer_id
  188. self._mqtt_server = mqtt_server
  189. self._printer_manager = printer_manager
  190. self._target_client: BambuMQTTClient | None = None
  191. self._target_serial: str | None = None
  192. self._target_ip_uint32_le: int | None = None
  193. self._vp_ip_uint32_le: int | None = None
  194. self._loop: asyncio.AbstractEventLoop | None = None
  195. self._refresh_task: asyncio.Task | None = None
  196. self._stopping = False
  197. self._latest_print_state: dict | None = None
  198. self._latest_version_modules: list | None = None
  199. @property
  200. def is_active(self) -> bool:
  201. """True iff a target client is bound and currently connected."""
  202. client = self._target_client
  203. return bool(client is not None and getattr(client, "state", None) and client.state.connected)
  204. async def start(self) -> None:
  205. """Bind to the target printer (if connected) and start the refresh loop."""
  206. self._loop = asyncio.get_running_loop()
  207. self._stopping = False
  208. self._resolve_client()
  209. self._refresh_task = asyncio.create_task(self._refresh_loop())
  210. async def stop(self) -> None:
  211. """Detach from the target printer and stop the refresh loop."""
  212. self._stopping = True
  213. if self._refresh_task is not None:
  214. self._refresh_task.cancel()
  215. try:
  216. await self._refresh_task
  217. except asyncio.CancelledError:
  218. pass
  219. self._refresh_task = None
  220. self._unbind_client()
  221. self._loop = None
  222. async def _refresh_loop(self) -> None:
  223. """Re-resolve the target client periodically — paho clients can be replaced.
  224. BambuMQTTClient is destroyed and recreated on PrinterManager.connect_printer
  225. (e.g. printer config update). Without periodic refresh the bridge would lose
  226. fan-out after such a churn until the VP itself restarts.
  227. On crash exit, the handler must be unbound — otherwise the registered
  228. ``_on_printer_raw`` keeps firing on every real-printer message even
  229. though the bridge is functionally dead (memory leak + behaviour leak
  230. across VP restart).
  231. """
  232. try:
  233. while not self._stopping:
  234. await asyncio.sleep(REFRESH_INTERVAL_SECONDS)
  235. self._resolve_client()
  236. except asyncio.CancelledError:
  237. raise
  238. except Exception:
  239. logger.exception("[%s] MQTT bridge refresh loop crashed", self.vp_name)
  240. # Crash exit — unbind so the orphaned handler stops firing.
  241. # ``stop()`` won't be invoked because the task completes done-not-cancelled.
  242. self._unbind_client()
  243. def _resolve_client(self) -> None:
  244. """Look up the current client for target_printer_id and rebind if it changed."""
  245. try:
  246. current = self._printer_manager.get_client(self.target_printer_id)
  247. except Exception:
  248. logger.exception("[%s] MQTT bridge: get_client failed", self.vp_name)
  249. return
  250. if current is self._target_client:
  251. return
  252. # Client identity changed — unregister from the old, register on the new.
  253. self._unbind_client()
  254. if current is None:
  255. return
  256. try:
  257. current.register_raw_message_handler(self._on_printer_raw)
  258. except Exception:
  259. logger.exception("[%s] MQTT bridge: register_raw_message_handler failed", self.vp_name)
  260. return
  261. self._target_client = current
  262. self._target_serial = getattr(current, "serial_number", None)
  263. # Cache printer IP and VP bind IP encoded as little-endian uint32, so we
  264. # can rewrite `net.info[*].ip` in cached push_status. BambuStudio reads
  265. # that field for the FTP destination IP — without rewriting, the slicer
  266. # bypasses the VP and FTPs straight to the real printer.
  267. target_ip = getattr(current, "ip_address", None)
  268. vp_ip = getattr(self._mqtt_server, "bind_address", None)
  269. if target_ip and vp_ip and vp_ip not in ("0.0.0.0", "", None): # nosec B104
  270. try:
  271. self._target_ip_uint32_le = _ip_to_uint32_le(target_ip)
  272. self._vp_ip_uint32_le = _ip_to_uint32_le(vp_ip)
  273. except ValueError:
  274. self._target_ip_uint32_le = None
  275. self._vp_ip_uint32_le = None
  276. logger.info(
  277. "[%s] MQTT bridge bound to printer %s (serial=%s)",
  278. self.vp_name,
  279. self.target_printer_id,
  280. self._target_serial,
  281. )
  282. # Trigger a fresh get_version + pushall against the printer so the bridge
  283. # cache populates immediately. Bambuddy itself queries these on connect,
  284. # but that fires before the bridge attaches as a raw-message consumer,
  285. # so without this nudge the cache stays empty until the next periodic
  286. # query (which can be minutes away).
  287. request_fn = getattr(current, "_request_version", None)
  288. if callable(request_fn):
  289. try:
  290. request_fn()
  291. except Exception:
  292. logger.exception("[%s] MQTT bridge: _request_version failed", self.vp_name)
  293. request_status_fn = getattr(current, "request_status_update", None)
  294. if callable(request_status_fn):
  295. try:
  296. request_status_fn()
  297. except Exception:
  298. logger.exception("[%s] MQTT bridge: request_status_update failed", self.vp_name)
  299. def _unbind_client(self) -> None:
  300. if self._target_client is None:
  301. return
  302. try:
  303. self._target_client.unregister_raw_message_handler(self._on_printer_raw)
  304. except Exception:
  305. logger.exception("[%s] MQTT bridge: unregister_raw_message_handler failed", self.vp_name)
  306. logger.info("[%s] MQTT bridge unbound from printer %s", self.vp_name, self.target_printer_id)
  307. self._target_client = None
  308. self._target_serial = None
  309. def _on_printer_raw(self, topic: str, payload: bytes) -> None:
  310. """Paho-thread callback — cache the latest push_status for synthetic replay.
  311. Instead of fanning out a second stream of MQTT messages to the slicer
  312. (which trips BambuStudio's Send pre-flight consistency checks), we cache
  313. the latest real printer push_status here. The VP's existing 1 Hz
  314. synthetic push (which is what Send is built around) consults this cache
  315. and replaces its stub fields with real values when available.
  316. """
  317. if self._stopping:
  318. return
  319. target_serial = self._target_serial
  320. if not target_serial:
  321. return
  322. prefix = f"device/{target_serial}/"
  323. if not topic.startswith(prefix):
  324. return
  325. suffix = topic[len(prefix) :]
  326. if not suffix.startswith("report"):
  327. return
  328. try:
  329. data = json.loads(payload)
  330. except json.JSONDecodeError:
  331. return
  332. # Race-free by construction: `json.loads` returns a fresh dict tree per
  333. # call so paho-thread mutations below cannot collide with prior cached
  334. # state held by the asyncio thread. `_send_status_report`'s shallow
  335. # `dict(cached)` is also safe because nothing else writes to the cached
  336. # tree after assignment. The defensive deep-copy on store below removes
  337. # any future risk if a maintainer later re-enters the cached dict to
  338. # mutate it.
  339. # push_status snapshots → cache the print dict for the periodic 1 Hz
  340. # cached-as-base delivery. We do NOT fan these out separately (the
  341. # 1 Hz cached-as-base IS the slicer-facing push_status stream).
  342. print_data = data.get("print")
  343. if isinstance(print_data, dict) and print_data.get("command") == "push_status":
  344. for value in print_data.values():
  345. if isinstance(value, dict) and value.get("sn") == target_serial:
  346. value["sn"] = self.vp_serial
  347. # Note: `ipcam.rtsp_url` carries the real printer's IP. We pass it
  348. # through unchanged — the slicer uses it to fetch the live camera
  349. # stream directly from the printer. On the same LAN this works as
  350. # long as the slicer's stored access code matches the printer's
  351. # (i.e. configure the VP with the same access code as its target).
  352. # Rewrite real printer IP → VP bind IP in `net.info[*].ip` so the
  353. # slicer's FTP destination resolves to the VP, not the real printer.
  354. if self._target_ip_uint32_le is not None and self._vp_ip_uint32_le is not None:
  355. net = print_data.get("net")
  356. if isinstance(net, dict):
  357. info = net.get("info")
  358. if isinstance(info, list):
  359. for entry in info:
  360. if isinstance(entry, dict) and entry.get("ip") == self._target_ip_uint32_le:
  361. entry["ip"] = self._vp_ip_uint32_le
  362. # Defensive deep copy on store so the cache is fully decoupled from
  363. # the freshly-parsed tree and from any reader's reference.
  364. new_state = copy.deepcopy(print_data)
  365. # Bambu firmware sends two kinds of push_status: full pushall
  366. # responses (on `pushall` requests / printer reconnect) which
  367. # include AMS, vt_tray, net, etc. — and ~1 Hz incremental
  368. # updates with just the fields that changed (typically temps,
  369. # fan, wifi). Without preserving sticky fields from the previous
  370. # cache, the first incremental push after a pushall would wipe
  371. # AMS info from the bridge cache, and slicers reading the cache
  372. # between pushalls would see a stripped-down printer state with
  373. # no AMS visible until the next pushall — typically only when
  374. # the user power-cycles the printer (#1371). Mirror the same
  375. # preservation pattern Bambuddy uses for its own internal state
  376. # in bambu_mqtt.py (see _SLICER_VISIBLE_STICKY_KEYS below).
  377. prev = self._latest_print_state
  378. if prev is not None:
  379. for sticky_key in _SLICER_VISIBLE_STICKY_KEYS:
  380. if sticky_key not in new_state:
  381. if sticky_key in prev:
  382. # Defensive deep copy — without this the carried-over
  383. # nested dicts/lists are shared between new_state and
  384. # the previous cache, so any in-place mutation later
  385. # (current or future code paths) would corrupt both.
  386. new_state[sticky_key] = copy.deepcopy(prev[sticky_key])
  387. continue
  388. # Key IS in new_state — but firmware sends partial blobs
  389. # (status-only / tray-targeted) under the same key on
  390. # incremental updates, which would overwrite the cached
  391. # full blob and break the slicer's AMS render (#1387).
  392. # For `ams` specifically the deep-merge mirrors what
  393. # Bambuddy already does internally in `_handle_ams_data`.
  394. if (
  395. sticky_key == "ams"
  396. and isinstance(new_state.get("ams"), dict)
  397. and isinstance(prev.get("ams"), dict)
  398. ):
  399. new_state["ams"] = _merge_ams_dict(prev["ams"], new_state["ams"])
  400. self._latest_print_state = new_state
  401. return
  402. # info.get_version responses → cache the module list so the synthetic
  403. # version response can include the real AMS modules.
  404. info_data = data.get("info")
  405. if isinstance(info_data, dict) and info_data.get("command") == "get_version":
  406. modules = info_data.get("module")
  407. if isinstance(modules, list):
  408. rewritten: list = []
  409. for module in modules:
  410. if isinstance(module, dict):
  411. module = dict(module)
  412. if module.get("sn") == target_serial:
  413. module["sn"] = self.vp_serial
  414. rewritten.append(module)
  415. self._latest_version_modules = rewritten
  416. # Don't fan out get_version — the slicer's request (when it issues
  417. # one) is intercepted locally and answered from the cached modules.
  418. return
  419. # Everything else (extrusion_cali_get response, AMS write acks, xcam
  420. # responses, …): fan out to the slicer. These are responses to commands
  421. # the slicer (or Bambuddy) issued; the slicer matches by sequence_id and
  422. # ignores responses to commands it didn't send. Without this, slicer-
  423. # initiated queries like extrusion_cali_get hang forever and BambuStudio
  424. # blocks Send waiting for the response.
  425. loop = self._loop
  426. if loop is None:
  427. return
  428. target_bytes = target_serial.encode("ascii")
  429. if target_bytes in payload:
  430. payload = payload.replace(target_bytes, self.vp_serial.encode("ascii"))
  431. vp_topic = f"device/{self.vp_serial}/{suffix}"
  432. try:
  433. asyncio.run_coroutine_threadsafe(
  434. self._mqtt_server.push_raw_to_clients(vp_topic, payload),
  435. loop,
  436. )
  437. except RuntimeError:
  438. pass
  439. def get_latest_print_state(self) -> dict | None:
  440. """Return the most recent real printer push_status `print` dict, or None."""
  441. return self._latest_print_state
  442. def get_latest_version_modules(self) -> list | None:
  443. """Return the most recent real printer get_version `module` list, or None."""
  444. return self._latest_version_modules
  445. def forward_to_printer(self, payload: dict) -> bool:
  446. """Publish a slicer-originated command to the real printer's request topic.
  447. Returns False if no printer client is currently bound.
  448. """
  449. client = self._target_client
  450. target_serial = self._target_serial
  451. if client is None or target_serial is None:
  452. logger.debug(
  453. "[%s] forward_to_printer dropped (printer %s not bound): %s",
  454. self.vp_name,
  455. self.target_printer_id,
  456. list(payload.keys()),
  457. )
  458. return False
  459. topic = f"device/{target_serial}/request"
  460. try:
  461. return client.publish_raw(topic, json.dumps(payload), qos=1)
  462. except Exception:
  463. logger.exception("[%s] forward_to_printer publish failed", self.vp_name)
  464. return False