tcp_proxy.py 71 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776
  1. """Proxy for slicer-to-printer communication.
  2. This module provides both transparent TCP proxying and TLS-terminating
  3. proxying for forwarding data between a slicer and a real Bambu printer,
  4. enabling remote printing over any network connection.
  5. Most protocols (FTP, FileTransfer, Camera) use transparent TCP proxying —
  6. raw bytes are forwarded without decryption, preserving end-to-end TLS
  7. between slicer and printer. Only MQTT is TLS-terminated so Bambuddy can
  8. rewrite the printer's real IP with the proxy's bind IP in MQTT payloads.
  9. """
  10. # ruff: noqa: N801
  import asyncio
  import errno
  import logging
  import random
  import re
  import ssl
  import subprocess
  from collections.abc import Callable
  from pathlib import Path
  # Module-level logger shared by every proxy class in this file.
  logger = logging.getLogger(__name__)
  20. class _SessionReuseSSLContext:
  21. """Proxy around SSLContext that injects a TLS session into wrap_bio().
  22. vsFTPd (used by some Bambu printers like X1C) requires TLS session reuse
  23. on FTP data channels — the data connection must reuse the TLS session from
  24. the control channel. Without this, the printer rejects the data connection
  25. with "522 SSL connection failed: session reuse required".
  26. asyncio's open_connection() calls SSLContext.wrap_bio() internally but
  27. doesn't expose a session parameter. This wrapper intercepts wrap_bio()
  28. to inject the saved control-channel session, enabling session reuse.
  29. """
  30. def __init__(self, ctx: ssl.SSLContext, session: ssl.SSLSession) -> None:
  31. object.__setattr__(self, "_ctx", ctx)
  32. object.__setattr__(self, "_session", session)
  33. def __getattr__(self, name: str) -> object:
  34. return getattr(self._ctx, name)
  35. def wrap_bio(
  36. self,
  37. incoming: ssl.MemoryBIO,
  38. outgoing: ssl.MemoryBIO,
  39. server_side: bool = False,
  40. server_hostname: str | None = None,
  41. **kwargs: object,
  42. ) -> ssl.SSLObject:
  43. return self._ctx.wrap_bio(
  44. incoming,
  45. outgoing,
  46. server_side=server_side,
  47. server_hostname=server_hostname,
  48. session=self._session,
  49. **kwargs,
  50. )
  51. def detect_port_redirect(port: int) -> int | None:
  52. """Detect if iptables redirects a port to another port.
  53. When iptables NAT REDIRECT rules exist (e.g. port redirects), connections
  54. to the original port never reach our socket because iptables intercepts
  55. them in PREROUTING. We must listen on the redirect target instead.
  56. Returns the redirect target port, or None if no redirect is active.
  57. """
  58. # Method 1: Read persistent rules file (doesn't require root)
  59. for rules_path in ("/etc/iptables/rules.v4", "/etc/iptables.rules"):
  60. try:
  61. with open(rules_path) as f:
  62. content = f.read()
  63. match = re.search(rf"--dport {port}\b.*?--to-ports\s+(\d+)", content)
  64. if match:
  65. target = int(match.group(1))
  66. if target != port:
  67. return target
  68. except (FileNotFoundError, PermissionError, OSError):
  69. continue
  70. # Method 2: Query live iptables rules (may require root)
  71. try:
  72. result = subprocess.run( # noqa: S603, S607
  73. ["iptables-save", "-t", "nat"],
  74. capture_output=True,
  75. text=True,
  76. timeout=5,
  77. )
  78. if result.returncode == 0:
  79. match = re.search(rf"--dport {port}\b.*?--to-ports\s+(\d+)", result.stdout)
  80. if match:
  81. target = int(match.group(1))
  82. if target != port:
  83. return target
  84. except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
  85. pass
  86. return None
  87. class TLSProxy:
  88. """TLS terminating proxy that forwards data between client and target.
  89. This proxy terminates TLS on both ends, allowing the slicer to connect
  90. to Bambuddy's certificate while Bambuddy connects to the real printer.
  91. """
  92. def __init__(
  93. self,
  94. name: str,
  95. listen_port: int,
  96. target_host: str,
  97. target_port: int,
  98. server_cert_path: Path,
  99. server_key_path: Path,
  100. on_connect: Callable[[str], None] | None = None,
  101. on_disconnect: Callable[[str], None] | None = None,
  102. bind_address: str = "0.0.0.0", # nosec B104
  103. rewrite_ip: tuple[str, str] | None = None,
  104. ):
  105. """Initialize the TLS proxy.
  106. Args:
  107. name: Friendly name for logging (e.g., "FTP", "MQTT")
  108. listen_port: Port to listen on for incoming connections
  109. target_host: Target printer IP/hostname
  110. target_port: Target printer port
  111. server_cert_path: Path to server certificate (for accepting slicer connections)
  112. server_key_path: Path to server private key
  113. on_connect: Optional callback when client connects (receives client_id)
  114. on_disconnect: Optional callback when client disconnects (receives client_id)
  115. bind_address: IP address to bind to (default: all interfaces)
  116. rewrite_ip: Optional (old_ip, new_ip) tuple — replaces occurrences of
  117. the printer's real IP with the proxy's bind IP in printer→client data.
  118. This prevents the slicer from discovering the printer's real IP
  119. in MQTT payloads (ip_addr, rtsp_url, etc.) and bypassing the proxy.
  120. """
  121. self.name = name
  122. self.listen_port = listen_port
  123. self.target_host = target_host
  124. self.target_port = target_port
  125. self.server_cert_path = server_cert_path
  126. self.server_key_path = server_key_path
  127. self.on_connect = on_connect
  128. self.on_disconnect = on_disconnect
  129. self.bind_address = bind_address
  130. # IP rewriting for printer→client direction
  131. if rewrite_ip:
  132. self._rewrite_old = rewrite_ip[0].encode("utf-8")
  133. self._rewrite_new = rewrite_ip[1].encode("utf-8")
  134. # Also rewrite the integer IP in net.info[].ip fields.
  135. # Bambu printers encode their IP as a little-endian uint32 integer
  136. # in the JSON payload. BambuStudio reads this to set dev_ip.
  137. self._rewrite_old_int = self._ip_to_le_int_bytes(rewrite_ip[0])
  138. self._rewrite_new_int = self._ip_to_le_int_bytes(rewrite_ip[1])
  139. else:
  140. self._rewrite_old = None
  141. self._rewrite_new = None
  142. self._rewrite_old_int = None
  143. self._rewrite_new_int = None
  144. self._server: asyncio.Server | None = None
  145. self._running = False
  146. self._active_connections: dict[str, tuple[asyncio.Task, asyncio.Task]] = {}
  147. self._server_ssl_context: ssl.SSLContext | None = None
  148. self._client_ssl_context: ssl.SSLContext | None = None
  149. @staticmethod
  150. def _ip_to_le_int_bytes(ip: str) -> bytes:
  151. """Convert an IP address to its little-endian integer JSON representation.
  152. E.g. "192.168.255.16" → b"285190336" (the integer as a decimal string,
  153. as it appears in Bambu MQTT JSON payloads in the net.info[].ip field).
  154. """
  155. import struct as _struct
  156. parts = ip.split(".")
  157. packed = bytes(int(p) for p in parts)
  158. le_int = _struct.unpack("<I", packed)[0]
  159. return str(le_int).encode("utf-8")
  160. def _create_server_ssl_context(self) -> ssl.SSLContext:
  161. """Create SSL context for accepting client (slicer) connections."""
  162. ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
  163. ctx.load_cert_chain(self.server_cert_path, self.server_key_path)
  164. # Allow older TLS versions for compatibility with slicers
  165. ctx.minimum_version = ssl.TLSVersion.TLSv1_2
  166. # Don't require client certificates
  167. ctx.verify_mode = ssl.CERT_NONE
  168. return ctx
  169. def _create_client_ssl_context(self) -> ssl.SSLContext:
  170. """Create SSL context for connecting to printer."""
  171. ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
  172. # Don't verify printer's certificate (self-signed)
  173. ctx.check_hostname = False
  174. ctx.verify_mode = ssl.CERT_NONE
  175. ctx.minimum_version = ssl.TLSVersion.TLSv1_2
  176. # Bambu printers use plain RSA key exchange (no ECDHE/DHE),
  177. # which modern OpenSSL 3.x defaults exclude. Add them back.
  178. ctx.set_ciphers("DEFAULT:AES256-GCM-SHA384:AES128-GCM-SHA256")
  179. return ctx
  180. async def start(self) -> None:
  181. """Start the TLS proxy server."""
  182. if self._running:
  183. return
  184. logger.info(
  185. f"Starting {self.name} TLS proxy: {self.bind_address}:{self.listen_port} → {self.target_host}:{self.target_port}"
  186. )
  187. try:
  188. self._running = True
  189. # Create SSL contexts
  190. self._server_ssl_context = self._create_server_ssl_context()
  191. self._client_ssl_context = self._create_client_ssl_context()
  192. # Start server with TLS
  193. self._server = await asyncio.start_server(
  194. self._handle_client,
  195. self.bind_address,
  196. self.listen_port,
  197. ssl=self._server_ssl_context,
  198. )
  199. logger.info("%s TLS proxy listening on port %s", self.name, self.listen_port)
  200. async with self._server:
  201. await self._server.serve_forever()
  202. except OSError as e:
  203. if e.errno == 98: # Address already in use
  204. logger.error("%s proxy port %s is already in use", self.name, self.listen_port)
  205. elif e.errno == 13: # Permission denied
  206. logger.error(
  207. "%s proxy: cannot bind to port %s (permission denied). "
  208. "Port %s requires root or CAP_NET_BIND_SERVICE. "
  209. "Docker: add 'cap_add: [NET_BIND_SERVICE]' to docker-compose.yml. "
  210. "Native: use 'sudo setcap cap_net_bind_service=+ep $(which python3)' "
  211. "or redirect with iptables.",
  212. self.name,
  213. self.listen_port,
  214. self.listen_port,
  215. )
  216. else:
  217. logger.error("%s proxy error: %s", self.name, e)
  218. except asyncio.CancelledError:
  219. logger.debug("%s proxy task cancelled", self.name)
  220. except Exception as e:
  221. logger.error("%s proxy error: %s", self.name, e)
  222. finally:
  223. await self.stop()
  224. async def stop(self) -> None:
  225. """Stop the TLS proxy server."""
  226. logger.info("Stopping %s proxy", self.name)
  227. self._running = False
  228. # Cancel all active connection tasks
  229. for client_id, (task1, task2) in list(self._active_connections.items()):
  230. task1.cancel()
  231. task2.cancel()
  232. if self.on_disconnect:
  233. try:
  234. self.on_disconnect(client_id)
  235. except Exception:
  236. pass # Ignore disconnect callback errors during shutdown
  237. self._active_connections.clear()
  238. if self._server:
  239. try:
  240. self._server.close()
  241. await self._server.wait_closed()
  242. except OSError as e:
  243. logger.debug("Error closing %s proxy server: %s", self.name, e)
  244. self._server = None
  245. async def _handle_client(
  246. self,
  247. client_reader: asyncio.StreamReader,
  248. client_writer: asyncio.StreamWriter,
  249. ) -> None:
  250. """Handle a new client connection by proxying to target."""
  251. peername = client_writer.get_extra_info("peername")
  252. client_id = f"{peername[0]}:{peername[1]}" if peername else "unknown"
  253. logger.info("%s proxy: client connected from %s", self.name, client_id)
  254. if self.on_connect:
  255. try:
  256. self.on_connect(client_id)
  257. except Exception:
  258. pass # Ignore connect callback errors; connection proceeds regardless
  259. # Connect to target printer with TLS
  260. try:
  261. printer_reader, printer_writer = await asyncio.wait_for(
  262. asyncio.open_connection(
  263. self.target_host,
  264. self.target_port,
  265. ssl=self._client_ssl_context,
  266. ),
  267. timeout=10.0,
  268. )
  269. logger.info("%s proxy: connected to printer %s:%s", self.name, self.target_host, self.target_port)
  270. except TimeoutError:
  271. logger.error("%s proxy: timeout connecting to %s:%s", self.name, self.target_host, self.target_port)
  272. client_writer.close()
  273. await client_writer.wait_closed()
  274. return
  275. except ssl.SSLError as e:
  276. logger.error(
  277. "%s proxy: SSL error connecting to %s:%s: %s", self.name, self.target_host, self.target_port, e
  278. )
  279. client_writer.close()
  280. await client_writer.wait_closed()
  281. return
  282. except OSError as e:
  283. logger.error("%s proxy: failed to connect to %s:%s: %s", self.name, self.target_host, self.target_port, e)
  284. client_writer.close()
  285. await client_writer.wait_closed()
  286. return
  287. # Create bidirectional forwarding tasks
  288. client_to_printer = asyncio.create_task(
  289. self._forward(client_reader, printer_writer, f"{client_id}→printer"),
  290. name=f"{self.name}_c2p_{client_id}",
  291. )
  292. printer_to_client = asyncio.create_task(
  293. self._forward(printer_reader, client_writer, f"printer→{client_id}", rewrite_ip=True),
  294. name=f"{self.name}_p2c_{client_id}",
  295. )
  296. self._active_connections[client_id] = (client_to_printer, printer_to_client)
  297. try:
  298. # Wait for either direction to complete (connection closed)
  299. done, pending = await asyncio.wait(
  300. [client_to_printer, printer_to_client],
  301. return_when=asyncio.FIRST_COMPLETED,
  302. )
  303. # Cancel the other direction
  304. for task in pending:
  305. task.cancel()
  306. try:
  307. await task
  308. except asyncio.CancelledError:
  309. pass # Expected when cancelling the other forwarding direction
  310. except Exception as e:
  311. logger.debug("%s proxy connection error: %s", self.name, e)
  312. finally:
  313. # Clean up
  314. self._active_connections.pop(client_id, None)
  315. for writer in [client_writer, printer_writer]:
  316. try:
  317. writer.close()
  318. await writer.wait_closed()
  319. except OSError:
  320. pass # Best-effort connection cleanup; peer may have disconnected
  321. logger.info("%s proxy: client %s disconnected", self.name, client_id)
  322. if self.on_disconnect:
  323. try:
  324. self.on_disconnect(client_id)
  325. except Exception:
  326. pass # Ignore disconnect callback errors; cleanup continues
  327. @staticmethod
  328. def _rewrite_mqtt_ip(
  329. data: bytes,
  330. old_ip: bytes,
  331. new_ip: bytes,
  332. buffer: bytearray,
  333. extra_replacements: list[tuple[bytes, bytes]] | None = None,
  334. ) -> tuple[bytes, bytearray]:
  335. """Rewrite IP addresses inside MQTT packets, preserving packet framing.
  336. MQTT packets have a variable-length header encoding the remaining
  337. packet length. A naive bytes.replace() would corrupt this framing
  338. when old_ip and new_ip differ in length.
  339. This method parses individual MQTT packets out of the data stream,
  340. performs the replacement only on PUBLISH payloads, and re-encodes
  341. the remaining-length field to match the new size.
  342. Incomplete packets are buffered and returned for the next call.
  343. Args:
  344. extra_replacements: Additional (old, new) byte pairs to replace
  345. (e.g. the integer IP representation in net.info[].ip).
  346. Returns (output_data, remaining_buffer).
  347. """
  348. buffer.extend(data)
  349. # Check if any replacement target exists in the buffer
  350. has_target = old_ip in buffer
  351. if not has_target and extra_replacements:
  352. has_target = any(old in buffer for old, _new in extra_replacements)
  353. if not has_target:
  354. # Fast path: no IP in buffer, but we still need to check for
  355. # incomplete packets at the end that might contain a partial IP.
  356. # For safety, try to parse and emit only complete packets.
  357. result = bytearray()
  358. pos = 0
  359. length = len(buffer)
  360. while pos < length:
  361. packet_start = pos
  362. if pos + 1 >= length:
  363. break
  364. pos += 1 # header byte
  365. # Parse remaining length
  366. remaining_length = 0
  367. multiplier = 1
  368. length_bytes = 0
  369. while pos < length:
  370. encoded_byte = buffer[pos]
  371. pos += 1
  372. remaining_length += (encoded_byte & 0x7F) * multiplier
  373. multiplier *= 128
  374. length_bytes += 1
  375. if (encoded_byte & 0x80) == 0:
  376. break
  377. if length_bytes >= 4:
  378. break
  379. if pos + remaining_length > length:
  380. # Incomplete — keep in buffer
  381. new_buffer = bytearray(buffer[packet_start:])
  382. return bytes(result), new_buffer
  383. pos += remaining_length
  384. result.extend(buffer[packet_start:pos])
  385. # All complete
  386. buffer.clear()
  387. return bytes(result) if result else bytes(data), buffer
  388. # Buffer contains old_ip — parse packets and rewrite
  389. result = bytearray()
  390. pos = 0
  391. length = len(buffer)
  392. while pos < length:
  393. packet_start = pos
  394. if pos >= length:
  395. break
  396. header_byte = buffer[pos]
  397. pos += 1
  398. # Remaining length: variable-length encoding (1-4 bytes)
  399. remaining_length = 0
  400. multiplier = 1
  401. length_bytes = 0
  402. while pos < length:
  403. encoded_byte = buffer[pos]
  404. pos += 1
  405. remaining_length += (encoded_byte & 0x7F) * multiplier
  406. multiplier *= 128
  407. length_bytes += 1
  408. if (encoded_byte & 0x80) == 0:
  409. break
  410. if length_bytes >= 4:
  411. break
  412. # Check if we have enough data for the full packet
  413. if pos + remaining_length > length:
  414. # Incomplete packet — keep in buffer for next call
  415. new_buffer = bytearray(buffer[packet_start:])
  416. return bytes(result), new_buffer
  417. packet_type = (header_byte >> 4) & 0x0F
  418. packet_body = buffer[pos : pos + remaining_length]
  419. pos += remaining_length
  420. # Only rewrite PUBLISH packets (type 3)
  421. needs_rewrite = packet_type == 3 and (
  422. old_ip in packet_body
  423. or (extra_replacements and any(old in packet_body for old, _new in extra_replacements))
  424. )
  425. if needs_rewrite:
  426. new_body = bytes(packet_body).replace(old_ip, new_ip)
  427. if extra_replacements:
  428. for old_val, new_val in extra_replacements:
  429. new_body = new_body.replace(old_val, new_val)
  430. # Re-encode: header byte + new remaining length + new body
  431. result.append(header_byte)
  432. # Encode remaining length (MQTT variable-length encoding)
  433. new_remaining = len(new_body)
  434. while True:
  435. encoded_byte = new_remaining % 128
  436. new_remaining //= 128
  437. if new_remaining > 0:
  438. encoded_byte |= 0x80
  439. result.append(encoded_byte)
  440. if new_remaining == 0:
  441. break
  442. result.extend(new_body)
  443. else:
  444. # Pass through unchanged
  445. result.extend(buffer[packet_start:pos])
  446. buffer.clear()
  447. return bytes(result), buffer
  448. async def _forward(
  449. self,
  450. reader: asyncio.StreamReader,
  451. writer: asyncio.StreamWriter,
  452. direction: str,
  453. rewrite_ip: bool = False,
  454. ) -> None:
  455. """Forward data from reader to writer.
  456. Args:
  457. reader: Source stream (already TLS-decrypted)
  458. writer: Destination stream (will be TLS-encrypted by the stream)
  459. direction: Description for logging (e.g., "client→printer")
  460. rewrite_ip: If True and rewrite_ip was configured, replace the
  461. printer's real IP with the proxy's bind IP in the data.
  462. """
  463. do_rewrite = rewrite_ip and self._rewrite_old is not None
  464. rewrite_buffer = bytearray() if do_rewrite else None
  465. rewrite_logged = False
  466. total_bytes = 0
  467. try:
  468. while self._running:
  469. # Read chunk - use reasonable buffer size
  470. data = await reader.read(65536)
  471. if not data:
  472. # Connection closed
  473. break
  474. # Rewrite printer IP → proxy IP in MQTT PUBLISH payloads
  475. # to prevent the slicer from bypassing the proxy.
  476. if do_rewrite:
  477. extra = [(self._rewrite_old_int, self._rewrite_new_int)] if self._rewrite_old_int else None
  478. data, rewrite_buffer = self._rewrite_mqtt_ip(
  479. data,
  480. self._rewrite_old,
  481. self._rewrite_new,
  482. rewrite_buffer,
  483. extra_replacements=extra,
  484. )
  485. if not rewrite_logged and data:
  486. if self._rewrite_old in data:
  487. logger.warning(
  488. "%s proxy IP rewrite FAILED — %s still present after rewrite!",
  489. self.name,
  490. self._rewrite_old.decode(),
  491. )
  492. else:
  493. logger.info(
  494. "%s proxy IP rewrite active: %s → %s",
  495. self.name,
  496. self._rewrite_old.decode(),
  497. self._rewrite_new.decode(),
  498. )
  499. rewrite_logged = True
  500. if not data:
  501. continue # All data buffered, waiting for more
  502. # Forward to destination
  503. writer.write(data)
  504. await writer.drain()
  505. total_bytes += len(data)
  506. except asyncio.CancelledError:
  507. pass # Expected when the other forwarding direction closes first
  508. except ConnectionResetError:
  509. logger.debug("%s proxy %s: connection reset", self.name, direction)
  510. except BrokenPipeError:
  511. logger.debug("%s proxy %s: broken pipe", self.name, direction)
  512. except OSError as e:
  513. logger.debug("%s proxy %s error: %s", self.name, direction, e)
  514. logger.debug("%s proxy %s: total %s bytes", self.name, direction, total_bytes)
  515. class TCPProxy:
  516. """Raw TCP proxy that forwards data without TLS termination.
  517. Used for protocols where the printer doesn't use TLS (e.g., port 3000
  518. binding/authentication protocol).
  519. """
  520. def __init__(
  521. self,
  522. name: str,
  523. listen_port: int,
  524. target_host: str,
  525. target_port: int,
  526. on_connect: Callable[[str], None] | None = None,
  527. on_disconnect: Callable[[str], None] | None = None,
  528. bind_address: str = "0.0.0.0", # nosec B104
  529. ):
  530. self.name = name
  531. self.listen_port = listen_port
  532. self.target_host = target_host
  533. self.target_port = target_port
  534. self.on_connect = on_connect
  535. self.on_disconnect = on_disconnect
  536. self.bind_address = bind_address
  537. self._server: asyncio.Server | None = None
  538. self._running = False
  539. self._active_connections: dict[str, tuple[asyncio.Task, asyncio.Task]] = {}
  540. async def start(self) -> None:
  541. """Start the TCP proxy server."""
  542. if self._running:
  543. return
  544. logger.info(
  545. "Starting %s TCP proxy: %s:%s → %s:%s",
  546. self.name,
  547. self.bind_address,
  548. self.listen_port,
  549. self.target_host,
  550. self.target_port,
  551. )
  552. try:
  553. self._running = True
  554. self._server = await asyncio.start_server(
  555. self._handle_client,
  556. self.bind_address,
  557. self.listen_port,
  558. )
  559. logger.info("%s TCP proxy listening on port %s", self.name, self.listen_port)
  560. async with self._server:
  561. await self._server.serve_forever()
  562. except OSError as e:
  563. if e.errno == 98: # Address already in use
  564. logger.error("%s proxy port %s is already in use", self.name, self.listen_port)
  565. else:
  566. logger.error("%s proxy error: %s", self.name, e)
  567. except asyncio.CancelledError:
  568. logger.debug("%s proxy task cancelled", self.name)
  569. except Exception as e:
  570. logger.error("%s proxy error: %s", self.name, e)
  571. finally:
  572. await self.stop()
  573. async def stop(self) -> None:
  574. """Stop the TCP proxy server."""
  575. logger.info("Stopping %s proxy", self.name)
  576. self._running = False
  577. for client_id, (task1, task2) in list(self._active_connections.items()):
  578. task1.cancel()
  579. task2.cancel()
  580. if self.on_disconnect:
  581. try:
  582. self.on_disconnect(client_id)
  583. except Exception:
  584. pass
  585. self._active_connections.clear()
  586. if self._server:
  587. try:
  588. self._server.close()
  589. await self._server.wait_closed()
  590. except OSError as e:
  591. logger.debug("Error closing %s proxy server: %s", self.name, e)
  592. self._server = None
  593. async def _handle_client(
  594. self,
  595. client_reader: asyncio.StreamReader,
  596. client_writer: asyncio.StreamWriter,
  597. ) -> None:
  598. """Handle a new client connection by proxying to target."""
  599. peername = client_writer.get_extra_info("peername")
  600. client_id = f"{peername[0]}:{peername[1]}" if peername else "unknown"
  601. logger.info("%s proxy: client connected from %s", self.name, client_id)
  602. if self.on_connect:
  603. try:
  604. self.on_connect(client_id)
  605. except Exception:
  606. pass
  607. try:
  608. printer_reader, printer_writer = await asyncio.wait_for(
  609. asyncio.open_connection(self.target_host, self.target_port),
  610. timeout=10.0,
  611. )
  612. logger.info("%s proxy: connected to printer %s:%s", self.name, self.target_host, self.target_port)
  613. except TimeoutError:
  614. logger.error("%s proxy: timeout connecting to %s:%s", self.name, self.target_host, self.target_port)
  615. client_writer.close()
  616. await client_writer.wait_closed()
  617. return
  618. except OSError as e:
  619. logger.error("%s proxy: failed to connect to %s:%s: %s", self.name, self.target_host, self.target_port, e)
  620. client_writer.close()
  621. await client_writer.wait_closed()
  622. return
  623. client_to_printer = asyncio.create_task(
  624. self._forward(client_reader, printer_writer, f"{client_id}→printer"),
  625. name=f"{self.name}_c2p_{client_id}",
  626. )
  627. printer_to_client = asyncio.create_task(
  628. self._forward(printer_reader, client_writer, f"printer→{client_id}"),
  629. name=f"{self.name}_p2c_{client_id}",
  630. )
  631. self._active_connections[client_id] = (client_to_printer, printer_to_client)
  632. try:
  633. done, pending = await asyncio.wait(
  634. [client_to_printer, printer_to_client],
  635. return_when=asyncio.FIRST_COMPLETED,
  636. )
  637. for task in pending:
  638. task.cancel()
  639. try:
  640. await task
  641. except asyncio.CancelledError:
  642. pass
  643. except Exception as e:
  644. logger.debug("%s proxy connection error: %s", self.name, e)
  645. finally:
  646. self._active_connections.pop(client_id, None)
  647. for writer in [client_writer, printer_writer]:
  648. try:
  649. writer.close()
  650. await writer.wait_closed()
  651. except OSError:
  652. pass
  653. logger.info("%s proxy: client %s disconnected", self.name, client_id)
  654. if self.on_disconnect:
  655. try:
  656. self.on_disconnect(client_id)
  657. except Exception:
  658. pass
  659. async def _forward(
  660. self,
  661. reader: asyncio.StreamReader,
  662. writer: asyncio.StreamWriter,
  663. direction: str,
  664. ) -> None:
  665. """Forward data from reader to writer."""
  666. total_bytes = 0
  667. try:
  668. while self._running:
  669. data = await reader.read(65536)
  670. if not data:
  671. break
  672. writer.write(data)
  673. await writer.drain()
  674. total_bytes += len(data)
  675. logger.debug("%s proxy %s: %s bytes", self.name, direction, len(data))
  676. except asyncio.CancelledError:
  677. pass
  678. except ConnectionResetError:
  679. logger.debug("%s proxy %s: connection reset", self.name, direction)
  680. except BrokenPipeError:
  681. logger.debug("%s proxy %s: broken pipe", self.name, direction)
  682. except OSError as e:
  683. logger.debug("%s proxy %s error: %s", self.name, direction, e)
  684. logger.debug("%s proxy %s: total %s bytes", self.name, direction, total_bytes)
  685. class FTPTLSProxy(TLSProxy):
  686. """FTP-aware TLS proxy that handles passive data connections.
  687. Extends TLSProxy to intercept PASV/EPSV responses on the FTP control
  688. channel, dynamically create TLS data proxies on local ports, and rewrite
  689. the responses so the slicer connects to the proxy instead of the printer.
  690. Without this, FTP passive data connections bypass the proxy and go directly
  691. to the printer, which fails when the slicer can't reach the printer's IP.
  692. """
  693. PASV_PORT_MIN = 50000
  694. PASV_PORT_MAX = 50100
  695. async def stop(self) -> None:
  696. """Stop proxy and clean up data connection servers."""
  697. # Close all data servers first
  698. for server in list(self._data_servers):
  699. try:
  700. server.close()
  701. await server.wait_closed()
  702. except OSError:
  703. pass # Best-effort cleanup of data proxy servers
  704. self._data_servers.clear()
  705. await super().stop()
    async def start(self) -> None:
        """Start the FTP TLS proxy.

        Resets the list that tracks open FTP data-channel listeners, then
        delegates to the parent class's ``start()``.
        """
        # Listeners are added by _start_data_proxy_server(); stop() closes
        # whatever is still in this list.
        self._data_servers: list[asyncio.Server] = []
        await super().start()
  710. async def _handle_client(
  711. self,
  712. client_reader: asyncio.StreamReader,
  713. client_writer: asyncio.StreamWriter,
  714. ) -> None:
  715. """Handle FTP client with PASV/EPSV-aware response forwarding."""
  716. peername = client_writer.get_extra_info("peername")
  717. client_id = f"{peername[0]}:{peername[1]}" if peername else "unknown"
  718. logger.info("%s proxy: client connected from %s", self.name, client_id)
  719. if self.on_connect:
  720. try:
  721. self.on_connect(client_id)
  722. except Exception:
  723. pass # Ignore connect callback errors; connection proceeds regardless
  724. # Determine our local IP from the control connection socket
  725. sockname = client_writer.get_extra_info("sockname")
  726. local_ip = sockname[0] if sockname else "0.0.0.0" # nosec B104
  727. if local_ip in ("0.0.0.0", "::"): # nosec B104
  728. local_ip = "127.0.0.1"
  729. # Connect to target printer with TLS
  730. try:
  731. printer_reader, printer_writer = await asyncio.wait_for(
  732. asyncio.open_connection(
  733. self.target_host,
  734. self.target_port,
  735. ssl=self._client_ssl_context,
  736. ),
  737. timeout=10.0,
  738. )
  739. logger.info("%s proxy: connected to printer %s:%s", self.name, self.target_host, self.target_port)
  740. except TimeoutError:
  741. logger.error("%s proxy: timeout connecting to %s:%s", self.name, self.target_host, self.target_port)
  742. client_writer.close()
  743. await client_writer.wait_closed()
  744. return
  745. except ssl.SSLError as e:
  746. logger.error(
  747. "%s proxy: SSL error connecting to %s:%s: %s", self.name, self.target_host, self.target_port, e
  748. )
  749. client_writer.close()
  750. await client_writer.wait_closed()
  751. return
  752. except OSError as e:
  753. logger.error("%s proxy: failed to connect to %s:%s: %s", self.name, self.target_host, self.target_port, e)
  754. client_writer.close()
  755. await client_writer.wait_closed()
  756. return
  757. # Capture the TLS session from the control channel for data channel
  758. # reuse. vsFTPd (X1C) requires require_ssl_reuse — the data connection
  759. # must present the same TLS session as the control channel.
  760. ctrl_ssl_object = printer_writer.get_extra_info("ssl_object")
  761. ctrl_tls_session = ctrl_ssl_object.session if ctrl_ssl_object else None
  762. if ctrl_tls_session:
  763. logger.debug("%s proxy: captured TLS session for data channel reuse", self.name)
  764. # Track data channel protection level per session.
  765. # PROT C = cleartext data, PROT P = TLS data.
  766. # Default to cleartext — many Bambu printers (A1, H2D) use PROT C.
  767. # If the slicer sends PROT P, we switch to TLS for data connections.
  768. session_state: dict[str, str | ssl.SSLSession] = {"prot": "C"}
  769. if ctrl_tls_session:
  770. session_state["tls_session"] = ctrl_tls_session
  771. # Client→Printer: intercept EPSV and replace with PASV
  772. # EPSV responses only contain a port (no IP), so the slicer reuses
  773. # the control connection IP. If that IP is the real printer (via
  774. # iptables REDIRECT), the data connection bypasses the proxy.
  775. # PASV responses include an explicit IP that we can rewrite.
  776. client_to_printer = asyncio.create_task(
  777. self._forward_ftp_commands(client_reader, printer_writer, f"{client_id}→printer", session_state),
  778. name=f"{self.name}_c2p_{client_id}",
  779. )
  780. # Printer→Client: intercept PASV/EPSV responses
  781. printer_to_client = asyncio.create_task(
  782. self._forward_ftp_control(printer_reader, client_writer, f"printer→{client_id}", local_ip, session_state),
  783. name=f"{self.name}_p2c_{client_id}",
  784. )
  785. self._active_connections[client_id] = (client_to_printer, printer_to_client)
  786. try:
  787. done, pending = await asyncio.wait(
  788. [client_to_printer, printer_to_client],
  789. return_when=asyncio.FIRST_COMPLETED,
  790. )
  791. for task in pending:
  792. task.cancel()
  793. try:
  794. await task
  795. except asyncio.CancelledError:
  796. pass # Expected when cancelling the other forwarding direction
  797. except Exception as e:
  798. logger.debug("%s proxy connection error: %s", self.name, e)
  799. finally:
  800. self._active_connections.pop(client_id, None)
  801. for writer in [client_writer, printer_writer]:
  802. try:
  803. writer.close()
  804. await writer.wait_closed()
  805. except OSError:
  806. pass # Best-effort connection cleanup; peer may have disconnected
  807. logger.info("%s proxy: client %s disconnected", self.name, client_id)
  808. if self.on_disconnect:
  809. try:
  810. self.on_disconnect(client_id)
  811. except Exception:
  812. pass # Ignore disconnect callback errors; cleanup continues
  813. async def _forward_ftp_commands(
  814. self,
  815. reader: asyncio.StreamReader,
  816. writer: asyncio.StreamWriter,
  817. direction: str,
  818. session_state: dict[str, str | ssl.SSLSession],
  819. ) -> None:
  820. """Forward FTP client commands, replacing EPSV with PASV.
  821. EPSV responses only contain a port number — the client reuses the
  822. control connection IP for data. When the control IP is the real
  823. printer (due to iptables REDIRECT), EPSV data connections bypass
  824. the proxy. PASV responses include an explicit IP that the proxy
  825. can rewrite to its own address.
  826. Also tracks PROT P/C commands to know whether data connections
  827. should use TLS or cleartext.
  828. """
  829. buffer = b""
  830. total_bytes = 0
  831. try:
  832. while self._running:
  833. data = await reader.read(65536)
  834. if not data:
  835. break
  836. total_bytes += len(data)
  837. buffer += data
  838. output = b""
  839. while b"\r\n" in buffer:
  840. idx = buffer.index(b"\r\n")
  841. line = buffer[:idx]
  842. buffer = buffer[idx + 2 :]
  843. cmd_upper = line.strip().upper()
  844. # Track PROT level for data channel encryption
  845. if cmd_upper == b"PROT P":
  846. session_state["prot"] = "P"
  847. logger.info("FTP data protection: PROT P (TLS)")
  848. elif cmd_upper == b"PROT C":
  849. session_state["prot"] = "C"
  850. logger.info("FTP data protection: PROT C (cleartext)")
  851. output += line + b"\r\n"
  852. if output:
  853. writer.write(output)
  854. await writer.drain()
  855. logger.debug("%s proxy %s: %s bytes", self.name, direction, len(data))
  856. except asyncio.CancelledError:
  857. pass # Expected when the other forwarding direction closes first
  858. except ConnectionResetError:
  859. logger.debug("%s proxy %s: connection reset", self.name, direction)
  860. except BrokenPipeError:
  861. logger.debug("%s proxy %s: broken pipe", self.name, direction)
  862. except OSError as e:
  863. logger.debug("%s proxy %s error: %s", self.name, direction, e)
  864. if buffer:
  865. try:
  866. writer.write(buffer)
  867. await writer.drain()
  868. except OSError:
  869. pass # Best-effort flush of remaining FTP command data
  870. logger.debug("%s proxy %s: total %s bytes", self.name, direction, total_bytes)
  871. async def _forward_ftp_control(
  872. self,
  873. reader: asyncio.StreamReader,
  874. writer: asyncio.StreamWriter,
  875. direction: str,
  876. local_ip: str,
  877. session_state: dict[str, str | ssl.SSLSession],
  878. ) -> None:
  879. """Forward FTP control channel responses, rewriting PASV/EPSV.
  880. FTP control channel is line-based (\\r\\n terminated). We buffer data
  881. and process complete lines, intercepting 227 (PASV) and 229 (EPSV)
  882. responses to create local data proxies.
  883. """
  884. buffer = b""
  885. total_bytes = 0
  886. try:
  887. while self._running:
  888. data = await reader.read(65536)
  889. if not data:
  890. break
  891. total_bytes += len(data)
  892. buffer += data
  893. output = b""
  894. # Process all complete lines
  895. while b"\r\n" in buffer:
  896. idx = buffer.index(b"\r\n")
  897. line = buffer[:idx]
  898. buffer = buffer[idx + 2 :]
  899. rewritten = await self._maybe_rewrite_pasv(line, local_ip, session_state)
  900. output += rewritten + b"\r\n"
  901. if output:
  902. writer.write(output)
  903. await writer.drain()
  904. logger.debug("%s proxy %s: %s bytes", self.name, direction, len(data))
  905. except asyncio.CancelledError:
  906. pass # Expected when the other forwarding direction closes first
  907. except ConnectionResetError:
  908. logger.debug("%s proxy %s: connection reset", self.name, direction)
  909. except BrokenPipeError:
  910. logger.debug("%s proxy %s: broken pipe", self.name, direction)
  911. except OSError as e:
  912. logger.debug("%s proxy %s error: %s", self.name, direction, e)
  913. # Flush any remaining buffered data
  914. if buffer:
  915. try:
  916. writer.write(buffer)
  917. await writer.drain()
  918. except OSError:
  919. pass # Best-effort flush of remaining FTP control data
  920. logger.debug("%s proxy %s: total %s bytes", self.name, direction, total_bytes)
  921. async def _maybe_rewrite_pasv(
  922. self, line: bytes, local_ip: str, session_state: dict[str, str | ssl.SSLSession]
  923. ) -> bytes:
  924. """Rewrite PASV/EPSV response to point to a local data proxy."""
  925. try:
  926. text = line.decode("utf-8")
  927. except UnicodeDecodeError:
  928. return line
  929. # 227 Entering Passive Mode (h1,h2,h3,h4,p1,p2)
  930. if text.startswith("227 "):
  931. match = re.search(r"\(\s*(\d+)\s*,\s*(\d+)\s*,\s*(\d+)\s*,\s*(\d+)\s*,\s*(\d+)\s*,\s*(\d+)\s*\)", text)
  932. if match:
  933. h1, h2, h3, h4, p1, p2 = (int(x) for x in match.groups())
  934. printer_ip = f"{h1}.{h2}.{h3}.{h4}"
  935. printer_port = p1 * 256 + p2
  936. local_port = await self._create_data_proxy(printer_ip, printer_port, session_state)
  937. if local_port:
  938. ip_parts = local_ip.split(".")
  939. lp1 = local_port // 256
  940. lp2 = local_port % 256
  941. rewritten = (
  942. f"227 Entering Passive Mode "
  943. f"({ip_parts[0]},{ip_parts[1]},{ip_parts[2]},{ip_parts[3]},{lp1},{lp2})"
  944. )
  945. logger.info("FTP PASV rewrite: %s:%s → %s:%s", printer_ip, printer_port, local_ip, local_port)
  946. return rewritten.encode("utf-8")
  947. else:
  948. logger.error("FTP PASV: failed to create data proxy for %s:%s", printer_ip, printer_port)
  949. else:
  950. logger.warning("FTP PASV: 227 response didn't match expected format: %s", text[:100])
  951. # 229 Entering Extended Passive Mode (|||port|)
  952. elif text.startswith("229 "):
  953. match = re.search(r"\(\|\|\|(\d+)\|\)", text)
  954. if match:
  955. printer_port = int(match.group(1))
  956. local_port = await self._create_data_proxy(self.target_host, printer_port, session_state)
  957. if local_port:
  958. rewritten = f"229 Entering Extended Passive Mode (|||{local_port}|)"
  959. logger.info("FTP EPSV rewrite: port %s → %s", printer_port, local_port)
  960. return rewritten.encode("utf-8")
  961. else:
  962. logger.error("FTP EPSV: failed to create data proxy for port %s", printer_port)
  963. else:
  964. logger.warning("FTP EPSV: 229 response didn't match expected format: %s", text[:100])
  965. return line
  966. async def _create_data_proxy(
  967. self, printer_ip: str, printer_port: int, session_state: dict[str, str | ssl.SSLSession]
  968. ) -> int | None:
  969. """Create a one-shot proxy for an FTP data connection.
  970. Prefers the printer's original passive port so the port number stays
  971. the same in the rewritten PASV/EPSV response. This is critical when
  972. the slicer's FTP bounce-attack protection overrides the IP in the PASV
  973. response: the slicer connects to <control_IP>:<port>, and if iptables
  974. REDIRECT maps that port to the local machine, the data proxy must be
  975. listening on the *same* port number.
  976. Falls back to a random port if the original is unavailable.
  977. Uses TLS or cleartext based on the session's PROT level:
  978. - PROT P: TLS on both slicer and printer data connections
  979. - PROT C: cleartext on both sides (common for A1/H2D printers)
  980. Returns the local port number, or None if binding failed.
  981. """
  982. use_tls = session_state.get("prot") == "P"
  983. logger.info(
  984. "FTP data proxy: creating data proxy for %s:%s (printer-side %s)",
  985. printer_ip,
  986. printer_port,
  987. "TLS" if use_tls else "cleartext",
  988. )
  989. # Get control channel TLS session for data channel reuse
  990. tls_session = session_state.get("tls_session") if use_tls else None
  991. # Try the printer's original port first — this ensures the port
  992. # matches even when bounce protection or iptables REDIRECT is in play.
  993. try:
  994. await self._start_data_proxy_server(printer_port, printer_ip, printer_port, use_tls, tls_session)
  995. logger.info("FTP data proxy: using printer's port %s", printer_port)
  996. return printer_port
  997. except OSError as e:
  998. logger.debug(
  999. "FTP data proxy: printer port %s unavailable (%s), trying random",
  1000. printer_port,
  1001. e,
  1002. )
  1003. for _attempt in range(10):
  1004. port = random.randint(self.PASV_PORT_MIN, self.PASV_PORT_MAX)
  1005. try:
  1006. await self._start_data_proxy_server(port, printer_ip, printer_port, use_tls, tls_session)
  1007. logger.info("FTP data proxy: using random port %s", port)
  1008. return port
  1009. except OSError:
  1010. continue
  1011. logger.error("Failed to bind FTP data proxy port after 10 attempts")
  1012. return None
  1013. async def _start_data_proxy_server(
  1014. self,
  1015. port: int,
  1016. printer_ip: str,
  1017. printer_port: int,
  1018. use_tls: bool,
  1019. tls_session: ssl.SSLSession | None = None,
  1020. ) -> None:
  1021. """Start a one-shot server for one FTP data connection.
  1022. When the slicer connects, immediately connects to the printer's data
  1023. port and buffers any slicer data until the printer connection is ready.
  1024. This handles zero-byte uploads (verify_job) where the slicer closes
  1025. the data channel before a naive proxy would finish its TLS handshake.
  1026. The slicer-side listener is ALWAYS cleartext. Even when the slicer
  1027. sends PROT P on the control channel, Bambu Studio does not perform
  1028. a TLS handshake on the data connection — it relies on the implicit
  1029. FTPS control channel for authentication and sends data unencrypted.
  1030. The printer-side outbound connection follows the PROT level:
  1031. - PROT P (use_tls=True): TLS to the printer's data port
  1032. - PROT C (use_tls=False): cleartext to the printer's data port
  1033. This mirrors the control channel's TLS-termination architecture.
  1034. Raises OSError if the port is already in use.
  1035. """
  1036. connected = asyncio.Event()
  1037. server_holder: list[asyncio.Server] = []
  1038. # Slicer side: ALWAYS cleartext — Bambu Studio does not do TLS on
  1039. # the data channel even after sending PROT P.
  1040. # Printer side: TLS if PROT P, cleartext if PROT C.
  1041. # For TLS data connections, wrap the SSL context to reuse the
  1042. # control channel's TLS session if available. vsFTPd (X1C) requires
  1043. # require_ssl_reuse — without this, data connections are rejected
  1044. # with "522 SSL connection failed: session reuse required".
  1045. if use_tls and tls_session:
  1046. client_ssl = _SessionReuseSSLContext(self._client_ssl_context, tls_session)
  1047. logger.debug("FTP data proxy: using TLS session reuse for port %s", port)
  1048. else:
  1049. client_ssl = self._client_ssl_context if use_tls else None
  1050. # Slicer side is ALWAYS cleartext — Bambu Studio does not do TLS on
  1051. # the data channel even after PROT P (confirmed for both H2D and X1C).
  1052. printer_mode = "TLS" if use_tls else "cleartext"
  1053. async def handle_data(
  1054. client_reader: asyncio.StreamReader,
  1055. client_writer: asyncio.StreamWriter,
  1056. ) -> None:
  1057. """Handle one FTP data connection, then close the server."""
  1058. peername = client_writer.get_extra_info("peername")
  1059. data_client = f"{peername[0]}:{peername[1]}" if peername else "unknown"
  1060. logger.info(
  1061. "FTP data proxy port %s (slicer=cleartext, printer=%s): client connected from %s, bridging to %s:%s",
  1062. port,
  1063. printer_mode,
  1064. data_client,
  1065. printer_ip,
  1066. printer_port,
  1067. )
  1068. connected.set()
  1069. # One-shot: close server after accepting first connection
  1070. if server_holder:
  1071. server_holder[0].close()
  1072. printer_writer = None
  1073. try:
  1074. # Buffer any slicer data while connecting to printer.
  1075. # This handles the race where the slicer sends data (or closes
  1076. # for zero-byte files) before the TLS handshake completes.
  1077. slicer_buffer = bytearray()
  1078. slicer_eof = False
  1079. async def buffer_slicer():
  1080. nonlocal slicer_eof
  1081. while True:
  1082. chunk = await client_reader.read(65536)
  1083. if not chunk:
  1084. slicer_eof = True
  1085. return
  1086. slicer_buffer.extend(chunk)
  1087. buffer_task = asyncio.create_task(buffer_slicer())
  1088. # Connect to printer's data port
  1089. printer_reader, printer_writer = await asyncio.wait_for(
  1090. asyncio.open_connection(printer_ip, printer_port, ssl=client_ssl),
  1091. timeout=10.0,
  1092. )
  1093. logger.info(
  1094. "FTP data proxy port %s (printer=%s): connected to printer %s:%s",
  1095. port,
  1096. printer_mode,
  1097. printer_ip,
  1098. printer_port,
  1099. )
  1100. # Stop buffering
  1101. buffer_task.cancel()
  1102. try:
  1103. await buffer_task
  1104. except asyncio.CancelledError:
  1105. pass
  1106. # Flush buffered slicer data to printer
  1107. logger.info(
  1108. "FTP data proxy port %s: buffer=%s bytes, slicer_eof=%s",
  1109. port,
  1110. len(slicer_buffer),
  1111. slicer_eof,
  1112. )
  1113. if slicer_buffer:
  1114. printer_writer.write(bytes(slicer_buffer))
  1115. await printer_writer.drain()
  1116. # Forward remaining slicer data to printer, then close the
  1117. # printer side to signal upload complete.
  1118. #
  1119. # Bambu Studio does NOT close the FTP data channel after sending
  1120. # STOR data — it keeps the connection open and waits for the
  1121. # printer to close its side + send 226 on the control channel.
  1122. # A naive bidirectional proxy deadlocks here because the proxy
  1123. # waits for the slicer EOF that never comes.
  1124. #
  1125. # Fix: read slicer data with an idle timeout. Once data has been
  1126. # received and the slicer goes quiet, close the printer side so
  1127. # the printer can send 226. For RETR (download), the printer
  1128. # sends data and closes — the slicer reads until EOF — so this
  1129. # unidirectional approach works for both directions.
  1130. total_c2p = len(slicer_buffer)
  1131. if not slicer_eof:
  1132. # Read remaining slicer data with idle detection.
  1133. # Must be short — Bambu Studio expects 226 almost instantly
  1134. # after sending data. Too long and the slicer times out.
  1135. idle_timeout = 0.3
  1136. while True:
  1137. try:
  1138. chunk = await asyncio.wait_for(client_reader.read(65536), timeout=idle_timeout)
  1139. except TimeoutError:
  1140. if total_c2p > 0:
  1141. # Slicer sent data then went idle — upload done
  1142. logger.debug(
  1143. "FTP data proxy port %s: slicer idle after %s bytes, closing printer side",
  1144. port,
  1145. total_c2p,
  1146. )
  1147. break
  1148. continue # No data yet, keep waiting
  1149. if not chunk:
  1150. break # Slicer closed
  1151. printer_writer.write(chunk)
  1152. await printer_writer.drain()
  1153. total_c2p += len(chunk)
  1154. logger.debug("FTP proxy data_c2p: total %s bytes", total_c2p)
  1155. # Close printer side to signal upload complete.
  1156. # For TLS, close() sends close_notify which the printer treats
  1157. # as end-of-data. The printer then sends 226 on the control
  1158. # channel. For RETR, this is a no-op since the printer closes
  1159. # first and we'd have exited the loop above via EOF.
  1160. try:
  1161. printer_writer.close()
  1162. await printer_writer.wait_closed()
  1163. except OSError:
  1164. pass
  1165. # Wait for 226 response to propagate through the FTP control
  1166. # channel before closing the slicer's data channel.
  1167. #
  1168. # Without this delay, the data channel FIN arrives at the
  1169. # slicer before the 226 response on the control channel.
  1170. # BambuStudio reacts to the data channel FIN within <1ms
  1171. # by sending QUIT + closing the control channel — before
  1172. # 226 arrives (~2-3ms network RTT). This causes verify_job
  1173. # to be treated as failed and shows the login modal.
  1174. #
  1175. # In a direct connection, the printer sends 226 AND closes
  1176. # the data channel simultaneously, so the slicer gets both
  1177. # at once. The delay here emulates that timing.
  1178. if total_c2p > 0:
  1179. await asyncio.sleep(0.5)
  1180. except Exception as e:
  1181. logger.error("FTP data proxy port %s: error: %s", port, e)
  1182. finally:
  1183. for w in [client_writer, printer_writer]:
  1184. if w:
  1185. try:
  1186. w.close()
  1187. await w.wait_closed()
  1188. except OSError:
  1189. pass # Best-effort data connection cleanup
  1190. logger.info("FTP data proxy port %s: connection closed", port)
  1191. server = await asyncio.start_server(
  1192. handle_data,
  1193. "0.0.0.0", # nosec B104
  1194. port,
  1195. # No TLS on slicer side — Bambu Studio doesn't do TLS on data
  1196. # channel even after PROT P (confirmed by connection hang test).
  1197. )
  1198. server_holder.append(server)
  1199. self._data_servers.append(server)
  1200. # Auto-close after 60s if no connection arrives
  1201. async def auto_close() -> None:
  1202. try:
  1203. await asyncio.wait_for(connected.wait(), timeout=60.0)
  1204. except TimeoutError:
  1205. logger.debug("FTP data proxy on port %s timed out, closing", port)
  1206. try:
  1207. server.close()
  1208. await server.wait_closed()
  1209. except OSError:
  1210. pass # Best-effort timeout cleanup
  1211. finally:
  1212. if server in self._data_servers:
  1213. self._data_servers.remove(server)
  1214. asyncio.create_task(auto_close(), name=f"ftp_data_timeout_{port}")
  1215. logger.debug("FTP data proxy: port %s → %s:%s", port, printer_ip, printer_port)
  1216. class SlicerProxyManager:
  1217. """Manages FTP and MQTT TLS proxies for a single printer target."""
  1218. # Bambu printer ports
  1219. PRINTER_FTP_PORT = 990
  1220. PRINTER_MQTT_PORT = 8883
  1221. PRINTER_FILE_TRANSFER_PORT = 6000
  1222. PRINTER_RTSP_PORT = 322 # X1/H2/P2 series camera (A1/P1 use port 6000)
  1223. PRINTER_BIND_PORTS = [3000, 3002]
  1224. # Local listen ports - must match what Bambu Studio expects
  1225. # Note: Port 990 requires root or CAP_NET_BIND_SERVICE capability
  1226. LOCAL_FTP_PORT = 990
  1227. LOCAL_MQTT_PORT = 8883
  1228. def __init__(
  1229. self,
  1230. target_host: str,
  1231. cert_path: Path,
  1232. key_path: Path,
  1233. on_activity: Callable[[str, str], None] | None = None,
  1234. bind_address: str = "0.0.0.0", # nosec B104
  1235. bind_identity: dict[str, str] | None = None,
  1236. ):
  1237. """Initialize the slicer proxy manager.
  1238. Args:
  1239. target_host: Target printer IP address
  1240. cert_path: Path to server certificate
  1241. key_path: Path to server private key
  1242. on_activity: Optional callback for activity logging (name, message)
  1243. bind_address: IP address to bind proxy listeners to
  1244. bind_identity: Optional dict with keys (serial, model, name, version)
  1245. for the bind/detect response. When provided, the proxy responds
  1246. to detect requests itself instead of forwarding to the printer.
  1247. This ensures the slicer sees the VP identity, not the real printer.
  1248. """
  1249. self.target_host = target_host
  1250. self.cert_path = cert_path
  1251. self.key_path = key_path
  1252. self.on_activity = on_activity
  1253. self.bind_address = bind_address
  1254. self.bind_identity = bind_identity
  1255. self._ftp_proxy: TCPProxy | None = None
  1256. self._mqtt_proxy: TLSProxy | None = None
  1257. self._file_transfer_proxy: TCPProxy | None = None
  1258. self._rtsp_proxy: TCPProxy | None = None
  1259. self._bind_proxies: list[TCPProxy] = []
  1260. self._bind_server = None
  1261. self._probe_servers: list[asyncio.Server] = []
  1262. self._tasks: list[asyncio.Task] = []
  1263. # FTP passive data port range — Bambu printers typically use ports in
  1264. # this range for EPSV/PASV data connections. We pre-listen on all of
  1265. # them so EPSV works transparently without decrypting FTP control.
  1266. FTP_DATA_PORT_MIN = 50000
  1267. FTP_DATA_PORT_MAX = 50100
  1268. async def start(self) -> None:
  1269. """Start proxy services.
  1270. Uses transparent TCP proxying for most protocols (FTP, FileTransfer,
  1271. Camera) — raw bytes are forwarded without TLS termination, so the
  1272. slicer gets the printer's real TLS certificate end-to-end.
  1273. Only MQTT is TLS-terminated because we must decrypt the payload to
  1274. rewrite the printer's real IP with the proxy's bind IP.
  1275. """
  1276. logger.info("Starting slicer proxy to %s (transparent mode)", self.target_host)
  1277. # Detect iptables port redirect for FTP
  1278. ftp_listen_port = self.LOCAL_FTP_PORT
  1279. redirect_target = detect_port_redirect(self.LOCAL_FTP_PORT)
  1280. if redirect_target:
  1281. logger.info(
  1282. "Detected iptables redirect: port %d → %d. FTP proxy will listen on %d.",
  1283. self.LOCAL_FTP_PORT,
  1284. redirect_target,
  1285. redirect_target,
  1286. )
  1287. ftp_listen_port = redirect_target
  1288. # FTP control — raw TCP pass-through (end-to-end TLS with printer)
  1289. self._ftp_proxy = TCPProxy(
  1290. name="FTP",
  1291. listen_port=ftp_listen_port,
  1292. target_host=self.target_host,
  1293. target_port=self.PRINTER_FTP_PORT,
  1294. on_connect=lambda cid: self._log_activity("FTP", f"connected: {cid}"),
  1295. on_disconnect=lambda cid: self._log_activity("FTP", f"disconnected: {cid}"),
  1296. bind_address=self.bind_address,
  1297. )
  1298. # FTP data ports — pre-listen on the entire passive port range.
  1299. # Since FTP control is encrypted end-to-end, we can't read EPSV
  1300. # responses to know which port the printer chose. Instead, we
  1301. # listen on every port in the range and forward to the same port
  1302. # on the printer. The slicer connects to bind_ip:PORT (from EPSV)
  1303. # and we transparently relay to printer_ip:PORT.
  1304. self._ftp_data_proxies: list[TCPProxy] = []
  1305. for port in range(self.FTP_DATA_PORT_MIN, self.FTP_DATA_PORT_MAX + 1):
  1306. dp = TCPProxy(
  1307. name=f"FTP-Data-{port}",
  1308. listen_port=port,
  1309. target_host=self.target_host,
  1310. target_port=port,
  1311. bind_address=self.bind_address,
  1312. )
  1313. self._ftp_data_proxies.append(dp)
  1314. # MQTT — TLS-terminating proxy (must decrypt to rewrite IP addresses)
  1315. self._mqtt_proxy = TLSProxy(
  1316. name="MQTT",
  1317. listen_port=self.LOCAL_MQTT_PORT,
  1318. target_host=self.target_host,
  1319. target_port=self.PRINTER_MQTT_PORT,
  1320. server_cert_path=self.cert_path,
  1321. server_key_path=self.key_path,
  1322. on_connect=lambda cid: self._log_activity("MQTT", f"connected: {cid}"),
  1323. on_disconnect=lambda cid: self._log_activity("MQTT", f"disconnected: {cid}"),
  1324. bind_address=self.bind_address,
  1325. rewrite_ip=(self.target_host, self.bind_address) if self.bind_address != "0.0.0.0" else None,
  1326. )
  1327. # File transfer — raw TCP pass-through (port 6000)
  1328. self._file_transfer_proxy = TCPProxy(
  1329. name="FileTransfer",
  1330. listen_port=self.PRINTER_FILE_TRANSFER_PORT,
  1331. target_host=self.target_host,
  1332. target_port=self.PRINTER_FILE_TRANSFER_PORT,
  1333. on_connect=lambda cid: self._log_activity("FileTransfer", f"connected: {cid}"),
  1334. on_disconnect=lambda cid: self._log_activity("FileTransfer", f"disconnected: {cid}"),
  1335. bind_address=self.bind_address,
  1336. )
  1337. # RTSP camera — raw TCP pass-through (port 322)
  1338. self._rtsp_proxy = TCPProxy(
  1339. name="RTSP",
  1340. listen_port=self.PRINTER_RTSP_PORT,
  1341. target_host=self.target_host,
  1342. target_port=self.PRINTER_RTSP_PORT,
  1343. on_connect=lambda cid: self._log_activity("RTSP", f"connected: {cid}"),
  1344. on_disconnect=lambda cid: self._log_activity("RTSP", f"disconnected: {cid}"),
  1345. bind_address=self.bind_address,
  1346. )
  1347. # Bind/auth — respond with VP identity instead of proxying to printer.
  1348. # The detect response contains the printer name, serial, model, and
  1349. # bind status. Proxying it would leak the real printer's identity and
  1350. # cause the slicer to treat it as a different device.
  1351. if self.bind_identity:
  1352. from backend.app.services.virtual_printer.bind_server import BindServer
  1353. self._bind_server = BindServer(
  1354. serial=self.bind_identity["serial"],
  1355. model=self.bind_identity["model"],
  1356. name=self.bind_identity["name"],
  1357. version=self.bind_identity.get("version", "01.00.00.00"),
  1358. bind_address=self.bind_address,
  1359. cert_path=self.cert_path,
  1360. key_path=self.key_path,
  1361. )
  1362. else:
  1363. # Fallback: proxy bind requests to the real printer
  1364. for bind_port in self.PRINTER_BIND_PORTS:
  1365. if bind_port == 3002:
  1366. proxy = TLSProxy(
  1367. name="Bind-TLS",
  1368. listen_port=bind_port,
  1369. target_host=self.target_host,
  1370. target_port=bind_port,
  1371. server_cert_path=self.cert_path,
  1372. server_key_path=self.key_path,
  1373. on_connect=lambda cid: self._log_activity("Bind", f"connected: {cid}"),
  1374. on_disconnect=lambda cid: self._log_activity("Bind", f"disconnected: {cid}"),
  1375. bind_address=self.bind_address,
  1376. )
  1377. else:
  1378. proxy = TCPProxy(
  1379. name="Bind",
  1380. listen_port=bind_port,
  1381. target_host=self.target_host,
  1382. target_port=bind_port,
  1383. on_connect=lambda cid: self._log_activity("Bind", f"connected: {cid}"),
  1384. on_disconnect=lambda cid: self._log_activity("Bind", f"disconnected: {cid}"),
  1385. bind_address=self.bind_address,
  1386. )
  1387. self._bind_proxies.append(proxy)
  1388. # Start as background tasks
  1389. async def run_with_logging(proxy: TLSProxy | TCPProxy) -> None:
  1390. try:
  1391. await proxy.start()
  1392. except Exception as e:
  1393. logger.error("Slicer proxy %s failed: %s", proxy.name, e)
  1394. self._tasks = [
  1395. asyncio.create_task(
  1396. run_with_logging(self._ftp_proxy),
  1397. name="slicer_proxy_ftp",
  1398. ),
  1399. asyncio.create_task(
  1400. run_with_logging(self._mqtt_proxy),
  1401. name="slicer_proxy_mqtt",
  1402. ),
  1403. asyncio.create_task(
  1404. run_with_logging(self._file_transfer_proxy),
  1405. name="slicer_proxy_file_transfer",
  1406. ),
  1407. asyncio.create_task(
  1408. run_with_logging(self._rtsp_proxy),
  1409. name="slicer_proxy_rtsp",
  1410. ),
  1411. ]
  1412. if self._bind_server:
  1413. self._tasks.append(
  1414. asyncio.create_task(
  1415. run_with_logging(self._bind_server),
  1416. name="slicer_proxy_bind_server",
  1417. )
  1418. )
  1419. for bp in self._bind_proxies:
  1420. self._tasks.append(
  1421. asyncio.create_task(
  1422. run_with_logging(bp),
  1423. name=f"slicer_proxy_bind_{bp.listen_port}",
  1424. )
  1425. )
  1426. # FTP data port proxies (50000-50100)
  1427. for dp in self._ftp_data_proxies:
  1428. self._tasks.append(
  1429. asyncio.create_task(
  1430. run_with_logging(dp),
  1431. name=f"slicer_proxy_ftp_data_{dp.listen_port}",
  1432. )
  1433. )
  1434. # Diagnostic probe: listen on common un-proxied ports to detect
  1435. # if the slicer tries to reach a service we don't handle.
  1436. if self.bind_address and self.bind_address != "0.0.0.0":
  1437. for probe_port in (21, 80, 443):
  1438. try:
  1439. srv = await asyncio.start_server(
  1440. lambda r, w, p=probe_port: self._probe_handler(r, w, p),
  1441. self.bind_address,
  1442. probe_port,
  1443. )
  1444. self._probe_servers.append(srv)
  1445. except OSError:
  1446. pass # Port in use or no permission — skip
  1447. if self._probe_servers:
  1448. probed = [s.sockets[0].getsockname()[1] for s in self._probe_servers if s.sockets]
  1449. logger.info("Proxy diagnostic: probing un-proxied ports %s on %s", probed, self.bind_address)
  1450. logger.info(
  1451. "Slicer proxy started for %s (transparent TCP + MQTT TLS, %d FTP data ports)",
  1452. self.target_host,
  1453. len(self._ftp_data_proxies),
  1454. )
  1455. # Wait for tasks to complete (they run until cancelled)
  1456. # This keeps the start() coroutine alive so the parent task doesn't complete
  1457. try:
  1458. await asyncio.gather(*self._tasks)
  1459. except asyncio.CancelledError:
  1460. logger.debug("Slicer proxy start cancelled")
  async def stop(self) -> None:
      """Stop all proxies.

      Stops the FTP, MQTT, file-transfer and RTSP proxies, the bind server
      (if any), every bind proxy and FTP data port proxy, closes the
      diagnostic probe servers, then cancels the background tasks and waits
      up to two seconds for them to finish.

      Shutdown is best-effort: an exception raised while stopping one
      component is logged and does not prevent the remaining components
      from being stopped or the tasks from being cancelled. Cancellation
      of this coroutine itself still propagates (``CancelledError`` is not
      an ``Exception`` subclass).
      """
      logger.info("Stopping slicer proxy")

      async def stop_component(component, label: str) -> None:
          # One misbehaving component must not leave the others listening
          # or the background tasks running, so contain its failure here.
          try:
              await component.stop()
          except Exception:
              logger.exception("Error while stopping slicer proxy component %s", label)

      # Single-instance components; each reference is cleared once handled
      # so a second stop() call does not try to stop it again.
      for attr in (
          "_ftp_proxy",
          "_mqtt_proxy",
          "_file_transfer_proxy",
          "_rtsp_proxy",
          "_bind_server",
      ):
          component = getattr(self, attr)
          if component:
              await stop_component(component, attr)
              setattr(self, attr, None)

      for bind_proxy in self._bind_proxies:
          await stop_component(bind_proxy, "bind proxy")
      self._bind_proxies = []

      for data_proxy in self._ftp_data_proxies:
          await stop_component(data_proxy, "FTP data proxy")
      self._ftp_data_proxies = []

      for probe_server in self._probe_servers:
          probe_server.close()
      self._probe_servers = []

      # Cancel tasks
      for task in self._tasks:
          task.cancel()
      if self._tasks:
          try:
              # return_exceptions=True: collect each task's CancelledError
              # (or other failure) instead of re-raising it here.
              await asyncio.wait_for(
                  asyncio.gather(*self._tasks, return_exceptions=True),
                  timeout=2.0,
              )
          except TimeoutError:
              logger.debug("Some proxy tasks didn't stop in time")
      self._tasks = []
      logger.info("Slicer proxy stopped")
  async def _probe_handler(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, port: int) -> None:
      """Report an unexpected connection on an un-proxied port, then drop it.

      Nothing is read from or written to the connection: a warning naming
      the port and the remote endpoint is logged and the connection is
      closed immediately.

      Args:
          reader: Stream reader for the accepted connection (unused).
          writer: Stream writer for the accepted connection; closed here.
          port: The probe port the connection arrived on.
      """
      remote = writer.get_extra_info("peername")
      if remote:
          client = f"{remote[0]}:{remote[1]}"
      else:
          client = "unknown"

      logger.warning(
          "PROBE: slicer connected to un-proxied port %d from %s — this port may need proxying",
          port,
          client,
      )

      writer.close()
      try:
          await writer.wait_closed()
      except OSError:
          # A socket error while closing is not interesting for a probe.
          pass
  1516. def _log_activity(self, name: str, message: str) -> None:
  1517. """Log activity via callback if configured."""
  1518. if self.on_activity:
  1519. try:
  1520. self.on_activity(name, message)
  1521. except Exception:
  1522. pass # Ignore activity callback errors; logging is non-critical
  @property
  def is_running(self) -> bool:
      """Whether the proxies are running.

      True only when at least one background task exists and none of the
      tasks has finished; False when there are no tasks or any task is
      done.
      """
      tasks = self._tasks
      if len(tasks) == 0:
          return False
      return not any(task.done() for task in tasks)
  def get_status(self) -> dict:
      """Return a snapshot of the proxy state.

      Returns:
          A dict with the keys ``running``, ``target_host``, ``ftp_port``,
          ``mqtt_port``, ``bind_ports``, ``ftp_connections``,
          ``mqtt_connections`` and ``bind_connections``. The FTP and MQTT
          connection counts are 0 when the corresponding proxy is not set;
          ``bind_connections`` is the total across all bind proxies.
      """
      status: dict = {
          "running": self.is_running,
          "target_host": self.target_host,
          "ftp_port": self.LOCAL_FTP_PORT,
          "mqtt_port": self.LOCAL_MQTT_PORT,
          "bind_ports": self.PRINTER_BIND_PORTS,
      }

      ftp_proxy = self._ftp_proxy
      if ftp_proxy:
          status["ftp_connections"] = len(ftp_proxy._active_connections)
      else:
          status["ftp_connections"] = 0

      mqtt_proxy = self._mqtt_proxy
      if mqtt_proxy:
          status["mqtt_connections"] = len(mqtt_proxy._active_connections)
      else:
          status["mqtt_connections"] = 0

      bind_total = 0
      for bind_proxy in self._bind_proxies:
          bind_total += len(bind_proxy._active_connections)
      status["bind_connections"] = bind_total

      return status
  1534. "bind_ports": self.PRINTER_BIND_PORTS,
  1535. "ftp_connections": (len(self._ftp_proxy._active_connections) if self._ftp_proxy else 0),
  1536. "mqtt_connections": (len(self._mqtt_proxy._active_connections) if self._mqtt_proxy else 0),
  1537. "bind_connections": sum(len(bp._active_connections) for bp in self._bind_proxies),
  1538. }