| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546 |
- """TLS proxy for slicer-to-printer communication.
- This module provides a TLS terminating proxy that forwards data between
- a slicer and a real Bambu printer, enabling remote printing over
- any network connection.
- Unlike a transparent TCP proxy, this terminates TLS on both ends:
- - Slicer connects to Bambuddy using Bambuddy's certificate
- - Bambuddy connects to printer using printer's certificate
- - Data is decrypted, forwarded, and re-encrypted
- """
- import asyncio
- import logging
- import random
- import re
- import ssl
- import subprocess
- from collections.abc import Callable
- from pathlib import Path
- logger = logging.getLogger(__name__)
- def detect_port_redirect(port: int) -> int | None:
- """Detect if iptables redirects a port to another port.
- When iptables NAT REDIRECT rules exist (e.g. port redirects), connections
- to the original port never reach our socket because iptables intercepts
- them in PREROUTING. We must listen on the redirect target instead.
- Returns the redirect target port, or None if no redirect is active.
- """
- # Method 1: Read persistent rules file (doesn't require root)
- for rules_path in ("/etc/iptables/rules.v4", "/etc/iptables.rules"):
- try:
- with open(rules_path) as f:
- content = f.read()
- match = re.search(rf"--dport {port}\b.*?--to-ports\s+(\d+)", content)
- if match:
- target = int(match.group(1))
- if target != port:
- return target
- except (FileNotFoundError, PermissionError, OSError):
- continue
- # Method 2: Query live iptables rules (may require root)
- try:
- result = subprocess.run( # noqa: S603, S607
- ["iptables-save", "-t", "nat"],
- capture_output=True,
- text=True,
- timeout=5,
- )
- if result.returncode == 0:
- match = re.search(rf"--dport {port}\b.*?--to-ports\s+(\d+)", result.stdout)
- if match:
- target = int(match.group(1))
- if target != port:
- return target
- except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
- pass
- return None
- class TLSProxy:
- """TLS terminating proxy that forwards data between client and target.
- This proxy terminates TLS on both ends, allowing the slicer to connect
- to Bambuddy's certificate while Bambuddy connects to the real printer.
- """
- def __init__(
- self,
- name: str,
- listen_port: int,
- target_host: str,
- target_port: int,
- server_cert_path: Path,
- server_key_path: Path,
- on_connect: Callable[[str], None] | None = None,
- on_disconnect: Callable[[str], None] | None = None,
- bind_address: str = "0.0.0.0", # nosec B104
- rewrite_ip: tuple[str, str] | None = None,
- ):
- """Initialize the TLS proxy.
- Args:
- name: Friendly name for logging (e.g., "FTP", "MQTT")
- listen_port: Port to listen on for incoming connections
- target_host: Target printer IP/hostname
- target_port: Target printer port
- server_cert_path: Path to server certificate (for accepting slicer connections)
- server_key_path: Path to server private key
- on_connect: Optional callback when client connects (receives client_id)
- on_disconnect: Optional callback when client disconnects (receives client_id)
- bind_address: IP address to bind to (default: all interfaces)
- rewrite_ip: Optional (old_ip, new_ip) tuple — replaces occurrences of
- the printer's real IP with the proxy's bind IP in printer→client data.
- This prevents the slicer from discovering the printer's real IP
- in MQTT payloads (ip_addr, rtsp_url, etc.) and bypassing the proxy.
- """
- self.name = name
- self.listen_port = listen_port
- self.target_host = target_host
- self.target_port = target_port
- self.server_cert_path = server_cert_path
- self.server_key_path = server_key_path
- self.on_connect = on_connect
- self.on_disconnect = on_disconnect
- self.bind_address = bind_address
- # IP rewriting for printer→client direction
- if rewrite_ip:
- self._rewrite_old = rewrite_ip[0].encode("utf-8")
- self._rewrite_new = rewrite_ip[1].encode("utf-8")
- # Also rewrite the integer IP in net.info[].ip fields.
- # Bambu printers encode their IP as a little-endian uint32 integer
- # in the JSON payload. BambuStudio reads this to set dev_ip.
- self._rewrite_old_int = self._ip_to_le_int_bytes(rewrite_ip[0])
- self._rewrite_new_int = self._ip_to_le_int_bytes(rewrite_ip[1])
- else:
- self._rewrite_old = None
- self._rewrite_new = None
- self._rewrite_old_int = None
- self._rewrite_new_int = None
- self._server: asyncio.Server | None = None
- self._running = False
- self._active_connections: dict[str, tuple[asyncio.Task, asyncio.Task]] = {}
- self._server_ssl_context: ssl.SSLContext | None = None
- self._client_ssl_context: ssl.SSLContext | None = None
- @staticmethod
- def _ip_to_le_int_bytes(ip: str) -> bytes:
- """Convert an IP address to its little-endian integer JSON representation.
- E.g. "192.168.255.16" → b"285190336" (the integer as a decimal string,
- as it appears in Bambu MQTT JSON payloads in the net.info[].ip field).
- """
- import struct as _struct
- parts = ip.split(".")
- packed = bytes(int(p) for p in parts)
- le_int = _struct.unpack("<I", packed)[0]
- return str(le_int).encode("utf-8")
- def _create_server_ssl_context(self) -> ssl.SSLContext:
- """Create SSL context for accepting client (slicer) connections."""
- ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
- ctx.load_cert_chain(self.server_cert_path, self.server_key_path)
- # Allow older TLS versions for compatibility with slicers
- ctx.minimum_version = ssl.TLSVersion.TLSv1_2
- # Don't require client certificates
- ctx.verify_mode = ssl.CERT_NONE
- return ctx
- def _create_client_ssl_context(self) -> ssl.SSLContext:
- """Create SSL context for connecting to printer."""
- ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
- # Don't verify printer's certificate (self-signed)
- ctx.check_hostname = False
- ctx.verify_mode = ssl.CERT_NONE
- ctx.minimum_version = ssl.TLSVersion.TLSv1_2
- # Bambu printers use plain RSA key exchange (no ECDHE/DHE),
- # which modern OpenSSL 3.x defaults exclude. Add them back.
- ctx.set_ciphers("DEFAULT:AES256-GCM-SHA384:AES128-GCM-SHA256")
- return ctx
- async def start(self) -> None:
- """Start the TLS proxy server."""
- if self._running:
- return
- logger.info(
- f"Starting {self.name} TLS proxy: {self.bind_address}:{self.listen_port} → {self.target_host}:{self.target_port}"
- )
- try:
- self._running = True
- # Create SSL contexts
- self._server_ssl_context = self._create_server_ssl_context()
- self._client_ssl_context = self._create_client_ssl_context()
- # Start server with TLS
- self._server = await asyncio.start_server(
- self._handle_client,
- self.bind_address,
- self.listen_port,
- ssl=self._server_ssl_context,
- )
- logger.info("%s TLS proxy listening on port %s", self.name, self.listen_port)
- async with self._server:
- await self._server.serve_forever()
- except OSError as e:
- if e.errno == 98: # Address already in use
- logger.error("%s proxy port %s is already in use", self.name, self.listen_port)
- elif e.errno == 13: # Permission denied
- logger.error(
- "%s proxy: cannot bind to port %s (permission denied). "
- "Port %s requires root or CAP_NET_BIND_SERVICE. "
- "Docker: add 'cap_add: [NET_BIND_SERVICE]' to docker-compose.yml. "
- "Native: use 'sudo setcap cap_net_bind_service=+ep $(which python3)' "
- "or redirect with iptables.",
- self.name,
- self.listen_port,
- self.listen_port,
- )
- else:
- logger.error("%s proxy error: %s", self.name, e)
- except asyncio.CancelledError:
- logger.debug("%s proxy task cancelled", self.name)
- except Exception as e:
- logger.error("%s proxy error: %s", self.name, e)
- finally:
- await self.stop()
- async def stop(self) -> None:
- """Stop the TLS proxy server."""
- logger.info("Stopping %s proxy", self.name)
- self._running = False
- # Cancel all active connection tasks
- for client_id, (task1, task2) in list(self._active_connections.items()):
- task1.cancel()
- task2.cancel()
- if self.on_disconnect:
- try:
- self.on_disconnect(client_id)
- except Exception:
- pass # Ignore disconnect callback errors during shutdown
- self._active_connections.clear()
- if self._server:
- try:
- self._server.close()
- await self._server.wait_closed()
- except OSError as e:
- logger.debug("Error closing %s proxy server: %s", self.name, e)
- self._server = None
- async def _handle_client(
- self,
- client_reader: asyncio.StreamReader,
- client_writer: asyncio.StreamWriter,
- ) -> None:
- """Handle a new client connection by proxying to target."""
- peername = client_writer.get_extra_info("peername")
- client_id = f"{peername[0]}:{peername[1]}" if peername else "unknown"
- logger.info("%s proxy: client connected from %s", self.name, client_id)
- if self.on_connect:
- try:
- self.on_connect(client_id)
- except Exception:
- pass # Ignore connect callback errors; connection proceeds regardless
- # Connect to target printer with TLS
- try:
- printer_reader, printer_writer = await asyncio.wait_for(
- asyncio.open_connection(
- self.target_host,
- self.target_port,
- ssl=self._client_ssl_context,
- ),
- timeout=10.0,
- )
- logger.info("%s proxy: connected to printer %s:%s", self.name, self.target_host, self.target_port)
- except TimeoutError:
- logger.error("%s proxy: timeout connecting to %s:%s", self.name, self.target_host, self.target_port)
- client_writer.close()
- await client_writer.wait_closed()
- return
- except ssl.SSLError as e:
- logger.error(
- "%s proxy: SSL error connecting to %s:%s: %s", self.name, self.target_host, self.target_port, e
- )
- client_writer.close()
- await client_writer.wait_closed()
- return
- except OSError as e:
- logger.error("%s proxy: failed to connect to %s:%s: %s", self.name, self.target_host, self.target_port, e)
- client_writer.close()
- await client_writer.wait_closed()
- return
- # Create bidirectional forwarding tasks
- client_to_printer = asyncio.create_task(
- self._forward(client_reader, printer_writer, f"{client_id}→printer"),
- name=f"{self.name}_c2p_{client_id}",
- )
- printer_to_client = asyncio.create_task(
- self._forward(printer_reader, client_writer, f"printer→{client_id}", rewrite_ip=True),
- name=f"{self.name}_p2c_{client_id}",
- )
- self._active_connections[client_id] = (client_to_printer, printer_to_client)
- try:
- # Wait for either direction to complete (connection closed)
- done, pending = await asyncio.wait(
- [client_to_printer, printer_to_client],
- return_when=asyncio.FIRST_COMPLETED,
- )
- # Cancel the other direction
- for task in pending:
- task.cancel()
- try:
- await task
- except asyncio.CancelledError:
- pass # Expected when cancelling the other forwarding direction
- except Exception as e:
- logger.debug("%s proxy connection error: %s", self.name, e)
- finally:
- # Clean up
- self._active_connections.pop(client_id, None)
- for writer in [client_writer, printer_writer]:
- try:
- writer.close()
- await writer.wait_closed()
- except OSError:
- pass # Best-effort connection cleanup; peer may have disconnected
- logger.info("%s proxy: client %s disconnected", self.name, client_id)
- if self.on_disconnect:
- try:
- self.on_disconnect(client_id)
- except Exception:
- pass # Ignore disconnect callback errors; cleanup continues
- @staticmethod
- def _rewrite_mqtt_ip(
- data: bytes,
- old_ip: bytes,
- new_ip: bytes,
- buffer: bytearray,
- extra_replacements: list[tuple[bytes, bytes]] | None = None,
- ) -> tuple[bytes, bytearray]:
- """Rewrite IP addresses inside MQTT packets, preserving packet framing.
- MQTT packets have a variable-length header encoding the remaining
- packet length. A naive bytes.replace() would corrupt this framing
- when old_ip and new_ip differ in length.
- This method parses individual MQTT packets out of the data stream,
- performs the replacement only on PUBLISH payloads, and re-encodes
- the remaining-length field to match the new size.
- Incomplete packets are buffered and returned for the next call.
- Args:
- extra_replacements: Additional (old, new) byte pairs to replace
- (e.g. the integer IP representation in net.info[].ip).
- Returns (output_data, remaining_buffer).
- """
- buffer.extend(data)
- # Check if any replacement target exists in the buffer
- has_target = old_ip in buffer
- if not has_target and extra_replacements:
- has_target = any(old in buffer for old, _new in extra_replacements)
- if not has_target:
- # Fast path: no IP in buffer, but we still need to check for
- # incomplete packets at the end that might contain a partial IP.
- # For safety, try to parse and emit only complete packets.
- result = bytearray()
- pos = 0
- length = len(buffer)
- while pos < length:
- packet_start = pos
- if pos + 1 >= length:
- break
- pos += 1 # header byte
- # Parse remaining length
- remaining_length = 0
- multiplier = 1
- length_bytes = 0
- while pos < length:
- encoded_byte = buffer[pos]
- pos += 1
- remaining_length += (encoded_byte & 0x7F) * multiplier
- multiplier *= 128
- length_bytes += 1
- if (encoded_byte & 0x80) == 0:
- break
- if length_bytes >= 4:
- break
- if pos + remaining_length > length:
- # Incomplete — keep in buffer
- new_buffer = bytearray(buffer[packet_start:])
- return bytes(result), new_buffer
- pos += remaining_length
- result.extend(buffer[packet_start:pos])
- # All complete
- buffer.clear()
- return bytes(result) if result else bytes(data), buffer
- # Buffer contains old_ip — parse packets and rewrite
- result = bytearray()
- pos = 0
- length = len(buffer)
- while pos < length:
- packet_start = pos
- if pos >= length:
- break
- header_byte = buffer[pos]
- pos += 1
- # Remaining length: variable-length encoding (1-4 bytes)
- remaining_length = 0
- multiplier = 1
- length_bytes = 0
- while pos < length:
- encoded_byte = buffer[pos]
- pos += 1
- remaining_length += (encoded_byte & 0x7F) * multiplier
- multiplier *= 128
- length_bytes += 1
- if (encoded_byte & 0x80) == 0:
- break
- if length_bytes >= 4:
- break
- # Check if we have enough data for the full packet
- if pos + remaining_length > length:
- # Incomplete packet — keep in buffer for next call
- new_buffer = bytearray(buffer[packet_start:])
- return bytes(result), new_buffer
- packet_type = (header_byte >> 4) & 0x0F
- packet_body = buffer[pos : pos + remaining_length]
- pos += remaining_length
- # Only rewrite PUBLISH packets (type 3)
- needs_rewrite = packet_type == 3 and (
- old_ip in packet_body
- or (extra_replacements and any(old in packet_body for old, _new in extra_replacements))
- )
- if needs_rewrite:
- new_body = bytes(packet_body).replace(old_ip, new_ip)
- if extra_replacements:
- for old_val, new_val in extra_replacements:
- new_body = new_body.replace(old_val, new_val)
- # Re-encode: header byte + new remaining length + new body
- result.append(header_byte)
- # Encode remaining length (MQTT variable-length encoding)
- new_remaining = len(new_body)
- while True:
- encoded_byte = new_remaining % 128
- new_remaining //= 128
- if new_remaining > 0:
- encoded_byte |= 0x80
- result.append(encoded_byte)
- if new_remaining == 0:
- break
- result.extend(new_body)
- else:
- # Pass through unchanged
- result.extend(buffer[packet_start:pos])
- buffer.clear()
- return bytes(result), buffer
- async def _forward(
- self,
- reader: asyncio.StreamReader,
- writer: asyncio.StreamWriter,
- direction: str,
- rewrite_ip: bool = False,
- ) -> None:
- """Forward data from reader to writer.
- Args:
- reader: Source stream (already TLS-decrypted)
- writer: Destination stream (will be TLS-encrypted by the stream)
- direction: Description for logging (e.g., "client→printer")
- rewrite_ip: If True and rewrite_ip was configured, replace the
- printer's real IP with the proxy's bind IP in the data.
- """
- do_rewrite = rewrite_ip and self._rewrite_old is not None
- rewrite_buffer = bytearray() if do_rewrite else None
- rewrite_logged = False
- total_bytes = 0
- try:
- while self._running:
- # Read chunk - use reasonable buffer size
- data = await reader.read(65536)
- if not data:
- # Connection closed
- break
- # Rewrite printer IP → proxy IP in MQTT PUBLISH payloads
- # to prevent the slicer from bypassing the proxy.
- if do_rewrite:
- original_len = len(data)
- extra = [(self._rewrite_old_int, self._rewrite_new_int)] if self._rewrite_old_int else None
- data, rewrite_buffer = self._rewrite_mqtt_ip(
- data,
- self._rewrite_old,
- self._rewrite_new,
- rewrite_buffer,
- extra_replacements=extra,
- )
- if not rewrite_logged and data and len(data) != original_len:
- logger.info(
- "%s proxy IP rewrite active: %s → %s",
- self.name,
- self._rewrite_old.decode(),
- self._rewrite_new.decode(),
- )
- rewrite_logged = True
- if not data:
- continue # All data buffered, waiting for more
- # Forward to destination
- writer.write(data)
- await writer.drain()
- total_bytes += len(data)
- logger.debug("%s proxy %s: %s bytes", self.name, direction, len(data))
- except asyncio.CancelledError:
- pass # Expected when the other forwarding direction closes first
- except ConnectionResetError:
- logger.debug("%s proxy %s: connection reset", self.name, direction)
- except BrokenPipeError:
- logger.debug("%s proxy %s: broken pipe", self.name, direction)
- except OSError as e:
- logger.debug("%s proxy %s error: %s", self.name, direction, e)
- logger.debug("%s proxy %s: total %s bytes", self.name, direction, total_bytes)
- class TCPProxy:
- """Raw TCP proxy that forwards data without TLS termination.
- Used for protocols where the printer doesn't use TLS (e.g., port 3000
- binding/authentication protocol).
- """
- def __init__(
- self,
- name: str,
- listen_port: int,
- target_host: str,
- target_port: int,
- on_connect: Callable[[str], None] | None = None,
- on_disconnect: Callable[[str], None] | None = None,
- bind_address: str = "0.0.0.0", # nosec B104
- ):
- self.name = name
- self.listen_port = listen_port
- self.target_host = target_host
- self.target_port = target_port
- self.on_connect = on_connect
- self.on_disconnect = on_disconnect
- self.bind_address = bind_address
- self._server: asyncio.Server | None = None
- self._running = False
- self._active_connections: dict[str, tuple[asyncio.Task, asyncio.Task]] = {}
- async def start(self) -> None:
- """Start the TCP proxy server."""
- if self._running:
- return
- logger.info(
- "Starting %s TCP proxy: %s:%s → %s:%s",
- self.name,
- self.bind_address,
- self.listen_port,
- self.target_host,
- self.target_port,
- )
- try:
- self._running = True
- self._server = await asyncio.start_server(
- self._handle_client,
- self.bind_address,
- self.listen_port,
- )
- logger.info("%s TCP proxy listening on port %s", self.name, self.listen_port)
- async with self._server:
- await self._server.serve_forever()
- except OSError as e:
- if e.errno == 98: # Address already in use
- logger.error("%s proxy port %s is already in use", self.name, self.listen_port)
- else:
- logger.error("%s proxy error: %s", self.name, e)
- except asyncio.CancelledError:
- logger.debug("%s proxy task cancelled", self.name)
- except Exception as e:
- logger.error("%s proxy error: %s", self.name, e)
- finally:
- await self.stop()
- async def stop(self) -> None:
- """Stop the TCP proxy server."""
- logger.info("Stopping %s proxy", self.name)
- self._running = False
- for client_id, (task1, task2) in list(self._active_connections.items()):
- task1.cancel()
- task2.cancel()
- if self.on_disconnect:
- try:
- self.on_disconnect(client_id)
- except Exception:
- pass
- self._active_connections.clear()
- if self._server:
- try:
- self._server.close()
- await self._server.wait_closed()
- except OSError as e:
- logger.debug("Error closing %s proxy server: %s", self.name, e)
- self._server = None
- async def _handle_client(
- self,
- client_reader: asyncio.StreamReader,
- client_writer: asyncio.StreamWriter,
- ) -> None:
- """Handle a new client connection by proxying to target."""
- peername = client_writer.get_extra_info("peername")
- client_id = f"{peername[0]}:{peername[1]}" if peername else "unknown"
- logger.info("%s proxy: client connected from %s", self.name, client_id)
- if self.on_connect:
- try:
- self.on_connect(client_id)
- except Exception:
- pass
- try:
- printer_reader, printer_writer = await asyncio.wait_for(
- asyncio.open_connection(self.target_host, self.target_port),
- timeout=10.0,
- )
- logger.info("%s proxy: connected to printer %s:%s", self.name, self.target_host, self.target_port)
- except TimeoutError:
- logger.error("%s proxy: timeout connecting to %s:%s", self.name, self.target_host, self.target_port)
- client_writer.close()
- await client_writer.wait_closed()
- return
- except OSError as e:
- logger.error("%s proxy: failed to connect to %s:%s: %s", self.name, self.target_host, self.target_port, e)
- client_writer.close()
- await client_writer.wait_closed()
- return
- client_to_printer = asyncio.create_task(
- self._forward(client_reader, printer_writer, f"{client_id}→printer"),
- name=f"{self.name}_c2p_{client_id}",
- )
- printer_to_client = asyncio.create_task(
- self._forward(printer_reader, client_writer, f"printer→{client_id}"),
- name=f"{self.name}_p2c_{client_id}",
- )
- self._active_connections[client_id] = (client_to_printer, printer_to_client)
- try:
- done, pending = await asyncio.wait(
- [client_to_printer, printer_to_client],
- return_when=asyncio.FIRST_COMPLETED,
- )
- for task in pending:
- task.cancel()
- try:
- await task
- except asyncio.CancelledError:
- pass
- except Exception as e:
- logger.debug("%s proxy connection error: %s", self.name, e)
- finally:
- self._active_connections.pop(client_id, None)
- for writer in [client_writer, printer_writer]:
- try:
- writer.close()
- await writer.wait_closed()
- except OSError:
- pass
- logger.info("%s proxy: client %s disconnected", self.name, client_id)
- if self.on_disconnect:
- try:
- self.on_disconnect(client_id)
- except Exception:
- pass
- async def _forward(
- self,
- reader: asyncio.StreamReader,
- writer: asyncio.StreamWriter,
- direction: str,
- ) -> None:
- """Forward data from reader to writer."""
- total_bytes = 0
- try:
- while self._running:
- data = await reader.read(65536)
- if not data:
- break
- writer.write(data)
- await writer.drain()
- total_bytes += len(data)
- logger.debug("%s proxy %s: %s bytes", self.name, direction, len(data))
- except asyncio.CancelledError:
- pass
- except ConnectionResetError:
- logger.debug("%s proxy %s: connection reset", self.name, direction)
- except BrokenPipeError:
- logger.debug("%s proxy %s: broken pipe", self.name, direction)
- except OSError as e:
- logger.debug("%s proxy %s error: %s", self.name, direction, e)
- logger.debug("%s proxy %s: total %s bytes", self.name, direction, total_bytes)
- class FTPTLSProxy(TLSProxy):
- """FTP-aware TLS proxy that handles passive data connections.
- Extends TLSProxy to intercept PASV/EPSV responses on the FTP control
- channel, dynamically create TLS data proxies on local ports, and rewrite
- the responses so the slicer connects to the proxy instead of the printer.
- Without this, FTP passive data connections bypass the proxy and go directly
- to the printer, which fails when the slicer can't reach the printer's IP.
- """
- PASV_PORT_MIN = 50000
- PASV_PORT_MAX = 50100
- async def stop(self) -> None:
- """Stop proxy and clean up data connection servers."""
- # Close all data servers first
- for server in list(self._data_servers):
- try:
- server.close()
- await server.wait_closed()
- except OSError:
- pass # Best-effort cleanup of data proxy servers
- self._data_servers.clear()
- await super().stop()
- async def start(self) -> None:
- """Start the FTP TLS proxy."""
- self._data_servers: list[asyncio.Server] = []
- await super().start()
- async def _handle_client(
- self,
- client_reader: asyncio.StreamReader,
- client_writer: asyncio.StreamWriter,
- ) -> None:
- """Handle FTP client with PASV/EPSV-aware response forwarding."""
- peername = client_writer.get_extra_info("peername")
- client_id = f"{peername[0]}:{peername[1]}" if peername else "unknown"
- logger.info("%s proxy: client connected from %s", self.name, client_id)
- if self.on_connect:
- try:
- self.on_connect(client_id)
- except Exception:
- pass # Ignore connect callback errors; connection proceeds regardless
- # Determine our local IP from the control connection socket
- sockname = client_writer.get_extra_info("sockname")
- local_ip = sockname[0] if sockname else "0.0.0.0" # nosec B104
- if local_ip in ("0.0.0.0", "::"): # nosec B104
- local_ip = "127.0.0.1"
- # Connect to target printer with TLS
- try:
- printer_reader, printer_writer = await asyncio.wait_for(
- asyncio.open_connection(
- self.target_host,
- self.target_port,
- ssl=self._client_ssl_context,
- ),
- timeout=10.0,
- )
- logger.info("%s proxy: connected to printer %s:%s", self.name, self.target_host, self.target_port)
- except TimeoutError:
- logger.error("%s proxy: timeout connecting to %s:%s", self.name, self.target_host, self.target_port)
- client_writer.close()
- await client_writer.wait_closed()
- return
- except ssl.SSLError as e:
- logger.error(
- "%s proxy: SSL error connecting to %s:%s: %s", self.name, self.target_host, self.target_port, e
- )
- client_writer.close()
- await client_writer.wait_closed()
- return
- except OSError as e:
- logger.error("%s proxy: failed to connect to %s:%s: %s", self.name, self.target_host, self.target_port, e)
- client_writer.close()
- await client_writer.wait_closed()
- return
- # Track data channel protection level per session.
- # PROT C = cleartext data, PROT P = TLS data.
- # Default to cleartext — many Bambu printers (A1, H2D) use PROT C.
- # If the slicer sends PROT P, we switch to TLS for data connections.
- session_state: dict[str, str] = {"prot": "C"}
- # Client→Printer: intercept EPSV and replace with PASV
- # EPSV responses only contain a port (no IP), so the slicer reuses
- # the control connection IP. If that IP is the real printer (via
- # iptables REDIRECT), the data connection bypasses the proxy.
- # PASV responses include an explicit IP that we can rewrite.
- client_to_printer = asyncio.create_task(
- self._forward_ftp_commands(client_reader, printer_writer, f"{client_id}→printer", session_state),
- name=f"{self.name}_c2p_{client_id}",
- )
- # Printer→Client: intercept PASV/EPSV responses
- printer_to_client = asyncio.create_task(
- self._forward_ftp_control(printer_reader, client_writer, f"printer→{client_id}", local_ip, session_state),
- name=f"{self.name}_p2c_{client_id}",
- )
- self._active_connections[client_id] = (client_to_printer, printer_to_client)
- try:
- done, pending = await asyncio.wait(
- [client_to_printer, printer_to_client],
- return_when=asyncio.FIRST_COMPLETED,
- )
- for task in pending:
- task.cancel()
- try:
- await task
- except asyncio.CancelledError:
- pass # Expected when cancelling the other forwarding direction
- except Exception as e:
- logger.debug("%s proxy connection error: %s", self.name, e)
- finally:
- self._active_connections.pop(client_id, None)
- for writer in [client_writer, printer_writer]:
- try:
- writer.close()
- await writer.wait_closed()
- except OSError:
- pass # Best-effort connection cleanup; peer may have disconnected
- logger.info("%s proxy: client %s disconnected", self.name, client_id)
- if self.on_disconnect:
- try:
- self.on_disconnect(client_id)
- except Exception:
- pass # Ignore disconnect callback errors; cleanup continues
- async def _forward_ftp_commands(
- self,
- reader: asyncio.StreamReader,
- writer: asyncio.StreamWriter,
- direction: str,
- session_state: dict[str, str],
- ) -> None:
- """Forward FTP client commands, replacing EPSV with PASV.
- EPSV responses only contain a port number — the client reuses the
- control connection IP for data. When the control IP is the real
- printer (due to iptables REDIRECT), EPSV data connections bypass
- the proxy. PASV responses include an explicit IP that the proxy
- can rewrite to its own address.
- Also tracks PROT P/C commands to know whether data connections
- should use TLS or cleartext.
- """
- buffer = b""
- total_bytes = 0
- try:
- while self._running:
- data = await reader.read(65536)
- if not data:
- break
- total_bytes += len(data)
- buffer += data
- output = b""
- while b"\r\n" in buffer:
- idx = buffer.index(b"\r\n")
- line = buffer[:idx]
- buffer = buffer[idx + 2 :]
- cmd_upper = line.strip().upper()
- # Replace EPSV with PASV so response includes an IP
- if cmd_upper == b"EPSV":
- line = b"PASV"
- logger.info("FTP command rewrite: EPSV → PASV")
- # Track PROT level for data channel encryption
- elif cmd_upper == b"PROT P":
- session_state["prot"] = "P"
- logger.info("FTP data protection: PROT P (TLS)")
- elif cmd_upper == b"PROT C":
- session_state["prot"] = "C"
- logger.info("FTP data protection: PROT C (cleartext)")
- output += line + b"\r\n"
- if output:
- writer.write(output)
- await writer.drain()
- logger.debug("%s proxy %s: %s bytes", self.name, direction, len(data))
- except asyncio.CancelledError:
- pass # Expected when the other forwarding direction closes first
- except ConnectionResetError:
- logger.debug("%s proxy %s: connection reset", self.name, direction)
- except BrokenPipeError:
- logger.debug("%s proxy %s: broken pipe", self.name, direction)
- except OSError as e:
- logger.debug("%s proxy %s error: %s", self.name, direction, e)
- if buffer:
- try:
- writer.write(buffer)
- await writer.drain()
- except OSError:
- pass # Best-effort flush of remaining FTP command data
- logger.debug("%s proxy %s: total %s bytes", self.name, direction, total_bytes)
- async def _forward_ftp_control(
- self,
- reader: asyncio.StreamReader,
- writer: asyncio.StreamWriter,
- direction: str,
- local_ip: str,
- session_state: dict[str, str],
- ) -> None:
- """Forward FTP control channel responses, rewriting PASV/EPSV.
- FTP control channel is line-based (\\r\\n terminated). We buffer data
- and process complete lines, intercepting 227 (PASV) and 229 (EPSV)
- responses to create local data proxies.
- """
- buffer = b""
- total_bytes = 0
- try:
- while self._running:
- data = await reader.read(65536)
- if not data:
- break
- total_bytes += len(data)
- buffer += data
- output = b""
- # Process all complete lines
- while b"\r\n" in buffer:
- idx = buffer.index(b"\r\n")
- line = buffer[:idx]
- buffer = buffer[idx + 2 :]
- rewritten = await self._maybe_rewrite_pasv(line, local_ip, session_state)
- output += rewritten + b"\r\n"
- if output:
- writer.write(output)
- await writer.drain()
- logger.debug("%s proxy %s: %s bytes", self.name, direction, len(data))
- except asyncio.CancelledError:
- pass # Expected when the other forwarding direction closes first
- except ConnectionResetError:
- logger.debug("%s proxy %s: connection reset", self.name, direction)
- except BrokenPipeError:
- logger.debug("%s proxy %s: broken pipe", self.name, direction)
- except OSError as e:
- logger.debug("%s proxy %s error: %s", self.name, direction, e)
- # Flush any remaining buffered data
- if buffer:
- try:
- writer.write(buffer)
- await writer.drain()
- except OSError:
- pass # Best-effort flush of remaining FTP control data
- logger.debug("%s proxy %s: total %s bytes", self.name, direction, total_bytes)
- async def _maybe_rewrite_pasv(self, line: bytes, local_ip: str, session_state: dict[str, str]) -> bytes:
- """Rewrite PASV/EPSV response to point to a local data proxy."""
- try:
- text = line.decode("utf-8")
- except UnicodeDecodeError:
- return line
- # 227 Entering Passive Mode (h1,h2,h3,h4,p1,p2)
- if text.startswith("227 "):
- match = re.search(r"\(\s*(\d+)\s*,\s*(\d+)\s*,\s*(\d+)\s*,\s*(\d+)\s*,\s*(\d+)\s*,\s*(\d+)\s*\)", text)
- if match:
- h1, h2, h3, h4, p1, p2 = (int(x) for x in match.groups())
- printer_ip = f"{h1}.{h2}.{h3}.{h4}"
- printer_port = p1 * 256 + p2
- local_port = await self._create_data_proxy(printer_ip, printer_port, session_state)
- if local_port:
- ip_parts = local_ip.split(".")
- lp1 = local_port // 256
- lp2 = local_port % 256
- rewritten = (
- f"227 Entering Passive Mode "
- f"({ip_parts[0]},{ip_parts[1]},{ip_parts[2]},{ip_parts[3]},{lp1},{lp2})"
- )
- logger.info("FTP PASV rewrite: %s:%s → %s:%s", printer_ip, printer_port, local_ip, local_port)
- return rewritten.encode("utf-8")
- else:
- logger.error("FTP PASV: failed to create data proxy for %s:%s", printer_ip, printer_port)
- else:
- logger.warning("FTP PASV: 227 response didn't match expected format: %s", text[:100])
- # 229 Entering Extended Passive Mode (|||port|)
- elif text.startswith("229 "):
- match = re.search(r"\(\|\|\|(\d+)\|\)", text)
- if match:
- printer_port = int(match.group(1))
- local_port = await self._create_data_proxy(self.target_host, printer_port, session_state)
- if local_port:
- rewritten = f"229 Entering Extended Passive Mode (|||{local_port}|)"
- logger.info("FTP EPSV rewrite: port %s → %s", printer_port, local_port)
- return rewritten.encode("utf-8")
- else:
- logger.error("FTP EPSV: failed to create data proxy for port %s", printer_port)
- else:
- logger.warning("FTP EPSV: 229 response didn't match expected format: %s", text[:100])
- return line
- async def _create_data_proxy(self, printer_ip: str, printer_port: int, session_state: dict[str, str]) -> int | None:
- """Create a one-shot proxy for an FTP data connection.
- Prefers the printer's original passive port so the port number stays
- the same in the rewritten PASV/EPSV response. This is critical when
- the slicer's FTP bounce-attack protection overrides the IP in the PASV
- response: the slicer connects to <control_IP>:<port>, and if iptables
- REDIRECT maps that port to the local machine, the data proxy must be
- listening on the *same* port number.
- Falls back to a random port if the original is unavailable.
- Uses TLS or cleartext based on the session's PROT level:
- - PROT P: TLS on both slicer and printer data connections
- - PROT C: cleartext on both sides (common for A1/H2D printers)
- Returns the local port number, or None if binding failed.
- """
- use_tls = session_state.get("prot") == "P"
- logger.info(
- "FTP data proxy: creating data proxy for %s:%s (printer-side %s)",
- printer_ip,
- printer_port,
- "TLS" if use_tls else "cleartext",
- )
- # Try the printer's original port first — this ensures the port
- # matches even when bounce protection or iptables REDIRECT is in play.
- try:
- await self._start_data_proxy_server(printer_port, printer_ip, printer_port, use_tls)
- logger.info("FTP data proxy: using printer's port %s", printer_port)
- return printer_port
- except OSError as e:
- logger.debug(
- "FTP data proxy: printer port %s unavailable (%s), trying random",
- printer_port,
- e,
- )
- for _attempt in range(10):
- port = random.randint(self.PASV_PORT_MIN, self.PASV_PORT_MAX)
- try:
- await self._start_data_proxy_server(port, printer_ip, printer_port, use_tls)
- logger.info("FTP data proxy: using random port %s", port)
- return port
- except OSError:
- continue
- logger.error("Failed to bind FTP data proxy port after 10 attempts")
- return None
- async def _start_data_proxy_server(self, port: int, printer_ip: str, printer_port: int, use_tls: bool) -> None:
- """Start a one-shot server for one FTP data connection.
- When the slicer connects, immediately connects to the printer's data
- port and buffers any slicer data until the printer connection is ready.
- This handles zero-byte uploads (verify_job) where the slicer closes
- the data channel before a naive proxy would finish its TLS handshake.
- The slicer-side listener is ALWAYS cleartext. Even when the slicer
- sends PROT P on the control channel, Bambu Studio does not perform
- a TLS handshake on the data connection — it relies on the implicit
- FTPS control channel for authentication and sends data unencrypted.
- The printer-side outbound connection follows the PROT level:
- - PROT P (use_tls=True): TLS to the printer's data port
- - PROT C (use_tls=False): cleartext to the printer's data port
- This mirrors the control channel's TLS-termination architecture.
- Raises OSError if the port is already in use.
- """
- connected = asyncio.Event()
- server_holder: list[asyncio.Server] = []
- # Slicer side: ALWAYS cleartext — Bambu Studio does not do TLS on
- # the data channel even after sending PROT P.
- # Printer side: TLS if PROT P, cleartext if PROT C.
- client_ssl = self._client_ssl_context if use_tls else None
- printer_mode = "TLS" if use_tls else "cleartext"
- async def handle_data(
- client_reader: asyncio.StreamReader,
- client_writer: asyncio.StreamWriter,
- ) -> None:
- """Handle one FTP data connection, then close the server."""
- peername = client_writer.get_extra_info("peername")
- data_client = f"{peername[0]}:{peername[1]}" if peername else "unknown"
- logger.info(
- "FTP data proxy port %s (slicer=cleartext, printer=%s): client connected from %s, bridging to %s:%s",
- port,
- printer_mode,
- data_client,
- printer_ip,
- printer_port,
- )
- connected.set()
- # One-shot: close server after accepting first connection
- if server_holder:
- server_holder[0].close()
- printer_writer = None
- try:
- # Buffer any slicer data while connecting to printer.
- # This handles the race where the slicer sends data (or closes
- # for zero-byte files) before the TLS handshake completes.
- slicer_buffer = bytearray()
- slicer_eof = False
- async def buffer_slicer():
- nonlocal slicer_eof
- while True:
- chunk = await client_reader.read(65536)
- if not chunk:
- slicer_eof = True
- return
- slicer_buffer.extend(chunk)
- buffer_task = asyncio.create_task(buffer_slicer())
- # Connect to printer's data port
- printer_reader, printer_writer = await asyncio.wait_for(
- asyncio.open_connection(printer_ip, printer_port, ssl=client_ssl),
- timeout=10.0,
- )
- logger.info(
- "FTP data proxy port %s (printer=%s): connected to printer %s:%s",
- port,
- printer_mode,
- printer_ip,
- printer_port,
- )
- # Stop buffering
- buffer_task.cancel()
- try:
- await buffer_task
- except asyncio.CancelledError:
- pass
- # Flush buffered slicer data to printer
- if slicer_buffer:
- printer_writer.write(bytes(slicer_buffer))
- await printer_writer.drain()
- if slicer_eof:
- # Slicer already closed (zero-byte upload like verify_job).
- # Close the printer write side to signal upload complete.
- if printer_writer.can_write_eof():
- printer_writer.write_eof()
- else:
- # Continue bidirectional forwarding
- c2p = asyncio.create_task(self._forward(client_reader, printer_writer, "data_c2p"))
- p2c = asyncio.create_task(self._forward(printer_reader, client_writer, "data_p2c"))
- done, pending = await asyncio.wait([c2p, p2c], return_when=asyncio.FIRST_COMPLETED)
- for task in pending:
- task.cancel()
- try:
- await task
- except asyncio.CancelledError:
- pass # Expected when other data direction closes
- except Exception as e:
- logger.error("FTP data proxy port %s: error: %s", port, e)
- finally:
- for w in [client_writer, printer_writer]:
- if w:
- try:
- w.close()
- await w.wait_closed()
- except OSError:
- pass # Best-effort data connection cleanup
- logger.info("FTP data proxy port %s: connection closed", port)
- server = await asyncio.start_server(
- handle_data,
- "0.0.0.0", # nosec B104
- port,
- # No TLS on slicer side — Bambu Studio doesn't do TLS on data
- # channel even after PROT P. The proxy terminates TLS only on
- # the printer side (inside handle_data).
- )
- server_holder.append(server)
- self._data_servers.append(server)
- # Auto-close after 60s if no connection arrives
- async def auto_close() -> None:
- try:
- await asyncio.wait_for(connected.wait(), timeout=60.0)
- except TimeoutError:
- logger.debug("FTP data proxy on port %s timed out, closing", port)
- try:
- server.close()
- await server.wait_closed()
- except OSError:
- pass # Best-effort timeout cleanup
- finally:
- if server in self._data_servers:
- self._data_servers.remove(server)
- asyncio.create_task(auto_close(), name=f"ftp_data_timeout_{port}")
- logger.debug("FTP data proxy: port %s → %s:%s", port, printer_ip, printer_port)
- class SlicerProxyManager:
- """Manages FTP and MQTT TLS proxies for a single printer target."""
- # Bambu printer ports
- PRINTER_FTP_PORT = 990
- PRINTER_MQTT_PORT = 8883
- PRINTER_FILE_TRANSFER_PORT = 6000
- PRINTER_BIND_PORTS = [3000, 3002]
- # Local listen ports - must match what Bambu Studio expects
- # Note: Port 990 requires root or CAP_NET_BIND_SERVICE capability
- LOCAL_FTP_PORT = 990
- LOCAL_MQTT_PORT = 8883
- def __init__(
- self,
- target_host: str,
- cert_path: Path,
- key_path: Path,
- on_activity: Callable[[str, str], None] | None = None,
- bind_address: str = "0.0.0.0", # nosec B104
- bind_identity: dict[str, str] | None = None,
- ):
- """Initialize the slicer proxy manager.
- Args:
- target_host: Target printer IP address
- cert_path: Path to server certificate
- key_path: Path to server private key
- on_activity: Optional callback for activity logging (name, message)
- bind_address: IP address to bind proxy listeners to
- bind_identity: Optional dict with keys (serial, model, name, version)
- for the bind/detect response. When provided, the proxy responds
- to detect requests itself instead of forwarding to the printer.
- This ensures the slicer sees the VP identity, not the real printer.
- """
- self.target_host = target_host
- self.cert_path = cert_path
- self.key_path = key_path
- self.on_activity = on_activity
- self.bind_address = bind_address
- self.bind_identity = bind_identity
- self._ftp_proxy: TLSProxy | None = None
- self._mqtt_proxy: TLSProxy | None = None
- self._file_transfer_proxy: TLSProxy | None = None
- self._bind_proxies: list[TCPProxy] = []
- self._bind_server = None
- self._tasks: list[asyncio.Task] = []
- async def start(self) -> None:
- """Start FTP and MQTT TLS proxies."""
- logger.info("Starting slicer TLS proxy to %s", self.target_host)
- # Detect iptables port redirect (e.g. if an external redirect exists).
- # If active, connections get intercepted by iptables PREROUTING
- # and sent to the redirect target — our socket never sees them.
- ftp_listen_port = self.LOCAL_FTP_PORT
- redirect_target = detect_port_redirect(self.LOCAL_FTP_PORT)
- if redirect_target:
- logger.info(
- "Detected iptables redirect: port %d → %d. FTP proxy will listen on %d.",
- self.LOCAL_FTP_PORT,
- redirect_target,
- redirect_target,
- )
- ftp_listen_port = redirect_target
- # Create FTP proxy with PASV/EPSV awareness for data connections
- self._ftp_proxy = FTPTLSProxy(
- name="FTP",
- listen_port=ftp_listen_port,
- target_host=self.target_host,
- target_port=self.PRINTER_FTP_PORT,
- server_cert_path=self.cert_path,
- server_key_path=self.key_path,
- on_connect=lambda cid: self._log_activity("FTP", f"connected: {cid}"),
- on_disconnect=lambda cid: self._log_activity("FTP", f"disconnected: {cid}"),
- bind_address=self.bind_address,
- )
- self._mqtt_proxy = TLSProxy(
- name="MQTT",
- listen_port=self.LOCAL_MQTT_PORT,
- target_host=self.target_host,
- target_port=self.PRINTER_MQTT_PORT,
- server_cert_path=self.cert_path,
- server_key_path=self.key_path,
- on_connect=lambda cid: self._log_activity("MQTT", f"connected: {cid}"),
- on_disconnect=lambda cid: self._log_activity("MQTT", f"disconnected: {cid}"),
- bind_address=self.bind_address,
- rewrite_ip=(self.target_host, self.bind_address) if self.bind_address != "0.0.0.0" else None,
- )
- # File transfer proxy — port 6000 (TLS)
- # BambuStudio connects here for verify_job and actual file uploads.
- self._file_transfer_proxy = TLSProxy(
- name="FileTransfer",
- listen_port=self.PRINTER_FILE_TRANSFER_PORT,
- target_host=self.target_host,
- target_port=self.PRINTER_FILE_TRANSFER_PORT,
- server_cert_path=self.cert_path,
- server_key_path=self.key_path,
- on_connect=lambda cid: self._log_activity("FileTransfer", f"connected: {cid}"),
- on_disconnect=lambda cid: self._log_activity("FileTransfer", f"disconnected: {cid}"),
- bind_address=self.bind_address,
- )
- # Bind/auth — respond with VP identity instead of proxying to printer.
- # The detect response contains the printer name, serial, model, and
- # bind status. Proxying it would leak the real printer's identity and
- # cause the slicer to treat it as a different device.
- if self.bind_identity:
- from backend.app.services.virtual_printer.bind_server import BindServer
- self._bind_server = BindServer(
- serial=self.bind_identity["serial"],
- model=self.bind_identity["model"],
- name=self.bind_identity["name"],
- version=self.bind_identity.get("version", "01.00.00.00"),
- bind_address=self.bind_address,
- cert_path=self.cert_path,
- key_path=self.key_path,
- )
- else:
- # Fallback: proxy bind requests to the real printer
- for bind_port in self.PRINTER_BIND_PORTS:
- if bind_port == 3002:
- proxy = TLSProxy(
- name="Bind-TLS",
- listen_port=bind_port,
- target_host=self.target_host,
- target_port=bind_port,
- server_cert_path=self.cert_path,
- server_key_path=self.key_path,
- on_connect=lambda cid: self._log_activity("Bind", f"connected: {cid}"),
- on_disconnect=lambda cid: self._log_activity("Bind", f"disconnected: {cid}"),
- bind_address=self.bind_address,
- )
- else:
- proxy = TCPProxy(
- name="Bind",
- listen_port=bind_port,
- target_host=self.target_host,
- target_port=bind_port,
- on_connect=lambda cid: self._log_activity("Bind", f"connected: {cid}"),
- on_disconnect=lambda cid: self._log_activity("Bind", f"disconnected: {cid}"),
- bind_address=self.bind_address,
- )
- self._bind_proxies.append(proxy)
- # Start as background tasks
- async def run_with_logging(proxy: TLSProxy) -> None:
- try:
- await proxy.start()
- except Exception as e:
- logger.error("Slicer proxy %s failed: %s", proxy.name, e)
- self._tasks = [
- asyncio.create_task(
- run_with_logging(self._ftp_proxy),
- name="slicer_proxy_ftp",
- ),
- asyncio.create_task(
- run_with_logging(self._mqtt_proxy),
- name="slicer_proxy_mqtt",
- ),
- asyncio.create_task(
- run_with_logging(self._file_transfer_proxy),
- name="slicer_proxy_file_transfer",
- ),
- ]
- if self._bind_server:
- self._tasks.append(
- asyncio.create_task(
- run_with_logging(self._bind_server),
- name="slicer_proxy_bind_server",
- )
- )
- for bp in self._bind_proxies:
- self._tasks.append(
- asyncio.create_task(
- run_with_logging(bp),
- name=f"slicer_proxy_bind_{bp.listen_port}",
- )
- )
- logger.info("Slicer TLS proxy started for %s", self.target_host)
- # Wait for tasks to complete (they run until cancelled)
- # This keeps the start() coroutine alive so the parent task doesn't complete
- try:
- await asyncio.gather(*self._tasks)
- except asyncio.CancelledError:
- logger.debug("Slicer proxy start cancelled")
- async def stop(self) -> None:
- """Stop all proxies."""
- logger.info("Stopping slicer proxy")
- # Stop proxies
- if self._ftp_proxy:
- await self._ftp_proxy.stop()
- self._ftp_proxy = None
- if self._mqtt_proxy:
- await self._mqtt_proxy.stop()
- self._mqtt_proxy = None
- if self._file_transfer_proxy:
- await self._file_transfer_proxy.stop()
- self._file_transfer_proxy = None
- if self._bind_server:
- await self._bind_server.stop()
- self._bind_server = None
- for bp in self._bind_proxies:
- await bp.stop()
- self._bind_proxies = []
- # Cancel tasks
- for task in self._tasks:
- task.cancel()
- if self._tasks:
- try:
- await asyncio.wait_for(
- asyncio.gather(*self._tasks, return_exceptions=True),
- timeout=2.0,
- )
- except TimeoutError:
- logger.debug("Some proxy tasks didn't stop in time")
- self._tasks = []
- logger.info("Slicer proxy stopped")
- def _log_activity(self, name: str, message: str) -> None:
- """Log activity via callback if configured."""
- if self.on_activity:
- try:
- self.on_activity(name, message)
- except Exception:
- pass # Ignore activity callback errors; logging is non-critical
- @property
- def is_running(self) -> bool:
- """Check if proxies are running."""
- return len(self._tasks) > 0 and all(not t.done() for t in self._tasks)
- def get_status(self) -> dict:
- """Get proxy status."""
- return {
- "running": self.is_running,
- "target_host": self.target_host,
- "ftp_port": self.LOCAL_FTP_PORT,
- "mqtt_port": self.LOCAL_MQTT_PORT,
- "bind_ports": self.PRINTER_BIND_PORTS,
- "ftp_connections": (len(self._ftp_proxy._active_connections) if self._ftp_proxy else 0),
- "mqtt_connections": (len(self._mqtt_proxy._active_connections) if self._mqtt_proxy else 0),
- "bind_connections": sum(len(bp._active_connections) for bp in self._bind_proxies),
- }
|