mqtt_bridge.py 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592
  1. """MQTT bridge for non-proxy virtual printers.
  2. Mirrors the target printer's state to slicers connected to a virtual printer
  3. without opening a second MQTT session on the printer (reuses Bambuddy's
  4. existing subscription — firmware inflight budget unaffected, see PR #1164).
  5. Architecture (cached-as-base, not a separate fan-out stream):
  6. - **push_status** snapshots from the printer are CACHED here. The VP's
  7. `SimpleMQTTServer._send_status_report` consults that cache and sends
  8. a near-byte-identical copy of the real push to the slicer (with
  9. sequence_id / gcode_state / etc. overridden). Single source of truth
  10. keeps BambuStudio's Send pre-flight happy.
  11. - **info.get_version** responses are also cached so the synthetic version
  12. response can include the real AMS module list (n3f/n3s/ams entries).
  13. Without this BambuStudio's Prepare tab labels every AMS as "unknown".
  14. - **Other command responses** (extrusion_cali_get, AMS write acks,
  15. xcam responses, …) are fanned out raw to the slicer — they carry
  16. sequence_ids the slicer is waiting on; the slicer matches and ignores
  17. unrelated ones.
  18. Identity rewriting at cache time:
  19. - `upgrade_state.sn` (and any other nested dict's `sn` matching the real
  20. serial) → VP serial
  21. - `net.info[*].ip` little-endian uint32 → VP bind IP. BambuStudio reads
  22. this as the FTP destination IP. Without this the slicer FTPs straight
  23. to the real printer and bypasses Bambuddy.
  24. - `ipcam.rtsp_url` is left unchanged: BambuStudio overrides the URL host
  25. with the device IP it bound to (the VP), so the slicer hits the VP's
  26. own RTSPS proxy on port 322.
  27. """
  28. from __future__ import annotations
  29. import asyncio
  30. import copy
  31. import json
  32. import logging
  33. from typing import TYPE_CHECKING
  34. if TYPE_CHECKING:
  35. from backend.app.services.bambu_mqtt import BambuMQTTClient
  36. from backend.app.services.printer_manager import PrinterManager
  37. from backend.app.services.virtual_printer.mqtt_server import SimpleMQTTServer
  38. logger = logging.getLogger(__name__)
  39. REFRESH_INTERVAL_SECONDS = 30.0
  40. # Top-level push_status fields that Bambu firmware sends in FULL pushall
  41. # responses (on `pushall` request / printer reconnect) but typically OMITS
  42. # from 1 Hz incremental push_status updates. Without preserving these
  43. # fields across incremental updates, the bridge cache would lose AMS info
  44. # (and friends) between pushalls — slicers reading the cache would see a
  45. # stripped-down state and the fix would only re-appear on a manual printer
  46. # power-cycle (#1371). Mirrors the same set Bambuddy itself preserves in
  47. # bambu_mqtt.py:2686-2711 for its own internal raw_data, with a few more
  48. # entries that the slicer cares about (net, ipcam, lights_report).
  49. _SLICER_VISIBLE_STICKY_KEYS: tuple[str, ...] = (
  50. "ams",
  51. "vt_tray",
  52. "ams_extruder_map",
  53. "mapping",
  54. "net",
  55. "ipcam",
  56. "lights_report",
  57. # Pre-flight / Prepare-tab fields that BambuStudio reads off cached
  58. # push_status. Bambu firmware emits them in full pushall but typically
  59. # OMITS them from 1 Hz incremental updates, so without sticky-preservation
  60. # the cache drops them after the very next tick and the slicer's
  61. # "block Send while busy / unknown firmware" branch kicks in. Same shape
  62. # as #1228 (storage indicators) and #1558 (live-progress fields) —
  63. # cached-branch field-shape parity, not a new mechanism.
  64. "upgrade_state", # Send pre-flight reads dis_state / force_upgrade
  65. "xcam", # Prepare-tab reads spaghetti / first-layer / halt sensitivity
  66. "hw_switch_state", # Hardware switch state (Prepare tab)
  67. "nozzle_diameter",
  68. "nozzle_type",
  69. "online", # Module online map (ahb / rfid / version)
  70. "ams_status", # AMS overall status; can be ams_status-only incremental
  71. )
  72. def _ip_to_uint32_le(ip_str: str) -> int:
  73. """Encode dotted-quad IPv4 as little-endian uint32 (Bambu MQTT's `net.info[].ip` shape)."""
  74. parts = [int(x) for x in ip_str.split(".")]
  75. if len(parts) != 4 or any(p < 0 or p > 255 for p in parts):
  76. raise ValueError(f"invalid IPv4: {ip_str!r}")
  77. return parts[0] | (parts[1] << 8) | (parts[2] << 16) | (parts[3] << 24)
  78. def _merge_ams_dict(prev_ams: dict, new_ams: dict) -> dict:
  79. """Merge a new ``ams`` blob from an incremental push onto the previous one.
  80. Bambu firmware sends three shapes for the ``ams`` field on push_status:
  81. 1. Full pushall (after a printer reconnect or explicit pushall request):
  82. ``{ams: [{id, tray: [{id, tray_type, ...}, ...]}, ...], ams_status, ams_exist_bits, ...}``
  83. — every unit + every tray populated.
  84. 2. Status-only incremental: ``{ams_status: 1}`` or ``{humidity: 30}`` —
  85. no ``ams`` array at all. Bambuddy logs these as "AMS partial update
  86. (no tray data)" (#784 vintage).
  87. 3. Tray-targeted incremental during a print: ``{ams: [{id: 0, tray:
  88. [{id: 0, state: 11}]}]}`` — only the units / trays whose state
  89. changed.
  90. Replacing the cached ``ams`` wholesale on shapes (2) and (3) is what
  91. made the slicer "lose" AMS between pushalls and trip the symptom in
  92. #1387: the slicer would see a stripped ``ams_status``-only blob and
  93. fall back to its "no AMS" default render. This merge mirrors the
  94. deep-merge logic in ``bambu_mqtt.py::_handle_ams_data`` at the bridge
  95. layer so the slicer-facing cache always carries the latest known
  96. coherent state.
  97. Strategy:
  98. - Shallow-merge top-level scalars: keys in ``new`` win; keys only
  99. in ``prev`` are preserved.
  100. - For the ``ams`` array (list of units): match by ``id``. Units
  101. only in ``prev`` survive. Units in ``new`` overlay onto their
  102. ``prev`` counterpart; same recursion applies to each unit's
  103. ``tray`` array by tray ``id``.
  104. """
  105. merged = dict(prev_ams)
  106. for k, v in new_ams.items():
  107. if k != "ams":
  108. merged[k] = v
  109. prev_units = prev_ams.get("ams") if isinstance(prev_ams.get("ams"), list) else []
  110. new_units = new_ams.get("ams") if isinstance(new_ams.get("ams"), list) else None
  111. if new_units is None:
  112. # Shape (2): no ``ams`` array in the incremental — keep prev's units.
  113. if prev_units:
  114. merged["ams"] = prev_units
  115. return merged
  116. prev_by_id = {u.get("id"): u for u in prev_units if isinstance(u, dict) and u.get("id") is not None}
  117. merged_units: list = []
  118. seen_ids: set = set()
  119. for new_unit in new_units:
  120. if not isinstance(new_unit, dict):
  121. merged_units.append(new_unit)
  122. continue
  123. uid = new_unit.get("id")
  124. prev_unit = prev_by_id.get(uid) if uid is not None else None
  125. if prev_unit is None:
  126. merged_units.append(new_unit)
  127. if uid is not None:
  128. seen_ids.add(uid)
  129. continue
  130. # Shallow-merge unit fields; preserve prev's trays not present in new.
  131. merged_unit = dict(prev_unit)
  132. for k, v in new_unit.items():
  133. if k != "tray":
  134. merged_unit[k] = v
  135. new_trays = new_unit.get("tray") if isinstance(new_unit.get("tray"), list) else None
  136. if new_trays is None:
  137. # Unit-level partial — keep prev's tray list intact.
  138. pass
  139. else:
  140. prev_trays = prev_unit.get("tray") if isinstance(prev_unit.get("tray"), list) else []
  141. prev_trays_by_id = {t.get("id"): t for t in prev_trays if isinstance(t, dict) and t.get("id") is not None}
  142. merged_trays: list = []
  143. seen_tray_ids: set = set()
  144. for new_tray in new_trays:
  145. if not isinstance(new_tray, dict):
  146. merged_trays.append(new_tray)
  147. continue
  148. tid = new_tray.get("id")
  149. prev_tray = prev_trays_by_id.get(tid) if tid is not None else None
  150. if prev_tray is None:
  151. merged_trays.append(new_tray)
  152. else:
  153. merged_tray = dict(prev_tray)
  154. merged_tray.update(new_tray)
  155. merged_trays.append(merged_tray)
  156. if tid is not None:
  157. seen_tray_ids.add(tid)
  158. # Preserve prev trays not mentioned in the incremental.
  159. for tid, prev_tray in prev_trays_by_id.items():
  160. if tid not in seen_tray_ids:
  161. merged_trays.append(prev_tray)
  162. merged_unit["tray"] = merged_trays
  163. merged_units.append(merged_unit)
  164. if uid is not None:
  165. seen_ids.add(uid)
  166. # Preserve prev units not mentioned in the incremental.
  167. for uid, prev_unit in prev_by_id.items():
  168. if uid not in seen_ids:
  169. merged_units.append(prev_unit)
  170. merged["ams"] = merged_units
  171. return merged
  172. class MQTTBridge:
  173. """Per-VP MQTT fan-out between a real printer and slicers connected to a VP."""
  174. def __init__(
  175. self,
  176. *,
  177. vp_id: int,
  178. vp_name: str,
  179. vp_serial: str,
  180. target_printer_id: int,
  181. mqtt_server: SimpleMQTTServer,
  182. printer_manager: PrinterManager,
  183. ):
  184. self.vp_id = vp_id
  185. self.vp_name = vp_name
  186. self.vp_serial = vp_serial
  187. self.target_printer_id = target_printer_id
  188. self._mqtt_server = mqtt_server
  189. self._printer_manager = printer_manager
  190. self._target_client: BambuMQTTClient | None = None
  191. self._target_serial: str | None = None
  192. self._target_ip_uint32_le: int | None = None
  193. self._vp_ip_uint32_le: int | None = None
  194. self._loop: asyncio.AbstractEventLoop | None = None
  195. self._refresh_task: asyncio.Task | None = None
  196. self._stopping = False
  197. self._latest_print_state: dict | None = None
  198. self._latest_version_modules: list | None = None
  199. @property
  200. def is_active(self) -> bool:
  201. """True iff a target client is bound and currently connected."""
  202. client = self._target_client
  203. return bool(client is not None and getattr(client, "state", None) and client.state.connected)
  204. async def start(self) -> None:
  205. """Bind to the target printer (if connected) and start the refresh loop."""
  206. self._loop = asyncio.get_running_loop()
  207. self._stopping = False
  208. self._resolve_client()
  209. self._refresh_task = asyncio.create_task(self._refresh_loop())
  210. async def stop(self) -> None:
  211. """Detach from the target printer and stop the refresh loop."""
  212. self._stopping = True
  213. if self._refresh_task is not None:
  214. self._refresh_task.cancel()
  215. try:
  216. await self._refresh_task
  217. except asyncio.CancelledError:
  218. pass
  219. self._refresh_task = None
  220. self._unbind_client()
  221. self._loop = None
  222. async def _refresh_loop(self) -> None:
  223. """Re-resolve the target client periodically — paho clients can be replaced.
  224. BambuMQTTClient is destroyed and recreated on PrinterManager.connect_printer
  225. (e.g. printer config update). Without periodic refresh the bridge would lose
  226. fan-out after such a churn until the VP itself restarts.
  227. On crash exit, the handler must be unbound — otherwise the registered
  228. ``_on_printer_raw`` keeps firing on every real-printer message even
  229. though the bridge is functionally dead (memory leak + behaviour leak
  230. across VP restart).
  231. """
  232. try:
  233. while not self._stopping:
  234. await asyncio.sleep(REFRESH_INTERVAL_SECONDS)
  235. self._resolve_client()
  236. except asyncio.CancelledError:
  237. raise
  238. except Exception:
  239. logger.exception("[%s] MQTT bridge refresh loop crashed", self.vp_name)
  240. # Crash exit — unbind so the orphaned handler stops firing.
  241. # ``stop()`` won't be invoked because the task completes done-not-cancelled.
  242. self._unbind_client()
  243. def _resolve_client(self) -> None:
  244. """Look up the current client for target_printer_id and rebind if it changed."""
  245. try:
  246. current = self._printer_manager.get_client(self.target_printer_id)
  247. except Exception:
  248. logger.exception("[%s] MQTT bridge: get_client failed", self.vp_name)
  249. return
  250. if current is self._target_client:
  251. # Same client object — but `ip_address` can fill in *after* the
  252. # initial bind (e.g. DB row had a stale/empty value until the
  253. # client's first SSDP-driven IP refresh). The original code only
  254. # encoded `_target_ip_uint32_le` on client-identity change, so
  255. # that late-arriving IP was never picked up, the `net.info[*].ip`
  256. # rewrite stayed disabled, and the cache filled with the real
  257. # printer IP — #1429. Refresh the encoding every tick so it
  258. # self-heals once `ip_address` becomes valid.
  259. self._refresh_ip_encoding()
  260. return
  261. # Client identity changed — unregister from the old, register on the new.
  262. self._unbind_client()
  263. if current is None:
  264. return
  265. try:
  266. current.register_raw_message_handler(self._on_printer_raw)
  267. except Exception:
  268. logger.exception("[%s] MQTT bridge: register_raw_message_handler failed", self.vp_name)
  269. return
  270. self._target_client = current
  271. self._target_serial = getattr(current, "serial_number", None)
  272. self._refresh_ip_encoding()
  273. logger.info(
  274. "[%s] MQTT bridge bound to printer %s (serial=%s)",
  275. self.vp_name,
  276. self.target_printer_id,
  277. self._target_serial,
  278. )
  279. # Trigger a fresh get_version + pushall against the printer so the bridge
  280. # cache populates immediately. Bambuddy itself queries these on connect,
  281. # but that fires before the bridge attaches as a raw-message consumer,
  282. # so without this nudge the cache stays empty until the next periodic
  283. # query (which can be minutes away).
  284. request_fn = getattr(current, "_request_version", None)
  285. if callable(request_fn):
  286. try:
  287. request_fn()
  288. except Exception:
  289. logger.exception("[%s] MQTT bridge: _request_version failed", self.vp_name)
  290. request_status_fn = getattr(current, "request_status_update", None)
  291. if callable(request_status_fn):
  292. try:
  293. request_status_fn()
  294. except Exception:
  295. logger.exception("[%s] MQTT bridge: request_status_update failed", self.vp_name)
  296. def _unbind_client(self) -> None:
  297. if self._target_client is None:
  298. return
  299. try:
  300. self._target_client.unregister_raw_message_handler(self._on_printer_raw)
  301. except Exception:
  302. logger.exception("[%s] MQTT bridge: unregister_raw_message_handler failed", self.vp_name)
  303. logger.info("[%s] MQTT bridge unbound from printer %s", self.vp_name, self.target_printer_id)
  304. self._target_client = None
  305. self._target_serial = None
  306. def _refresh_ip_encoding(self) -> None:
  307. """(Re-)encode `_target_ip_uint32_le` / `_vp_ip_uint32_le` from current values.
  308. Called on every refresh tick, not just on client-identity change, so
  309. a late-arriving printer IP (or a bind-address change) is picked up
  310. without restarting the VP. When the encoding becomes valid for the
  311. first time *after* the cache already received a push with the real
  312. printer IP, also sweep the existing cache so the slicer's next pull
  313. sees the rewritten value (#1429). Without this sweep the sticky-key
  314. preservation keeps the poisoned `net.info[].ip` alive forever.
  315. """
  316. client = self._target_client
  317. if client is None:
  318. return
  319. target_ip = getattr(client, "ip_address", None)
  320. vp_ip = getattr(self._mqtt_server, "bind_address", None)
  321. if not target_ip or not vp_ip or vp_ip in ("0.0.0.0", "", None): # nosec B104
  322. return
  323. try:
  324. new_target_le = _ip_to_uint32_le(target_ip)
  325. new_vp_le = _ip_to_uint32_le(vp_ip)
  326. except ValueError:
  327. return
  328. if new_target_le == self._target_ip_uint32_le and new_vp_le == self._vp_ip_uint32_le:
  329. return # No change — nothing to do.
  330. # Encoding either became valid for the first time or shifted (DHCP
  331. # renewal, bind_ip reconfigured, etc.). Update + sweep the cache.
  332. was_armed = self._target_ip_uint32_le is not None and self._vp_ip_uint32_le is not None
  333. self._target_ip_uint32_le = new_target_le
  334. self._vp_ip_uint32_le = new_vp_le
  335. logger.info(
  336. "[%s] MQTT bridge IP encoding %s: target=%s vp=%s",
  337. self.vp_name,
  338. "updated" if was_armed else "armed",
  339. target_ip,
  340. vp_ip,
  341. )
  342. cached = self._latest_print_state
  343. if isinstance(cached, dict):
  344. n = self._rewrite_net_info_ips(cached)
  345. if n:
  346. logger.info(
  347. "[%s] MQTT bridge swept %d net.info[].ip entries in cached push",
  348. self.vp_name,
  349. n,
  350. )
  351. def _rewrite_net_info_ips(self, print_state: dict) -> int:
  352. """Rewrite every non-zero `net.info[].ip` in `print_state` to the VP bind IP.
  353. Returns the number of entries rewritten. Mutates `print_state` in place.
  354. Strategy: rewrite ALL entries with a non-zero `ip`, not only those
  355. matching `_target_ip_uint32_le`. Real printers (X1C, H2D Pro) can
  356. report multiple active interfaces (WiFi + Ethernet) with different
  357. IPs — only one matches the IP Bambuddy tracks, but the slicer may
  358. read any of them. Leaving non-matching entries pointing at real
  359. printer interfaces leaks an FTP fallback path that bypasses the VP
  360. (the #1429 / #1302 symptom). Entries with `ip == 0` are placeholders
  361. for unpopulated interfaces — leave them alone so the slicer's
  362. "active interface" detection still recognises them as absent.
  363. """
  364. if self._vp_ip_uint32_le is None:
  365. return 0
  366. net = print_state.get("net")
  367. if not isinstance(net, dict):
  368. return 0
  369. info = net.get("info")
  370. if not isinstance(info, list):
  371. return 0
  372. rewritten = 0
  373. for entry in info:
  374. if not isinstance(entry, dict):
  375. continue
  376. ip_value = entry.get("ip")
  377. if not isinstance(ip_value, int) or ip_value == 0:
  378. continue
  379. if ip_value == self._vp_ip_uint32_le:
  380. continue
  381. entry["ip"] = self._vp_ip_uint32_le
  382. rewritten += 1
  383. return rewritten
  384. def _on_printer_raw(self, topic: str, payload: bytes) -> None:
  385. """Paho-thread callback — cache the latest push_status for synthetic replay.
  386. Instead of fanning out a second stream of MQTT messages to the slicer
  387. (which trips BambuStudio's Send pre-flight consistency checks), we cache
  388. the latest real printer push_status here. The VP's existing 1 Hz
  389. synthetic push (which is what Send is built around) consults this cache
  390. and replaces its stub fields with real values when available.
  391. """
  392. if self._stopping:
  393. return
  394. target_serial = self._target_serial
  395. if not target_serial:
  396. return
  397. prefix = f"device/{target_serial}/"
  398. if not topic.startswith(prefix):
  399. return
  400. suffix = topic[len(prefix) :]
  401. if not suffix.startswith("report"):
  402. return
  403. try:
  404. data = json.loads(payload)
  405. except json.JSONDecodeError:
  406. return
  407. # Race-free by construction: `json.loads` returns a fresh dict tree per
  408. # call so paho-thread mutations below cannot collide with prior cached
  409. # state held by the asyncio thread. `_send_status_report`'s shallow
  410. # `dict(cached)` is also safe because nothing else writes to the cached
  411. # tree after assignment. The defensive deep-copy on store below removes
  412. # any future risk if a maintainer later re-enters the cached dict to
  413. # mutate it.
  414. # push_status snapshots → cache the print dict for the periodic 1 Hz
  415. # cached-as-base delivery. We do NOT fan these out separately (the
  416. # 1 Hz cached-as-base IS the slicer-facing push_status stream).
  417. print_data = data.get("print")
  418. if isinstance(print_data, dict) and print_data.get("command") == "push_status":
  419. for value in print_data.values():
  420. if isinstance(value, dict) and value.get("sn") == target_serial:
  421. value["sn"] = self.vp_serial
  422. # Note: `ipcam.rtsp_url` carries the real printer's IP. We pass it
  423. # through unchanged — the slicer uses it to fetch the live camera
  424. # stream directly from the printer. On the same LAN this works as
  425. # long as the slicer's stored access code matches the printer's
  426. # (i.e. configure the VP with the same access code as its target).
  427. # Rewrite real printer IP → VP bind IP in `net.info[*].ip` so the
  428. # slicer's FTP destination resolves to the VP, not the real printer.
  429. self._rewrite_net_info_ips(print_data)
  430. # Defensive deep copy on store so the cache is fully decoupled from
  431. # the freshly-parsed tree and from any reader's reference.
  432. new_state = copy.deepcopy(print_data)
  433. # Bambu firmware sends two kinds of push_status: full pushall
  434. # responses (on `pushall` requests / printer reconnect) which
  435. # include AMS, vt_tray, net, etc. — and ~1 Hz incremental
  436. # updates with just the fields that changed (typically temps,
  437. # fan, wifi). Without preserving sticky fields from the previous
  438. # cache, the first incremental push after a pushall would wipe
  439. # AMS info from the bridge cache, and slicers reading the cache
  440. # between pushalls would see a stripped-down printer state with
  441. # no AMS visible until the next pushall — typically only when
  442. # the user power-cycles the printer (#1371). Mirror the same
  443. # preservation pattern Bambuddy uses for its own internal state
  444. # in bambu_mqtt.py (see _SLICER_VISIBLE_STICKY_KEYS below).
  445. prev = self._latest_print_state
  446. if prev is not None:
  447. for sticky_key in _SLICER_VISIBLE_STICKY_KEYS:
  448. if sticky_key not in new_state:
  449. if sticky_key in prev:
  450. # Defensive deep copy — without this the carried-over
  451. # nested dicts/lists are shared between new_state and
  452. # the previous cache, so any in-place mutation later
  453. # (current or future code paths) would corrupt both.
  454. new_state[sticky_key] = copy.deepcopy(prev[sticky_key])
  455. continue
  456. # Key IS in new_state — but firmware sends partial blobs
  457. # (status-only / tray-targeted) under the same key on
  458. # incremental updates, which would overwrite the cached
  459. # full blob and break the slicer's AMS render (#1387).
  460. # For `ams` specifically the deep-merge mirrors what
  461. # Bambuddy already does internally in `_handle_ams_data`.
  462. if (
  463. sticky_key == "ams"
  464. and isinstance(new_state.get("ams"), dict)
  465. and isinstance(prev.get("ams"), dict)
  466. ):
  467. new_state["ams"] = _merge_ams_dict(prev["ams"], new_state["ams"])
  468. self._latest_print_state = new_state
  469. return
  470. # info.get_version responses → cache the module list so the synthetic
  471. # version response can include the real AMS modules.
  472. info_data = data.get("info")
  473. if isinstance(info_data, dict) and info_data.get("command") == "get_version":
  474. modules = info_data.get("module")
  475. if isinstance(modules, list):
  476. rewritten: list = []
  477. for module in modules:
  478. if isinstance(module, dict):
  479. module = dict(module)
  480. if module.get("sn") == target_serial:
  481. module["sn"] = self.vp_serial
  482. rewritten.append(module)
  483. self._latest_version_modules = rewritten
  484. # Don't fan out get_version — the slicer's request (when it issues
  485. # one) is intercepted locally and answered from the cached modules.
  486. return
  487. # Everything else (extrusion_cali_get response, AMS write acks, xcam
  488. # responses, …): fan out to the slicer. These are responses to commands
  489. # the slicer (or Bambuddy) issued; the slicer matches by sequence_id and
  490. # ignores responses to commands it didn't send. Without this, slicer-
  491. # initiated queries like extrusion_cali_get hang forever and BambuStudio
  492. # blocks Send waiting for the response.
  493. loop = self._loop
  494. if loop is None:
  495. return
  496. target_bytes = target_serial.encode("ascii")
  497. if target_bytes in payload:
  498. payload = payload.replace(target_bytes, self.vp_serial.encode("ascii"))
  499. vp_topic = f"device/{self.vp_serial}/{suffix}"
  500. try:
  501. asyncio.run_coroutine_threadsafe(
  502. self._mqtt_server.push_raw_to_clients(vp_topic, payload),
  503. loop,
  504. )
  505. except RuntimeError:
  506. pass
  507. def get_latest_print_state(self) -> dict | None:
  508. """Return the most recent real printer push_status `print` dict, or None."""
  509. return self._latest_print_state
  510. def get_latest_version_modules(self) -> list | None:
  511. """Return the most recent real printer get_version `module` list, or None."""
  512. return self._latest_version_modules
  513. def forward_to_printer(self, payload: dict) -> bool:
  514. """Publish a slicer-originated command to the real printer's request topic.
  515. Returns False if no printer client is currently bound.
  516. """
  517. client = self._target_client
  518. target_serial = self._target_serial
  519. if client is None or target_serial is None:
  520. logger.debug(
  521. "[%s] forward_to_printer dropped (printer %s not bound): %s",
  522. self.vp_name,
  523. self.target_printer_id,
  524. list(payload.keys()),
  525. )
  526. return False
  527. topic = f"device/{target_serial}/request"
  528. try:
  529. return client.publish_raw(topic, json.dumps(payload), qos=1)
  530. except Exception:
  531. logger.exception("[%s] forward_to_printer publish failed", self.vp_name)
  532. return False