mqtt_bridge.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. """MQTT bridge for non-proxy virtual printers.
  2. Mirrors the target printer's state to slicers connected to a virtual printer
  3. without opening a second MQTT session on the printer (reuses Bambuddy's
  4. existing subscription — firmware inflight budget unaffected, see PR #1164).
  5. Architecture (cached-as-base, not a separate fan-out stream):
  6. - **push_status** snapshots from the printer are CACHED here. The VP's
  7. `SimpleMQTTServer._send_status_report` consults that cache and sends
  8. a near-byte-identical copy of the real push to the slicer (with
  9. sequence_id / gcode_state / etc. overridden). Single source of truth
  10. keeps BambuStudio's Send pre-flight happy.
  11. - **info.get_version** responses are also cached so the synthetic version
  12. response can include the real AMS module list (n3f/n3s/ams entries).
  13. Without this BambuStudio's Prepare tab labels every AMS as "unknown".
  14. - **Other command responses** (extrusion_cali_get, AMS write acks,
  15. xcam responses, …) are fanned out raw to the slicer — they carry
  16. sequence_ids the slicer is waiting on; the slicer matches and ignores
  17. unrelated ones.
  18. Identity rewriting at cache time:
  19. - `upgrade_state.sn` (and any other nested dict's `sn` matching the real
  20. serial) → VP serial
  21. - `net.info[*].ip` little-endian uint32 → VP bind IP. BambuStudio reads
  22. this as the FTP destination IP. Without this the slicer FTPs straight
  23. to the real printer and bypasses Bambuddy.
  24. - `ipcam.rtsp_url` is left unchanged: BambuStudio overrides the URL host
  25. with the device IP it bound to (the VP), so the slicer hits the VP's
  26. own RTSPS proxy on port 322.
  27. """
  28. from __future__ import annotations
  29. import asyncio
  30. import copy
  31. import json
  32. import logging
  33. from typing import TYPE_CHECKING
  34. if TYPE_CHECKING:
  35. from backend.app.services.bambu_mqtt import BambuMQTTClient
  36. from backend.app.services.printer_manager import PrinterManager
  37. from backend.app.services.virtual_printer.mqtt_server import SimpleMQTTServer
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Seconds the bridge's refresh loop sleeps between attempts to re-resolve the
# target printer's MQTT client.
REFRESH_INTERVAL_SECONDS = 30.0
  40. def _ip_to_uint32_le(ip_str: str) -> int:
  41. """Encode dotted-quad IPv4 as little-endian uint32 (Bambu MQTT's `net.info[].ip` shape)."""
  42. parts = [int(x) for x in ip_str.split(".")]
  43. if len(parts) != 4 or any(p < 0 or p > 255 for p in parts):
  44. raise ValueError(f"invalid IPv4: {ip_str!r}")
  45. return parts[0] | (parts[1] << 8) | (parts[2] << 16) | (parts[3] << 24)
  46. class MQTTBridge:
  47. """Per-VP MQTT fan-out between a real printer and slicers connected to a VP."""
  48. def __init__(
  49. self,
  50. *,
  51. vp_id: int,
  52. vp_name: str,
  53. vp_serial: str,
  54. target_printer_id: int,
  55. mqtt_server: SimpleMQTTServer,
  56. printer_manager: PrinterManager,
  57. ):
  58. self.vp_id = vp_id
  59. self.vp_name = vp_name
  60. self.vp_serial = vp_serial
  61. self.target_printer_id = target_printer_id
  62. self._mqtt_server = mqtt_server
  63. self._printer_manager = printer_manager
  64. self._target_client: BambuMQTTClient | None = None
  65. self._target_serial: str | None = None
  66. self._target_ip_uint32_le: int | None = None
  67. self._vp_ip_uint32_le: int | None = None
  68. self._loop: asyncio.AbstractEventLoop | None = None
  69. self._refresh_task: asyncio.Task | None = None
  70. self._stopping = False
  71. self._latest_print_state: dict | None = None
  72. self._latest_version_modules: list | None = None
  73. @property
  74. def is_active(self) -> bool:
  75. """True iff a target client is bound and currently connected."""
  76. client = self._target_client
  77. return bool(client is not None and getattr(client, "state", None) and client.state.connected)
  78. async def start(self) -> None:
  79. """Bind to the target printer (if connected) and start the refresh loop."""
  80. self._loop = asyncio.get_running_loop()
  81. self._stopping = False
  82. self._resolve_client()
  83. self._refresh_task = asyncio.create_task(self._refresh_loop())
  84. async def stop(self) -> None:
  85. """Detach from the target printer and stop the refresh loop."""
  86. self._stopping = True
  87. if self._refresh_task is not None:
  88. self._refresh_task.cancel()
  89. try:
  90. await self._refresh_task
  91. except asyncio.CancelledError:
  92. pass
  93. self._refresh_task = None
  94. self._unbind_client()
  95. self._loop = None
  96. async def _refresh_loop(self) -> None:
  97. """Re-resolve the target client periodically — paho clients can be replaced.
  98. BambuMQTTClient is destroyed and recreated on PrinterManager.connect_printer
  99. (e.g. printer config update). Without periodic refresh the bridge would lose
  100. fan-out after such a churn until the VP itself restarts.
  101. """
  102. try:
  103. while not self._stopping:
  104. await asyncio.sleep(REFRESH_INTERVAL_SECONDS)
  105. self._resolve_client()
  106. except asyncio.CancelledError:
  107. raise
  108. except Exception:
  109. logger.exception("[%s] MQTT bridge refresh loop crashed", self.vp_name)
  110. def _resolve_client(self) -> None:
  111. """Look up the current client for target_printer_id and rebind if it changed."""
  112. try:
  113. current = self._printer_manager.get_client(self.target_printer_id)
  114. except Exception:
  115. logger.exception("[%s] MQTT bridge: get_client failed", self.vp_name)
  116. return
  117. if current is self._target_client:
  118. return
  119. # Client identity changed — unregister from the old, register on the new.
  120. self._unbind_client()
  121. if current is None:
  122. return
  123. try:
  124. current.register_raw_message_handler(self._on_printer_raw)
  125. except Exception:
  126. logger.exception("[%s] MQTT bridge: register_raw_message_handler failed", self.vp_name)
  127. return
  128. self._target_client = current
  129. self._target_serial = getattr(current, "serial_number", None)
  130. # Cache printer IP and VP bind IP encoded as little-endian uint32, so we
  131. # can rewrite `net.info[*].ip` in cached push_status. BambuStudio reads
  132. # that field for the FTP destination IP — without rewriting, the slicer
  133. # bypasses the VP and FTPs straight to the real printer.
  134. target_ip = getattr(current, "ip_address", None)
  135. vp_ip = getattr(self._mqtt_server, "bind_address", None)
  136. if target_ip and vp_ip and vp_ip not in ("0.0.0.0", "", None): # nosec B104
  137. try:
  138. self._target_ip_uint32_le = _ip_to_uint32_le(target_ip)
  139. self._vp_ip_uint32_le = _ip_to_uint32_le(vp_ip)
  140. except ValueError:
  141. self._target_ip_uint32_le = None
  142. self._vp_ip_uint32_le = None
  143. logger.info(
  144. "[%s] MQTT bridge bound to printer %s (serial=%s)",
  145. self.vp_name,
  146. self.target_printer_id,
  147. self._target_serial,
  148. )
  149. # Trigger a fresh get_version + pushall against the printer so the bridge
  150. # cache populates immediately. Bambuddy itself queries these on connect,
  151. # but that fires before the bridge attaches as a raw-message consumer,
  152. # so without this nudge the cache stays empty until the next periodic
  153. # query (which can be minutes away).
  154. request_fn = getattr(current, "_request_version", None)
  155. if callable(request_fn):
  156. try:
  157. request_fn()
  158. except Exception:
  159. logger.exception("[%s] MQTT bridge: _request_version failed", self.vp_name)
  160. request_status_fn = getattr(current, "request_status_update", None)
  161. if callable(request_status_fn):
  162. try:
  163. request_status_fn()
  164. except Exception:
  165. logger.exception("[%s] MQTT bridge: request_status_update failed", self.vp_name)
  166. def _unbind_client(self) -> None:
  167. if self._target_client is None:
  168. return
  169. try:
  170. self._target_client.unregister_raw_message_handler(self._on_printer_raw)
  171. except Exception:
  172. logger.exception("[%s] MQTT bridge: unregister_raw_message_handler failed", self.vp_name)
  173. logger.info("[%s] MQTT bridge unbound from printer %s", self.vp_name, self.target_printer_id)
  174. self._target_client = None
  175. self._target_serial = None
  176. def _on_printer_raw(self, topic: str, payload: bytes) -> None:
  177. """Paho-thread callback — cache the latest push_status for synthetic replay.
  178. Instead of fanning out a second stream of MQTT messages to the slicer
  179. (which trips BambuStudio's Send pre-flight consistency checks), we cache
  180. the latest real printer push_status here. The VP's existing 1 Hz
  181. synthetic push (which is what Send is built around) consults this cache
  182. and replaces its stub fields with real values when available.
  183. """
  184. if self._stopping:
  185. return
  186. target_serial = self._target_serial
  187. if not target_serial:
  188. return
  189. prefix = f"device/{target_serial}/"
  190. if not topic.startswith(prefix):
  191. return
  192. suffix = topic[len(prefix) :]
  193. if not suffix.startswith("report"):
  194. return
  195. try:
  196. data = json.loads(payload)
  197. except json.JSONDecodeError:
  198. return
  199. # Race-free by construction: `json.loads` returns a fresh dict tree per
  200. # call so paho-thread mutations below cannot collide with prior cached
  201. # state held by the asyncio thread. `_send_status_report`'s shallow
  202. # `dict(cached)` is also safe because nothing else writes to the cached
  203. # tree after assignment. The defensive deep-copy on store below removes
  204. # any future risk if a maintainer later re-enters the cached dict to
  205. # mutate it.
  206. # push_status snapshots → cache the print dict for the periodic 1 Hz
  207. # cached-as-base delivery. We do NOT fan these out separately (the
  208. # 1 Hz cached-as-base IS the slicer-facing push_status stream).
  209. print_data = data.get("print")
  210. if isinstance(print_data, dict) and print_data.get("command") == "push_status":
  211. for value in print_data.values():
  212. if isinstance(value, dict) and value.get("sn") == target_serial:
  213. value["sn"] = self.vp_serial
  214. # Note: `ipcam.rtsp_url` carries the real printer's IP. We pass it
  215. # through unchanged — the slicer uses it to fetch the live camera
  216. # stream directly from the printer. On the same LAN this works as
  217. # long as the slicer's stored access code matches the printer's
  218. # (i.e. configure the VP with the same access code as its target).
  219. # Rewrite real printer IP → VP bind IP in `net.info[*].ip` so the
  220. # slicer's FTP destination resolves to the VP, not the real printer.
  221. if self._target_ip_uint32_le is not None and self._vp_ip_uint32_le is not None:
  222. net = print_data.get("net")
  223. if isinstance(net, dict):
  224. info = net.get("info")
  225. if isinstance(info, list):
  226. for entry in info:
  227. if isinstance(entry, dict) and entry.get("ip") == self._target_ip_uint32_le:
  228. entry["ip"] = self._vp_ip_uint32_le
  229. # Defensive deep copy on store so the cache is fully decoupled from
  230. # the freshly-parsed tree and from any reader's reference.
  231. self._latest_print_state = copy.deepcopy(print_data)
  232. return
  233. # info.get_version responses → cache the module list so the synthetic
  234. # version response can include the real AMS modules.
  235. info_data = data.get("info")
  236. if isinstance(info_data, dict) and info_data.get("command") == "get_version":
  237. modules = info_data.get("module")
  238. if isinstance(modules, list):
  239. rewritten: list = []
  240. for module in modules:
  241. if isinstance(module, dict):
  242. module = dict(module)
  243. if module.get("sn") == target_serial:
  244. module["sn"] = self.vp_serial
  245. rewritten.append(module)
  246. self._latest_version_modules = rewritten
  247. # Don't fan out get_version — the slicer's request (when it issues
  248. # one) is intercepted locally and answered from the cached modules.
  249. return
  250. # Everything else (extrusion_cali_get response, AMS write acks, xcam
  251. # responses, …): fan out to the slicer. These are responses to commands
  252. # the slicer (or Bambuddy) issued; the slicer matches by sequence_id and
  253. # ignores responses to commands it didn't send. Without this, slicer-
  254. # initiated queries like extrusion_cali_get hang forever and BambuStudio
  255. # blocks Send waiting for the response.
  256. loop = self._loop
  257. if loop is None:
  258. return
  259. target_bytes = target_serial.encode("ascii")
  260. if target_bytes in payload:
  261. payload = payload.replace(target_bytes, self.vp_serial.encode("ascii"))
  262. vp_topic = f"device/{self.vp_serial}/{suffix}"
  263. try:
  264. asyncio.run_coroutine_threadsafe(
  265. self._mqtt_server.push_raw_to_clients(vp_topic, payload),
  266. loop,
  267. )
  268. except RuntimeError:
  269. pass
  270. def get_latest_print_state(self) -> dict | None:
  271. """Return the most recent real printer push_status `print` dict, or None."""
  272. return self._latest_print_state
  273. def get_latest_version_modules(self) -> list | None:
  274. """Return the most recent real printer get_version `module` list, or None."""
  275. return self._latest_version_modules
  276. def forward_to_printer(self, payload: dict) -> bool:
  277. """Publish a slicer-originated command to the real printer's request topic.
  278. Returns False if no printer client is currently bound.
  279. """
  280. client = self._target_client
  281. target_serial = self._target_serial
  282. if client is None or target_serial is None:
  283. logger.debug(
  284. "[%s] forward_to_printer dropped (printer %s not bound): %s",
  285. self.vp_name,
  286. self.target_printer_id,
  287. list(payload.keys()),
  288. )
  289. return False
  290. topic = f"device/{target_serial}/request"
  291. try:
  292. return client.publish_raw(topic, json.dumps(payload), qos=1)
  293. except Exception:
  294. logger.exception("[%s] forward_to_printer publish failed", self.vp_name)
  295. return False