| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776 |
- """Proxy for slicer-to-printer communication.
- This module provides both transparent TCP proxying and TLS-terminating
- proxying for forwarding data between a slicer and a real Bambu printer,
- enabling remote printing over any network connection.
- Most protocols (FTP, FileTransfer, Camera) use transparent TCP proxying —
- raw bytes are forwarded without decryption, preserving end-to-end TLS
- between slicer and printer. Only MQTT is TLS-terminated so Bambuddy can
- rewrite the printer's real IP with the proxy's bind IP in MQTT payloads.
- """
- # ruff: noqa: N801
- import asyncio
- import logging
- import random
- import re
- import ssl
- import subprocess
- from collections.abc import Callable
- from pathlib import Path
- logger = logging.getLogger(__name__)
- class _SessionReuseSSLContext:
- """Proxy around SSLContext that injects a TLS session into wrap_bio().
- vsFTPd (used by some Bambu printers like X1C) requires TLS session reuse
- on FTP data channels — the data connection must reuse the TLS session from
- the control channel. Without this, the printer rejects the data connection
- with "522 SSL connection failed: session reuse required".
- asyncio's open_connection() calls SSLContext.wrap_bio() internally but
- doesn't expose a session parameter. This wrapper intercepts wrap_bio()
- to inject the saved control-channel session, enabling session reuse.
- """
- def __init__(self, ctx: ssl.SSLContext, session: ssl.SSLSession) -> None:
- object.__setattr__(self, "_ctx", ctx)
- object.__setattr__(self, "_session", session)
- def __getattr__(self, name: str) -> object:
- return getattr(self._ctx, name)
- def wrap_bio(
- self,
- incoming: ssl.MemoryBIO,
- outgoing: ssl.MemoryBIO,
- server_side: bool = False,
- server_hostname: str | None = None,
- **kwargs: object,
- ) -> ssl.SSLObject:
- return self._ctx.wrap_bio(
- incoming,
- outgoing,
- server_side=server_side,
- server_hostname=server_hostname,
- session=self._session,
- **kwargs,
- )
- def detect_port_redirect(port: int) -> int | None:
- """Detect if iptables redirects a port to another port.
- When iptables NAT REDIRECT rules exist (e.g. port redirects), connections
- to the original port never reach our socket because iptables intercepts
- them in PREROUTING. We must listen on the redirect target instead.
- Returns the redirect target port, or None if no redirect is active.
- """
- # Method 1: Read persistent rules file (doesn't require root)
- for rules_path in ("/etc/iptables/rules.v4", "/etc/iptables.rules"):
- try:
- with open(rules_path) as f:
- content = f.read()
- match = re.search(rf"--dport {port}\b.*?--to-ports\s+(\d+)", content)
- if match:
- target = int(match.group(1))
- if target != port:
- return target
- except (FileNotFoundError, PermissionError, OSError):
- continue
- # Method 2: Query live iptables rules (may require root)
- try:
- result = subprocess.run( # noqa: S603, S607
- ["iptables-save", "-t", "nat"],
- capture_output=True,
- text=True,
- timeout=5,
- )
- if result.returncode == 0:
- match = re.search(rf"--dport {port}\b.*?--to-ports\s+(\d+)", result.stdout)
- if match:
- target = int(match.group(1))
- if target != port:
- return target
- except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
- pass
- return None
- class TLSProxy:
- """TLS terminating proxy that forwards data between client and target.
- This proxy terminates TLS on both ends, allowing the slicer to connect
- to Bambuddy's certificate while Bambuddy connects to the real printer.
- """
- def __init__(
- self,
- name: str,
- listen_port: int,
- target_host: str,
- target_port: int,
- server_cert_path: Path,
- server_key_path: Path,
- on_connect: Callable[[str], None] | None = None,
- on_disconnect: Callable[[str], None] | None = None,
- bind_address: str = "0.0.0.0", # nosec B104
- rewrite_ip: tuple[str, str] | None = None,
- ):
- """Initialize the TLS proxy.
- Args:
- name: Friendly name for logging (e.g., "FTP", "MQTT")
- listen_port: Port to listen on for incoming connections
- target_host: Target printer IP/hostname
- target_port: Target printer port
- server_cert_path: Path to server certificate (for accepting slicer connections)
- server_key_path: Path to server private key
- on_connect: Optional callback when client connects (receives client_id)
- on_disconnect: Optional callback when client disconnects (receives client_id)
- bind_address: IP address to bind to (default: all interfaces)
- rewrite_ip: Optional (old_ip, new_ip) tuple — replaces occurrences of
- the printer's real IP with the proxy's bind IP in printer→client data.
- This prevents the slicer from discovering the printer's real IP
- in MQTT payloads (ip_addr, rtsp_url, etc.) and bypassing the proxy.
- """
- self.name = name
- self.listen_port = listen_port
- self.target_host = target_host
- self.target_port = target_port
- self.server_cert_path = server_cert_path
- self.server_key_path = server_key_path
- self.on_connect = on_connect
- self.on_disconnect = on_disconnect
- self.bind_address = bind_address
- # IP rewriting for printer→client direction
- if rewrite_ip:
- self._rewrite_old = rewrite_ip[0].encode("utf-8")
- self._rewrite_new = rewrite_ip[1].encode("utf-8")
- # Also rewrite the integer IP in net.info[].ip fields.
- # Bambu printers encode their IP as a little-endian uint32 integer
- # in the JSON payload. BambuStudio reads this to set dev_ip.
- self._rewrite_old_int = self._ip_to_le_int_bytes(rewrite_ip[0])
- self._rewrite_new_int = self._ip_to_le_int_bytes(rewrite_ip[1])
- else:
- self._rewrite_old = None
- self._rewrite_new = None
- self._rewrite_old_int = None
- self._rewrite_new_int = None
- self._server: asyncio.Server | None = None
- self._running = False
- self._active_connections: dict[str, tuple[asyncio.Task, asyncio.Task]] = {}
- self._server_ssl_context: ssl.SSLContext | None = None
- self._client_ssl_context: ssl.SSLContext | None = None
- @staticmethod
- def _ip_to_le_int_bytes(ip: str) -> bytes:
- """Convert an IP address to its little-endian integer JSON representation.
- E.g. "192.168.255.16" → b"285190336" (the integer as a decimal string,
- as it appears in Bambu MQTT JSON payloads in the net.info[].ip field).
- """
- import struct as _struct
- parts = ip.split(".")
- packed = bytes(int(p) for p in parts)
- le_int = _struct.unpack("<I", packed)[0]
- return str(le_int).encode("utf-8")
- def _create_server_ssl_context(self) -> ssl.SSLContext:
- """Create SSL context for accepting client (slicer) connections."""
- ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
- ctx.load_cert_chain(self.server_cert_path, self.server_key_path)
- # Allow older TLS versions for compatibility with slicers
- ctx.minimum_version = ssl.TLSVersion.TLSv1_2
- # Don't require client certificates
- ctx.verify_mode = ssl.CERT_NONE
- return ctx
- def _create_client_ssl_context(self) -> ssl.SSLContext:
- """Create SSL context for connecting to printer."""
- ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
- # Don't verify printer's certificate (self-signed)
- ctx.check_hostname = False
- ctx.verify_mode = ssl.CERT_NONE
- ctx.minimum_version = ssl.TLSVersion.TLSv1_2
- # Bambu printers use plain RSA key exchange (no ECDHE/DHE),
- # which modern OpenSSL 3.x defaults exclude. Add them back.
- ctx.set_ciphers("DEFAULT:AES256-GCM-SHA384:AES128-GCM-SHA256")
- return ctx
- async def start(self) -> None:
- """Start the TLS proxy server."""
- if self._running:
- return
- logger.info(
- f"Starting {self.name} TLS proxy: {self.bind_address}:{self.listen_port} → {self.target_host}:{self.target_port}"
- )
- try:
- self._running = True
- # Create SSL contexts
- self._server_ssl_context = self._create_server_ssl_context()
- self._client_ssl_context = self._create_client_ssl_context()
- # Start server with TLS
- self._server = await asyncio.start_server(
- self._handle_client,
- self.bind_address,
- self.listen_port,
- ssl=self._server_ssl_context,
- )
- logger.info("%s TLS proxy listening on port %s", self.name, self.listen_port)
- async with self._server:
- await self._server.serve_forever()
- except OSError as e:
- if e.errno == 98: # Address already in use
- logger.error("%s proxy port %s is already in use", self.name, self.listen_port)
- elif e.errno == 13: # Permission denied
- logger.error(
- "%s proxy: cannot bind to port %s (permission denied). "
- "Port %s requires root or CAP_NET_BIND_SERVICE. "
- "Docker: add 'cap_add: [NET_BIND_SERVICE]' to docker-compose.yml. "
- "Native: use 'sudo setcap cap_net_bind_service=+ep $(which python3)' "
- "or redirect with iptables.",
- self.name,
- self.listen_port,
- self.listen_port,
- )
- else:
- logger.error("%s proxy error: %s", self.name, e)
- except asyncio.CancelledError:
- logger.debug("%s proxy task cancelled", self.name)
- except Exception as e:
- logger.error("%s proxy error: %s", self.name, e)
- finally:
- await self.stop()
- async def stop(self) -> None:
- """Stop the TLS proxy server."""
- logger.info("Stopping %s proxy", self.name)
- self._running = False
- # Cancel all active connection tasks
- for client_id, (task1, task2) in list(self._active_connections.items()):
- task1.cancel()
- task2.cancel()
- if self.on_disconnect:
- try:
- self.on_disconnect(client_id)
- except Exception:
- pass # Ignore disconnect callback errors during shutdown
- self._active_connections.clear()
- if self._server:
- try:
- self._server.close()
- await self._server.wait_closed()
- except OSError as e:
- logger.debug("Error closing %s proxy server: %s", self.name, e)
- self._server = None
- async def _handle_client(
- self,
- client_reader: asyncio.StreamReader,
- client_writer: asyncio.StreamWriter,
- ) -> None:
- """Handle a new client connection by proxying to target."""
- peername = client_writer.get_extra_info("peername")
- client_id = f"{peername[0]}:{peername[1]}" if peername else "unknown"
- logger.info("%s proxy: client connected from %s", self.name, client_id)
- if self.on_connect:
- try:
- self.on_connect(client_id)
- except Exception:
- pass # Ignore connect callback errors; connection proceeds regardless
- # Connect to target printer with TLS
- try:
- printer_reader, printer_writer = await asyncio.wait_for(
- asyncio.open_connection(
- self.target_host,
- self.target_port,
- ssl=self._client_ssl_context,
- ),
- timeout=10.0,
- )
- logger.info("%s proxy: connected to printer %s:%s", self.name, self.target_host, self.target_port)
- except TimeoutError:
- logger.error("%s proxy: timeout connecting to %s:%s", self.name, self.target_host, self.target_port)
- client_writer.close()
- await client_writer.wait_closed()
- return
- except ssl.SSLError as e:
- logger.error(
- "%s proxy: SSL error connecting to %s:%s: %s", self.name, self.target_host, self.target_port, e
- )
- client_writer.close()
- await client_writer.wait_closed()
- return
- except OSError as e:
- logger.error("%s proxy: failed to connect to %s:%s: %s", self.name, self.target_host, self.target_port, e)
- client_writer.close()
- await client_writer.wait_closed()
- return
- # Create bidirectional forwarding tasks
- client_to_printer = asyncio.create_task(
- self._forward(client_reader, printer_writer, f"{client_id}→printer"),
- name=f"{self.name}_c2p_{client_id}",
- )
- printer_to_client = asyncio.create_task(
- self._forward(printer_reader, client_writer, f"printer→{client_id}", rewrite_ip=True),
- name=f"{self.name}_p2c_{client_id}",
- )
- self._active_connections[client_id] = (client_to_printer, printer_to_client)
- try:
- # Wait for either direction to complete (connection closed)
- done, pending = await asyncio.wait(
- [client_to_printer, printer_to_client],
- return_when=asyncio.FIRST_COMPLETED,
- )
- # Cancel the other direction
- for task in pending:
- task.cancel()
- try:
- await task
- except asyncio.CancelledError:
- pass # Expected when cancelling the other forwarding direction
- except Exception as e:
- logger.debug("%s proxy connection error: %s", self.name, e)
- finally:
- # Clean up
- self._active_connections.pop(client_id, None)
- for writer in [client_writer, printer_writer]:
- try:
- writer.close()
- await writer.wait_closed()
- except OSError:
- pass # Best-effort connection cleanup; peer may have disconnected
- logger.info("%s proxy: client %s disconnected", self.name, client_id)
- if self.on_disconnect:
- try:
- self.on_disconnect(client_id)
- except Exception:
- pass # Ignore disconnect callback errors; cleanup continues
- @staticmethod
- def _rewrite_mqtt_ip(
- data: bytes,
- old_ip: bytes,
- new_ip: bytes,
- buffer: bytearray,
- extra_replacements: list[tuple[bytes, bytes]] | None = None,
- ) -> tuple[bytes, bytearray]:
- """Rewrite IP addresses inside MQTT packets, preserving packet framing.
- MQTT packets have a variable-length header encoding the remaining
- packet length. A naive bytes.replace() would corrupt this framing
- when old_ip and new_ip differ in length.
- This method parses individual MQTT packets out of the data stream,
- performs the replacement only on PUBLISH payloads, and re-encodes
- the remaining-length field to match the new size.
- Incomplete packets are buffered and returned for the next call.
- Args:
- extra_replacements: Additional (old, new) byte pairs to replace
- (e.g. the integer IP representation in net.info[].ip).
- Returns (output_data, remaining_buffer).
- """
- buffer.extend(data)
- # Check if any replacement target exists in the buffer
- has_target = old_ip in buffer
- if not has_target and extra_replacements:
- has_target = any(old in buffer for old, _new in extra_replacements)
- if not has_target:
- # Fast path: no IP in buffer, but we still need to check for
- # incomplete packets at the end that might contain a partial IP.
- # For safety, try to parse and emit only complete packets.
- result = bytearray()
- pos = 0
- length = len(buffer)
- while pos < length:
- packet_start = pos
- if pos + 1 >= length:
- break
- pos += 1 # header byte
- # Parse remaining length
- remaining_length = 0
- multiplier = 1
- length_bytes = 0
- while pos < length:
- encoded_byte = buffer[pos]
- pos += 1
- remaining_length += (encoded_byte & 0x7F) * multiplier
- multiplier *= 128
- length_bytes += 1
- if (encoded_byte & 0x80) == 0:
- break
- if length_bytes >= 4:
- break
- if pos + remaining_length > length:
- # Incomplete — keep in buffer
- new_buffer = bytearray(buffer[packet_start:])
- return bytes(result), new_buffer
- pos += remaining_length
- result.extend(buffer[packet_start:pos])
- # All complete
- buffer.clear()
- return bytes(result) if result else bytes(data), buffer
- # Buffer contains old_ip — parse packets and rewrite
- result = bytearray()
- pos = 0
- length = len(buffer)
- while pos < length:
- packet_start = pos
- if pos >= length:
- break
- header_byte = buffer[pos]
- pos += 1
- # Remaining length: variable-length encoding (1-4 bytes)
- remaining_length = 0
- multiplier = 1
- length_bytes = 0
- while pos < length:
- encoded_byte = buffer[pos]
- pos += 1
- remaining_length += (encoded_byte & 0x7F) * multiplier
- multiplier *= 128
- length_bytes += 1
- if (encoded_byte & 0x80) == 0:
- break
- if length_bytes >= 4:
- break
- # Check if we have enough data for the full packet
- if pos + remaining_length > length:
- # Incomplete packet — keep in buffer for next call
- new_buffer = bytearray(buffer[packet_start:])
- return bytes(result), new_buffer
- packet_type = (header_byte >> 4) & 0x0F
- packet_body = buffer[pos : pos + remaining_length]
- pos += remaining_length
- # Only rewrite PUBLISH packets (type 3)
- needs_rewrite = packet_type == 3 and (
- old_ip in packet_body
- or (extra_replacements and any(old in packet_body for old, _new in extra_replacements))
- )
- if needs_rewrite:
- new_body = bytes(packet_body).replace(old_ip, new_ip)
- if extra_replacements:
- for old_val, new_val in extra_replacements:
- new_body = new_body.replace(old_val, new_val)
- # Re-encode: header byte + new remaining length + new body
- result.append(header_byte)
- # Encode remaining length (MQTT variable-length encoding)
- new_remaining = len(new_body)
- while True:
- encoded_byte = new_remaining % 128
- new_remaining //= 128
- if new_remaining > 0:
- encoded_byte |= 0x80
- result.append(encoded_byte)
- if new_remaining == 0:
- break
- result.extend(new_body)
- else:
- # Pass through unchanged
- result.extend(buffer[packet_start:pos])
- buffer.clear()
- return bytes(result), buffer
- async def _forward(
- self,
- reader: asyncio.StreamReader,
- writer: asyncio.StreamWriter,
- direction: str,
- rewrite_ip: bool = False,
- ) -> None:
- """Forward data from reader to writer.
- Args:
- reader: Source stream (already TLS-decrypted)
- writer: Destination stream (will be TLS-encrypted by the stream)
- direction: Description for logging (e.g., "client→printer")
- rewrite_ip: If True and rewrite_ip was configured, replace the
- printer's real IP with the proxy's bind IP in the data.
- """
- do_rewrite = rewrite_ip and self._rewrite_old is not None
- rewrite_buffer = bytearray() if do_rewrite else None
- rewrite_logged = False
- total_bytes = 0
- try:
- while self._running:
- # Read chunk - use reasonable buffer size
- data = await reader.read(65536)
- if not data:
- # Connection closed
- break
- # Rewrite printer IP → proxy IP in MQTT PUBLISH payloads
- # to prevent the slicer from bypassing the proxy.
- if do_rewrite:
- extra = [(self._rewrite_old_int, self._rewrite_new_int)] if self._rewrite_old_int else None
- data, rewrite_buffer = self._rewrite_mqtt_ip(
- data,
- self._rewrite_old,
- self._rewrite_new,
- rewrite_buffer,
- extra_replacements=extra,
- )
- if not rewrite_logged and data:
- if self._rewrite_old in data:
- logger.warning(
- "%s proxy IP rewrite FAILED — %s still present after rewrite!",
- self.name,
- self._rewrite_old.decode(),
- )
- else:
- logger.info(
- "%s proxy IP rewrite active: %s → %s",
- self.name,
- self._rewrite_old.decode(),
- self._rewrite_new.decode(),
- )
- rewrite_logged = True
- if not data:
- continue # All data buffered, waiting for more
- # Forward to destination
- writer.write(data)
- await writer.drain()
- total_bytes += len(data)
- except asyncio.CancelledError:
- pass # Expected when the other forwarding direction closes first
- except ConnectionResetError:
- logger.debug("%s proxy %s: connection reset", self.name, direction)
- except BrokenPipeError:
- logger.debug("%s proxy %s: broken pipe", self.name, direction)
- except OSError as e:
- logger.debug("%s proxy %s error: %s", self.name, direction, e)
- logger.debug("%s proxy %s: total %s bytes", self.name, direction, total_bytes)
- class TCPProxy:
- """Raw TCP proxy that forwards data without TLS termination.
- Used for protocols where the printer doesn't use TLS (e.g., port 3000
- binding/authentication protocol).
- """
- def __init__(
- self,
- name: str,
- listen_port: int,
- target_host: str,
- target_port: int,
- on_connect: Callable[[str], None] | None = None,
- on_disconnect: Callable[[str], None] | None = None,
- bind_address: str = "0.0.0.0", # nosec B104
- ):
- self.name = name
- self.listen_port = listen_port
- self.target_host = target_host
- self.target_port = target_port
- self.on_connect = on_connect
- self.on_disconnect = on_disconnect
- self.bind_address = bind_address
- self._server: asyncio.Server | None = None
- self._running = False
- self._active_connections: dict[str, tuple[asyncio.Task, asyncio.Task]] = {}
- async def start(self) -> None:
- """Start the TCP proxy server."""
- if self._running:
- return
- logger.info(
- "Starting %s TCP proxy: %s:%s → %s:%s",
- self.name,
- self.bind_address,
- self.listen_port,
- self.target_host,
- self.target_port,
- )
- try:
- self._running = True
- self._server = await asyncio.start_server(
- self._handle_client,
- self.bind_address,
- self.listen_port,
- )
- logger.info("%s TCP proxy listening on port %s", self.name, self.listen_port)
- async with self._server:
- await self._server.serve_forever()
- except OSError as e:
- if e.errno == 98: # Address already in use
- logger.error("%s proxy port %s is already in use", self.name, self.listen_port)
- else:
- logger.error("%s proxy error: %s", self.name, e)
- except asyncio.CancelledError:
- logger.debug("%s proxy task cancelled", self.name)
- except Exception as e:
- logger.error("%s proxy error: %s", self.name, e)
- finally:
- await self.stop()
- async def stop(self) -> None:
- """Stop the TCP proxy server."""
- logger.info("Stopping %s proxy", self.name)
- self._running = False
- for client_id, (task1, task2) in list(self._active_connections.items()):
- task1.cancel()
- task2.cancel()
- if self.on_disconnect:
- try:
- self.on_disconnect(client_id)
- except Exception:
- pass
- self._active_connections.clear()
- if self._server:
- try:
- self._server.close()
- await self._server.wait_closed()
- except OSError as e:
- logger.debug("Error closing %s proxy server: %s", self.name, e)
- self._server = None
- async def _handle_client(
- self,
- client_reader: asyncio.StreamReader,
- client_writer: asyncio.StreamWriter,
- ) -> None:
- """Handle a new client connection by proxying to target."""
- peername = client_writer.get_extra_info("peername")
- client_id = f"{peername[0]}:{peername[1]}" if peername else "unknown"
- logger.info("%s proxy: client connected from %s", self.name, client_id)
- if self.on_connect:
- try:
- self.on_connect(client_id)
- except Exception:
- pass
- try:
- printer_reader, printer_writer = await asyncio.wait_for(
- asyncio.open_connection(self.target_host, self.target_port),
- timeout=10.0,
- )
- logger.info("%s proxy: connected to printer %s:%s", self.name, self.target_host, self.target_port)
- except TimeoutError:
- logger.error("%s proxy: timeout connecting to %s:%s", self.name, self.target_host, self.target_port)
- client_writer.close()
- await client_writer.wait_closed()
- return
- except OSError as e:
- logger.error("%s proxy: failed to connect to %s:%s: %s", self.name, self.target_host, self.target_port, e)
- client_writer.close()
- await client_writer.wait_closed()
- return
- client_to_printer = asyncio.create_task(
- self._forward(client_reader, printer_writer, f"{client_id}→printer"),
- name=f"{self.name}_c2p_{client_id}",
- )
- printer_to_client = asyncio.create_task(
- self._forward(printer_reader, client_writer, f"printer→{client_id}"),
- name=f"{self.name}_p2c_{client_id}",
- )
- self._active_connections[client_id] = (client_to_printer, printer_to_client)
- try:
- done, pending = await asyncio.wait(
- [client_to_printer, printer_to_client],
- return_when=asyncio.FIRST_COMPLETED,
- )
- for task in pending:
- task.cancel()
- try:
- await task
- except asyncio.CancelledError:
- pass
- except Exception as e:
- logger.debug("%s proxy connection error: %s", self.name, e)
- finally:
- self._active_connections.pop(client_id, None)
- for writer in [client_writer, printer_writer]:
- try:
- writer.close()
- await writer.wait_closed()
- except OSError:
- pass
- logger.info("%s proxy: client %s disconnected", self.name, client_id)
- if self.on_disconnect:
- try:
- self.on_disconnect(client_id)
- except Exception:
- pass
- async def _forward(
- self,
- reader: asyncio.StreamReader,
- writer: asyncio.StreamWriter,
- direction: str,
- ) -> None:
- """Forward data from reader to writer."""
- total_bytes = 0
- try:
- while self._running:
- data = await reader.read(65536)
- if not data:
- break
- writer.write(data)
- await writer.drain()
- total_bytes += len(data)
- logger.debug("%s proxy %s: %s bytes", self.name, direction, len(data))
- except asyncio.CancelledError:
- pass
- except ConnectionResetError:
- logger.debug("%s proxy %s: connection reset", self.name, direction)
- except BrokenPipeError:
- logger.debug("%s proxy %s: broken pipe", self.name, direction)
- except OSError as e:
- logger.debug("%s proxy %s error: %s", self.name, direction, e)
- logger.debug("%s proxy %s: total %s bytes", self.name, direction, total_bytes)
- class FTPTLSProxy(TLSProxy):
- """FTP-aware TLS proxy that handles passive data connections.
- Extends TLSProxy to intercept PASV/EPSV responses on the FTP control
- channel, dynamically create TLS data proxies on local ports, and rewrite
- the responses so the slicer connects to the proxy instead of the printer.
- Without this, FTP passive data connections bypass the proxy and go directly
- to the printer, which fails when the slicer can't reach the printer's IP.
- """
- PASV_PORT_MIN = 50000
- PASV_PORT_MAX = 50100
- async def stop(self) -> None:
- """Stop proxy and clean up data connection servers."""
- # Close all data servers first
- for server in list(self._data_servers):
- try:
- server.close()
- await server.wait_closed()
- except OSError:
- pass # Best-effort cleanup of data proxy servers
- self._data_servers.clear()
- await super().stop()
- async def start(self) -> None:
- """Start the FTP TLS proxy."""
- self._data_servers: list[asyncio.Server] = []
- await super().start()
- async def _handle_client(
- self,
- client_reader: asyncio.StreamReader,
- client_writer: asyncio.StreamWriter,
- ) -> None:
- """Handle FTP client with PASV/EPSV-aware response forwarding."""
- peername = client_writer.get_extra_info("peername")
- client_id = f"{peername[0]}:{peername[1]}" if peername else "unknown"
- logger.info("%s proxy: client connected from %s", self.name, client_id)
- if self.on_connect:
- try:
- self.on_connect(client_id)
- except Exception:
- pass # Ignore connect callback errors; connection proceeds regardless
- # Determine our local IP from the control connection socket
- sockname = client_writer.get_extra_info("sockname")
- local_ip = sockname[0] if sockname else "0.0.0.0" # nosec B104
- if local_ip in ("0.0.0.0", "::"): # nosec B104
- local_ip = "127.0.0.1"
- # Connect to target printer with TLS
- try:
- printer_reader, printer_writer = await asyncio.wait_for(
- asyncio.open_connection(
- self.target_host,
- self.target_port,
- ssl=self._client_ssl_context,
- ),
- timeout=10.0,
- )
- logger.info("%s proxy: connected to printer %s:%s", self.name, self.target_host, self.target_port)
- except TimeoutError:
- logger.error("%s proxy: timeout connecting to %s:%s", self.name, self.target_host, self.target_port)
- client_writer.close()
- await client_writer.wait_closed()
- return
- except ssl.SSLError as e:
- logger.error(
- "%s proxy: SSL error connecting to %s:%s: %s", self.name, self.target_host, self.target_port, e
- )
- client_writer.close()
- await client_writer.wait_closed()
- return
- except OSError as e:
- logger.error("%s proxy: failed to connect to %s:%s: %s", self.name, self.target_host, self.target_port, e)
- client_writer.close()
- await client_writer.wait_closed()
- return
- # Capture the TLS session from the control channel for data channel
- # reuse. vsFTPd (X1C) requires require_ssl_reuse — the data connection
- # must present the same TLS session as the control channel.
- ctrl_ssl_object = printer_writer.get_extra_info("ssl_object")
- ctrl_tls_session = ctrl_ssl_object.session if ctrl_ssl_object else None
- if ctrl_tls_session:
- logger.debug("%s proxy: captured TLS session for data channel reuse", self.name)
- # Track data channel protection level per session.
- # PROT C = cleartext data, PROT P = TLS data.
- # Default to cleartext — many Bambu printers (A1, H2D) use PROT C.
- # If the slicer sends PROT P, we switch to TLS for data connections.
- session_state: dict[str, str | ssl.SSLSession] = {"prot": "C"}
- if ctrl_tls_session:
- session_state["tls_session"] = ctrl_tls_session
- # Client→Printer: intercept EPSV and replace with PASV
- # EPSV responses only contain a port (no IP), so the slicer reuses
- # the control connection IP. If that IP is the real printer (via
- # iptables REDIRECT), the data connection bypasses the proxy.
- # PASV responses include an explicit IP that we can rewrite.
- client_to_printer = asyncio.create_task(
- self._forward_ftp_commands(client_reader, printer_writer, f"{client_id}→printer", session_state),
- name=f"{self.name}_c2p_{client_id}",
- )
- # Printer→Client: intercept PASV/EPSV responses
- printer_to_client = asyncio.create_task(
- self._forward_ftp_control(printer_reader, client_writer, f"printer→{client_id}", local_ip, session_state),
- name=f"{self.name}_p2c_{client_id}",
- )
- self._active_connections[client_id] = (client_to_printer, printer_to_client)
- try:
- done, pending = await asyncio.wait(
- [client_to_printer, printer_to_client],
- return_when=asyncio.FIRST_COMPLETED,
- )
- for task in pending:
- task.cancel()
- try:
- await task
- except asyncio.CancelledError:
- pass # Expected when cancelling the other forwarding direction
- except Exception as e:
- logger.debug("%s proxy connection error: %s", self.name, e)
- finally:
- self._active_connections.pop(client_id, None)
- for writer in [client_writer, printer_writer]:
- try:
- writer.close()
- await writer.wait_closed()
- except OSError:
- pass # Best-effort connection cleanup; peer may have disconnected
- logger.info("%s proxy: client %s disconnected", self.name, client_id)
- if self.on_disconnect:
- try:
- self.on_disconnect(client_id)
- except Exception:
- pass # Ignore disconnect callback errors; cleanup continues
- async def _forward_ftp_commands(
- self,
- reader: asyncio.StreamReader,
- writer: asyncio.StreamWriter,
- direction: str,
- session_state: dict[str, str | ssl.SSLSession],
- ) -> None:
- """Forward FTP client commands, replacing EPSV with PASV.
- EPSV responses only contain a port number — the client reuses the
- control connection IP for data. When the control IP is the real
- printer (due to iptables REDIRECT), EPSV data connections bypass
- the proxy. PASV responses include an explicit IP that the proxy
- can rewrite to its own address.
- Also tracks PROT P/C commands to know whether data connections
- should use TLS or cleartext.
- """
- buffer = b""
- total_bytes = 0
- try:
- while self._running:
- data = await reader.read(65536)
- if not data:
- break
- total_bytes += len(data)
- buffer += data
- output = b""
- while b"\r\n" in buffer:
- idx = buffer.index(b"\r\n")
- line = buffer[:idx]
- buffer = buffer[idx + 2 :]
- cmd_upper = line.strip().upper()
- # Track PROT level for data channel encryption
- if cmd_upper == b"PROT P":
- session_state["prot"] = "P"
- logger.info("FTP data protection: PROT P (TLS)")
- elif cmd_upper == b"PROT C":
- session_state["prot"] = "C"
- logger.info("FTP data protection: PROT C (cleartext)")
- output += line + b"\r\n"
- if output:
- writer.write(output)
- await writer.drain()
- logger.debug("%s proxy %s: %s bytes", self.name, direction, len(data))
- except asyncio.CancelledError:
- pass # Expected when the other forwarding direction closes first
- except ConnectionResetError:
- logger.debug("%s proxy %s: connection reset", self.name, direction)
- except BrokenPipeError:
- logger.debug("%s proxy %s: broken pipe", self.name, direction)
- except OSError as e:
- logger.debug("%s proxy %s error: %s", self.name, direction, e)
- if buffer:
- try:
- writer.write(buffer)
- await writer.drain()
- except OSError:
- pass # Best-effort flush of remaining FTP command data
- logger.debug("%s proxy %s: total %s bytes", self.name, direction, total_bytes)
- async def _forward_ftp_control(
- self,
- reader: asyncio.StreamReader,
- writer: asyncio.StreamWriter,
- direction: str,
- local_ip: str,
- session_state: dict[str, str | ssl.SSLSession],
- ) -> None:
- """Forward FTP control channel responses, rewriting PASV/EPSV.
- FTP control channel is line-based (\\r\\n terminated). We buffer data
- and process complete lines, intercepting 227 (PASV) and 229 (EPSV)
- responses to create local data proxies.
- """
- buffer = b""
- total_bytes = 0
- try:
- while self._running:
- data = await reader.read(65536)
- if not data:
- break
- total_bytes += len(data)
- buffer += data
- output = b""
- # Process all complete lines
- while b"\r\n" in buffer:
- idx = buffer.index(b"\r\n")
- line = buffer[:idx]
- buffer = buffer[idx + 2 :]
- rewritten = await self._maybe_rewrite_pasv(line, local_ip, session_state)
- output += rewritten + b"\r\n"
- if output:
- writer.write(output)
- await writer.drain()
- logger.debug("%s proxy %s: %s bytes", self.name, direction, len(data))
- except asyncio.CancelledError:
- pass # Expected when the other forwarding direction closes first
- except ConnectionResetError:
- logger.debug("%s proxy %s: connection reset", self.name, direction)
- except BrokenPipeError:
- logger.debug("%s proxy %s: broken pipe", self.name, direction)
- except OSError as e:
- logger.debug("%s proxy %s error: %s", self.name, direction, e)
- # Flush any remaining buffered data
- if buffer:
- try:
- writer.write(buffer)
- await writer.drain()
- except OSError:
- pass # Best-effort flush of remaining FTP control data
- logger.debug("%s proxy %s: total %s bytes", self.name, direction, total_bytes)
- async def _maybe_rewrite_pasv(
- self, line: bytes, local_ip: str, session_state: dict[str, str | ssl.SSLSession]
- ) -> bytes:
- """Rewrite PASV/EPSV response to point to a local data proxy."""
- try:
- text = line.decode("utf-8")
- except UnicodeDecodeError:
- return line
- # 227 Entering Passive Mode (h1,h2,h3,h4,p1,p2)
- if text.startswith("227 "):
- match = re.search(r"\(\s*(\d+)\s*,\s*(\d+)\s*,\s*(\d+)\s*,\s*(\d+)\s*,\s*(\d+)\s*,\s*(\d+)\s*\)", text)
- if match:
- h1, h2, h3, h4, p1, p2 = (int(x) for x in match.groups())
- printer_ip = f"{h1}.{h2}.{h3}.{h4}"
- printer_port = p1 * 256 + p2
- local_port = await self._create_data_proxy(printer_ip, printer_port, session_state)
- if local_port:
- ip_parts = local_ip.split(".")
- lp1 = local_port // 256
- lp2 = local_port % 256
- rewritten = (
- f"227 Entering Passive Mode "
- f"({ip_parts[0]},{ip_parts[1]},{ip_parts[2]},{ip_parts[3]},{lp1},{lp2})"
- )
- logger.info("FTP PASV rewrite: %s:%s → %s:%s", printer_ip, printer_port, local_ip, local_port)
- return rewritten.encode("utf-8")
- else:
- logger.error("FTP PASV: failed to create data proxy for %s:%s", printer_ip, printer_port)
- else:
- logger.warning("FTP PASV: 227 response didn't match expected format: %s", text[:100])
- # 229 Entering Extended Passive Mode (|||port|)
- elif text.startswith("229 "):
- match = re.search(r"\(\|\|\|(\d+)\|\)", text)
- if match:
- printer_port = int(match.group(1))
- local_port = await self._create_data_proxy(self.target_host, printer_port, session_state)
- if local_port:
- rewritten = f"229 Entering Extended Passive Mode (|||{local_port}|)"
- logger.info("FTP EPSV rewrite: port %s → %s", printer_port, local_port)
- return rewritten.encode("utf-8")
- else:
- logger.error("FTP EPSV: failed to create data proxy for port %s", printer_port)
- else:
- logger.warning("FTP EPSV: 229 response didn't match expected format: %s", text[:100])
- return line
- async def _create_data_proxy(
- self, printer_ip: str, printer_port: int, session_state: dict[str, str | ssl.SSLSession]
- ) -> int | None:
- """Create a one-shot proxy for an FTP data connection.
- Prefers the printer's original passive port so the port number stays
- the same in the rewritten PASV/EPSV response. This is critical when
- the slicer's FTP bounce-attack protection overrides the IP in the PASV
- response: the slicer connects to <control_IP>:<port>, and if iptables
- REDIRECT maps that port to the local machine, the data proxy must be
- listening on the *same* port number.
- Falls back to a random port if the original is unavailable.
- Uses TLS or cleartext based on the session's PROT level:
- - PROT P: TLS on both slicer and printer data connections
- - PROT C: cleartext on both sides (common for A1/H2D printers)
- Returns the local port number, or None if binding failed.
- """
- use_tls = session_state.get("prot") == "P"
- logger.info(
- "FTP data proxy: creating data proxy for %s:%s (printer-side %s)",
- printer_ip,
- printer_port,
- "TLS" if use_tls else "cleartext",
- )
- # Get control channel TLS session for data channel reuse
- tls_session = session_state.get("tls_session") if use_tls else None
- # Try the printer's original port first — this ensures the port
- # matches even when bounce protection or iptables REDIRECT is in play.
- try:
- await self._start_data_proxy_server(printer_port, printer_ip, printer_port, use_tls, tls_session)
- logger.info("FTP data proxy: using printer's port %s", printer_port)
- return printer_port
- except OSError as e:
- logger.debug(
- "FTP data proxy: printer port %s unavailable (%s), trying random",
- printer_port,
- e,
- )
- for _attempt in range(10):
- port = random.randint(self.PASV_PORT_MIN, self.PASV_PORT_MAX)
- try:
- await self._start_data_proxy_server(port, printer_ip, printer_port, use_tls, tls_session)
- logger.info("FTP data proxy: using random port %s", port)
- return port
- except OSError:
- continue
- logger.error("Failed to bind FTP data proxy port after 10 attempts")
- return None
- async def _start_data_proxy_server(
- self,
- port: int,
- printer_ip: str,
- printer_port: int,
- use_tls: bool,
- tls_session: ssl.SSLSession | None = None,
- ) -> None:
- """Start a one-shot server for one FTP data connection.
- When the slicer connects, immediately connects to the printer's data
- port and buffers any slicer data until the printer connection is ready.
- This handles zero-byte uploads (verify_job) where the slicer closes
- the data channel before a naive proxy would finish its TLS handshake.
- The slicer-side listener is ALWAYS cleartext. Even when the slicer
- sends PROT P on the control channel, Bambu Studio does not perform
- a TLS handshake on the data connection — it relies on the implicit
- FTPS control channel for authentication and sends data unencrypted.
- The printer-side outbound connection follows the PROT level:
- - PROT P (use_tls=True): TLS to the printer's data port
- - PROT C (use_tls=False): cleartext to the printer's data port
- This mirrors the control channel's TLS-termination architecture.
- Raises OSError if the port is already in use.
- """
- connected = asyncio.Event()
- server_holder: list[asyncio.Server] = []
- # Slicer side: ALWAYS cleartext — Bambu Studio does not do TLS on
- # the data channel even after sending PROT P.
- # Printer side: TLS if PROT P, cleartext if PROT C.
- # For TLS data connections, wrap the SSL context to reuse the
- # control channel's TLS session if available. vsFTPd (X1C) requires
- # require_ssl_reuse — without this, data connections are rejected
- # with "522 SSL connection failed: session reuse required".
- if use_tls and tls_session:
- client_ssl = _SessionReuseSSLContext(self._client_ssl_context, tls_session)
- logger.debug("FTP data proxy: using TLS session reuse for port %s", port)
- else:
- client_ssl = self._client_ssl_context if use_tls else None
- # Slicer side is ALWAYS cleartext — Bambu Studio does not do TLS on
- # the data channel even after PROT P (confirmed for both H2D and X1C).
- printer_mode = "TLS" if use_tls else "cleartext"
- async def handle_data(
- client_reader: asyncio.StreamReader,
- client_writer: asyncio.StreamWriter,
- ) -> None:
- """Handle one FTP data connection, then close the server."""
- peername = client_writer.get_extra_info("peername")
- data_client = f"{peername[0]}:{peername[1]}" if peername else "unknown"
- logger.info(
- "FTP data proxy port %s (slicer=cleartext, printer=%s): client connected from %s, bridging to %s:%s",
- port,
- printer_mode,
- data_client,
- printer_ip,
- printer_port,
- )
- connected.set()
- # One-shot: close server after accepting first connection
- if server_holder:
- server_holder[0].close()
- printer_writer = None
- try:
- # Buffer any slicer data while connecting to printer.
- # This handles the race where the slicer sends data (or closes
- # for zero-byte files) before the TLS handshake completes.
- slicer_buffer = bytearray()
- slicer_eof = False
- async def buffer_slicer():
- nonlocal slicer_eof
- while True:
- chunk = await client_reader.read(65536)
- if not chunk:
- slicer_eof = True
- return
- slicer_buffer.extend(chunk)
- buffer_task = asyncio.create_task(buffer_slicer())
- # Connect to printer's data port
- printer_reader, printer_writer = await asyncio.wait_for(
- asyncio.open_connection(printer_ip, printer_port, ssl=client_ssl),
- timeout=10.0,
- )
- logger.info(
- "FTP data proxy port %s (printer=%s): connected to printer %s:%s",
- port,
- printer_mode,
- printer_ip,
- printer_port,
- )
- # Stop buffering
- buffer_task.cancel()
- try:
- await buffer_task
- except asyncio.CancelledError:
- pass
- # Flush buffered slicer data to printer
- logger.info(
- "FTP data proxy port %s: buffer=%s bytes, slicer_eof=%s",
- port,
- len(slicer_buffer),
- slicer_eof,
- )
- if slicer_buffer:
- printer_writer.write(bytes(slicer_buffer))
- await printer_writer.drain()
- # Forward remaining slicer data to printer, then close the
- # printer side to signal upload complete.
- #
- # Bambu Studio does NOT close the FTP data channel after sending
- # STOR data — it keeps the connection open and waits for the
- # printer to close its side + send 226 on the control channel.
- # A naive bidirectional proxy deadlocks here because the proxy
- # waits for the slicer EOF that never comes.
- #
- # Fix: read slicer data with an idle timeout. Once data has been
- # received and the slicer goes quiet, close the printer side so
- # the printer can send 226. For RETR (download), the printer
- # sends data and closes — the slicer reads until EOF — so this
- # unidirectional approach works for both directions.
- total_c2p = len(slicer_buffer)
- if not slicer_eof:
- # Read remaining slicer data with idle detection.
- # Must be short — Bambu Studio expects 226 almost instantly
- # after sending data. Too long and the slicer times out.
- idle_timeout = 0.3
- while True:
- try:
- chunk = await asyncio.wait_for(client_reader.read(65536), timeout=idle_timeout)
- except TimeoutError:
- if total_c2p > 0:
- # Slicer sent data then went idle — upload done
- logger.debug(
- "FTP data proxy port %s: slicer idle after %s bytes, closing printer side",
- port,
- total_c2p,
- )
- break
- continue # No data yet, keep waiting
- if not chunk:
- break # Slicer closed
- printer_writer.write(chunk)
- await printer_writer.drain()
- total_c2p += len(chunk)
- logger.debug("FTP proxy data_c2p: total %s bytes", total_c2p)
- # Close printer side to signal upload complete.
- # For TLS, close() sends close_notify which the printer treats
- # as end-of-data. The printer then sends 226 on the control
- # channel. For RETR, this is a no-op since the printer closes
- # first and we'd have exited the loop above via EOF.
- try:
- printer_writer.close()
- await printer_writer.wait_closed()
- except OSError:
- pass
- # Wait for 226 response to propagate through the FTP control
- # channel before closing the slicer's data channel.
- #
- # Without this delay, the data channel FIN arrives at the
- # slicer before the 226 response on the control channel.
- # BambuStudio reacts to the data channel FIN within <1ms
- # by sending QUIT + closing the control channel — before
- # 226 arrives (~2-3ms network RTT). This causes verify_job
- # to be treated as failed and shows the login modal.
- #
- # In a direct connection, the printer sends 226 AND closes
- # the data channel simultaneously, so the slicer gets both
- # at once. The delay here emulates that timing.
- if total_c2p > 0:
- await asyncio.sleep(0.5)
- except Exception as e:
- logger.error("FTP data proxy port %s: error: %s", port, e)
- finally:
- for w in [client_writer, printer_writer]:
- if w:
- try:
- w.close()
- await w.wait_closed()
- except OSError:
- pass # Best-effort data connection cleanup
- logger.info("FTP data proxy port %s: connection closed", port)
- server = await asyncio.start_server(
- handle_data,
- "0.0.0.0", # nosec B104
- port,
- # No TLS on slicer side — Bambu Studio doesn't do TLS on data
- # channel even after PROT P (confirmed by connection hang test).
- )
- server_holder.append(server)
- self._data_servers.append(server)
- # Auto-close after 60s if no connection arrives
- async def auto_close() -> None:
- try:
- await asyncio.wait_for(connected.wait(), timeout=60.0)
- except TimeoutError:
- logger.debug("FTP data proxy on port %s timed out, closing", port)
- try:
- server.close()
- await server.wait_closed()
- except OSError:
- pass # Best-effort timeout cleanup
- finally:
- if server in self._data_servers:
- self._data_servers.remove(server)
- asyncio.create_task(auto_close(), name=f"ftp_data_timeout_{port}")
- logger.debug("FTP data proxy: port %s → %s:%s", port, printer_ip, printer_port)
- class SlicerProxyManager:
- """Manages FTP and MQTT TLS proxies for a single printer target."""
- # Bambu printer ports
- PRINTER_FTP_PORT = 990
- PRINTER_MQTT_PORT = 8883
- PRINTER_FILE_TRANSFER_PORT = 6000
- PRINTER_RTSP_PORT = 322 # X1/H2/P2 series camera (A1/P1 use port 6000)
- PRINTER_BIND_PORTS = [3000, 3002]
- # Local listen ports - must match what Bambu Studio expects
- # Note: Port 990 requires root or CAP_NET_BIND_SERVICE capability
- LOCAL_FTP_PORT = 990
- LOCAL_MQTT_PORT = 8883
- def __init__(
- self,
- target_host: str,
- cert_path: Path,
- key_path: Path,
- on_activity: Callable[[str, str], None] | None = None,
- bind_address: str = "0.0.0.0", # nosec B104
- bind_identity: dict[str, str] | None = None,
- ):
- """Initialize the slicer proxy manager.
- Args:
- target_host: Target printer IP address
- cert_path: Path to server certificate
- key_path: Path to server private key
- on_activity: Optional callback for activity logging (name, message)
- bind_address: IP address to bind proxy listeners to
- bind_identity: Optional dict with keys (serial, model, name, version)
- for the bind/detect response. When provided, the proxy responds
- to detect requests itself instead of forwarding to the printer.
- This ensures the slicer sees the VP identity, not the real printer.
- """
- self.target_host = target_host
- self.cert_path = cert_path
- self.key_path = key_path
- self.on_activity = on_activity
- self.bind_address = bind_address
- self.bind_identity = bind_identity
- self._ftp_proxy: TCPProxy | None = None
- self._mqtt_proxy: TLSProxy | None = None
- self._file_transfer_proxy: TCPProxy | None = None
- self._rtsp_proxy: TCPProxy | None = None
- self._bind_proxies: list[TCPProxy] = []
- self._bind_server = None
- self._probe_servers: list[asyncio.Server] = []
- self._tasks: list[asyncio.Task] = []
- # FTP passive data port range — Bambu printers typically use ports in
- # this range for EPSV/PASV data connections. We pre-listen on all of
- # them so EPSV works transparently without decrypting FTP control.
- FTP_DATA_PORT_MIN = 50000
- FTP_DATA_PORT_MAX = 50100
- async def start(self) -> None:
- """Start proxy services.
- Uses transparent TCP proxying for most protocols (FTP, FileTransfer,
- Camera) — raw bytes are forwarded without TLS termination, so the
- slicer gets the printer's real TLS certificate end-to-end.
- Only MQTT is TLS-terminated because we must decrypt the payload to
- rewrite the printer's real IP with the proxy's bind IP.
- """
- logger.info("Starting slicer proxy to %s (transparent mode)", self.target_host)
- # Detect iptables port redirect for FTP
- ftp_listen_port = self.LOCAL_FTP_PORT
- redirect_target = detect_port_redirect(self.LOCAL_FTP_PORT)
- if redirect_target:
- logger.info(
- "Detected iptables redirect: port %d → %d. FTP proxy will listen on %d.",
- self.LOCAL_FTP_PORT,
- redirect_target,
- redirect_target,
- )
- ftp_listen_port = redirect_target
- # FTP control — raw TCP pass-through (end-to-end TLS with printer)
- self._ftp_proxy = TCPProxy(
- name="FTP",
- listen_port=ftp_listen_port,
- target_host=self.target_host,
- target_port=self.PRINTER_FTP_PORT,
- on_connect=lambda cid: self._log_activity("FTP", f"connected: {cid}"),
- on_disconnect=lambda cid: self._log_activity("FTP", f"disconnected: {cid}"),
- bind_address=self.bind_address,
- )
- # FTP data ports — pre-listen on the entire passive port range.
- # Since FTP control is encrypted end-to-end, we can't read EPSV
- # responses to know which port the printer chose. Instead, we
- # listen on every port in the range and forward to the same port
- # on the printer. The slicer connects to bind_ip:PORT (from EPSV)
- # and we transparently relay to printer_ip:PORT.
- self._ftp_data_proxies: list[TCPProxy] = []
- for port in range(self.FTP_DATA_PORT_MIN, self.FTP_DATA_PORT_MAX + 1):
- dp = TCPProxy(
- name=f"FTP-Data-{port}",
- listen_port=port,
- target_host=self.target_host,
- target_port=port,
- bind_address=self.bind_address,
- )
- self._ftp_data_proxies.append(dp)
- # MQTT — TLS-terminating proxy (must decrypt to rewrite IP addresses)
- self._mqtt_proxy = TLSProxy(
- name="MQTT",
- listen_port=self.LOCAL_MQTT_PORT,
- target_host=self.target_host,
- target_port=self.PRINTER_MQTT_PORT,
- server_cert_path=self.cert_path,
- server_key_path=self.key_path,
- on_connect=lambda cid: self._log_activity("MQTT", f"connected: {cid}"),
- on_disconnect=lambda cid: self._log_activity("MQTT", f"disconnected: {cid}"),
- bind_address=self.bind_address,
- rewrite_ip=(self.target_host, self.bind_address) if self.bind_address != "0.0.0.0" else None, # nosec B104
- )
- # File transfer — raw TCP pass-through (port 6000)
- self._file_transfer_proxy = TCPProxy(
- name="FileTransfer",
- listen_port=self.PRINTER_FILE_TRANSFER_PORT,
- target_host=self.target_host,
- target_port=self.PRINTER_FILE_TRANSFER_PORT,
- on_connect=lambda cid: self._log_activity("FileTransfer", f"connected: {cid}"),
- on_disconnect=lambda cid: self._log_activity("FileTransfer", f"disconnected: {cid}"),
- bind_address=self.bind_address,
- )
- # RTSP camera — raw TCP pass-through (port 322)
- self._rtsp_proxy = TCPProxy(
- name="RTSP",
- listen_port=self.PRINTER_RTSP_PORT,
- target_host=self.target_host,
- target_port=self.PRINTER_RTSP_PORT,
- on_connect=lambda cid: self._log_activity("RTSP", f"connected: {cid}"),
- on_disconnect=lambda cid: self._log_activity("RTSP", f"disconnected: {cid}"),
- bind_address=self.bind_address,
- )
- # Bind/auth — respond with VP identity instead of proxying to printer.
- # The detect response contains the printer name, serial, model, and
- # bind status. Proxying it would leak the real printer's identity and
- # cause the slicer to treat it as a different device.
- if self.bind_identity:
- from backend.app.services.virtual_printer.bind_server import BindServer
- self._bind_server = BindServer(
- serial=self.bind_identity["serial"],
- model=self.bind_identity["model"],
- name=self.bind_identity["name"],
- version=self.bind_identity.get("version", "01.00.00.00"),
- bind_address=self.bind_address,
- cert_path=self.cert_path,
- key_path=self.key_path,
- )
- else:
- # Fallback: proxy bind requests to the real printer
- for bind_port in self.PRINTER_BIND_PORTS:
- if bind_port == 3002:
- proxy = TLSProxy(
- name="Bind-TLS",
- listen_port=bind_port,
- target_host=self.target_host,
- target_port=bind_port,
- server_cert_path=self.cert_path,
- server_key_path=self.key_path,
- on_connect=lambda cid: self._log_activity("Bind", f"connected: {cid}"),
- on_disconnect=lambda cid: self._log_activity("Bind", f"disconnected: {cid}"),
- bind_address=self.bind_address,
- )
- else:
- proxy = TCPProxy(
- name="Bind",
- listen_port=bind_port,
- target_host=self.target_host,
- target_port=bind_port,
- on_connect=lambda cid: self._log_activity("Bind", f"connected: {cid}"),
- on_disconnect=lambda cid: self._log_activity("Bind", f"disconnected: {cid}"),
- bind_address=self.bind_address,
- )
- self._bind_proxies.append(proxy)
- # Start as background tasks
- async def run_with_logging(proxy: TLSProxy | TCPProxy) -> None:
- try:
- await proxy.start()
- except Exception as e:
- logger.error("Slicer proxy %s failed: %s", proxy.name, e)
- self._tasks = [
- asyncio.create_task(
- run_with_logging(self._ftp_proxy),
- name="slicer_proxy_ftp",
- ),
- asyncio.create_task(
- run_with_logging(self._mqtt_proxy),
- name="slicer_proxy_mqtt",
- ),
- asyncio.create_task(
- run_with_logging(self._file_transfer_proxy),
- name="slicer_proxy_file_transfer",
- ),
- asyncio.create_task(
- run_with_logging(self._rtsp_proxy),
- name="slicer_proxy_rtsp",
- ),
- ]
- if self._bind_server:
- self._tasks.append(
- asyncio.create_task(
- run_with_logging(self._bind_server),
- name="slicer_proxy_bind_server",
- )
- )
- for bp in self._bind_proxies:
- self._tasks.append(
- asyncio.create_task(
- run_with_logging(bp),
- name=f"slicer_proxy_bind_{bp.listen_port}",
- )
- )
- # FTP data port proxies (50000-50100)
- for dp in self._ftp_data_proxies:
- self._tasks.append(
- asyncio.create_task(
- run_with_logging(dp),
- name=f"slicer_proxy_ftp_data_{dp.listen_port}",
- )
- )
- # Diagnostic probe: listen on common un-proxied ports to detect
- # if the slicer tries to reach a service we don't handle.
- if self.bind_address and self.bind_address != "0.0.0.0": # nosec B104
- for probe_port in (21, 80, 443):
- try:
- srv = await asyncio.start_server(
- lambda r, w, p=probe_port: self._probe_handler(r, w, p),
- self.bind_address,
- probe_port,
- )
- self._probe_servers.append(srv)
- except OSError:
- pass # Port in use or no permission — skip
- if self._probe_servers:
- probed = [s.sockets[0].getsockname()[1] for s in self._probe_servers if s.sockets]
- logger.info("Proxy diagnostic: probing un-proxied ports %s on %s", probed, self.bind_address)
- logger.info(
- "Slicer proxy started for %s (transparent TCP + MQTT TLS, %d FTP data ports)",
- self.target_host,
- len(self._ftp_data_proxies),
- )
- # Wait for tasks to complete (they run until cancelled)
- # This keeps the start() coroutine alive so the parent task doesn't complete
- try:
- await asyncio.gather(*self._tasks)
- except asyncio.CancelledError:
- logger.debug("Slicer proxy start cancelled")
- async def stop(self) -> None:
- """Stop all proxies."""
- logger.info("Stopping slicer proxy")
- # Stop proxies
- if self._ftp_proxy:
- await self._ftp_proxy.stop()
- self._ftp_proxy = None
- if self._mqtt_proxy:
- await self._mqtt_proxy.stop()
- self._mqtt_proxy = None
- if self._file_transfer_proxy:
- await self._file_transfer_proxy.stop()
- self._file_transfer_proxy = None
- if self._rtsp_proxy:
- await self._rtsp_proxy.stop()
- self._rtsp_proxy = None
- if self._bind_server:
- await self._bind_server.stop()
- self._bind_server = None
- for bp in self._bind_proxies:
- await bp.stop()
- self._bind_proxies = []
- for dp in self._ftp_data_proxies:
- await dp.stop()
- self._ftp_data_proxies = []
- for srv in self._probe_servers:
- srv.close()
- self._probe_servers = []
- # Cancel tasks
- for task in self._tasks:
- task.cancel()
- if self._tasks:
- try:
- await asyncio.wait_for(
- asyncio.gather(*self._tasks, return_exceptions=True),
- timeout=2.0,
- )
- except TimeoutError:
- logger.debug("Some proxy tasks didn't stop in time")
- self._tasks = []
- logger.info("Slicer proxy stopped")
- async def _probe_handler(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, port: int) -> None:
- """Log unexpected connections on un-proxied ports for diagnostics."""
- peername = writer.get_extra_info("peername")
- client = f"{peername[0]}:{peername[1]}" if peername else "unknown"
- logger.warning(
- "PROBE: slicer connected to un-proxied port %d from %s — this port may need proxying",
- port,
- client,
- )
- writer.close()
- try:
- await writer.wait_closed()
- except OSError:
- pass
- def _log_activity(self, name: str, message: str) -> None:
- """Log activity via callback if configured."""
- if self.on_activity:
- try:
- self.on_activity(name, message)
- except Exception:
- pass # Ignore activity callback errors; logging is non-critical
- @property
- def is_running(self) -> bool:
- """Check if proxies are running."""
- return len(self._tasks) > 0 and all(not t.done() for t in self._tasks)
- def get_status(self) -> dict:
- """Get proxy status."""
- return {
- "running": self.is_running,
- "target_host": self.target_host,
- "ftp_port": self.LOCAL_FTP_PORT,
- "mqtt_port": self.LOCAL_MQTT_PORT,
- "bind_ports": self.PRINTER_BIND_PORTS,
- "ftp_connections": (len(self._ftp_proxy._active_connections) if self._ftp_proxy else 0),
- "mqtt_connections": (len(self._mqtt_proxy._active_connections) if self._mqtt_proxy else 0),
- "bind_connections": sum(len(bp._active_connections) for bp in self._bind_proxies),
- }
|