| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428 |
- """TLS proxy for slicer-to-printer communication.
- This module provides a TLS terminating proxy that forwards data between
- a slicer and a real Bambu printer, enabling remote printing over
- any network connection.
- Unlike a transparent TCP proxy, this terminates TLS on both ends:
- - Slicer connects to Bambuddy using Bambuddy's certificate
- - Bambuddy connects to printer using printer's certificate
- - Data is decrypted, forwarded, and re-encrypted
- """
- import asyncio
- import logging
- import ssl
- from collections.abc import Callable
- from pathlib import Path
- logger = logging.getLogger(__name__)
- class TLSProxy:
- """TLS terminating proxy that forwards data between client and target.
- This proxy terminates TLS on both ends, allowing the slicer to connect
- to Bambuddy's certificate while Bambuddy connects to the real printer.
- """
- def __init__(
- self,
- name: str,
- listen_port: int,
- target_host: str,
- target_port: int,
- server_cert_path: Path,
- server_key_path: Path,
- on_connect: Callable[[str], None] | None = None,
- on_disconnect: Callable[[str], None] | None = None,
- ):
- """Initialize the TLS proxy.
- Args:
- name: Friendly name for logging (e.g., "FTP", "MQTT")
- listen_port: Port to listen on for incoming connections
- target_host: Target printer IP/hostname
- target_port: Target printer port
- server_cert_path: Path to server certificate (for accepting slicer connections)
- server_key_path: Path to server private key
- on_connect: Optional callback when client connects (receives client_id)
- on_disconnect: Optional callback when client disconnects (receives client_id)
- """
- self.name = name
- self.listen_port = listen_port
- self.target_host = target_host
- self.target_port = target_port
- self.server_cert_path = server_cert_path
- self.server_key_path = server_key_path
- self.on_connect = on_connect
- self.on_disconnect = on_disconnect
- self._server: asyncio.Server | None = None
- self._running = False
- self._active_connections: dict[str, tuple[asyncio.Task, asyncio.Task]] = {}
- self._server_ssl_context: ssl.SSLContext | None = None
- self._client_ssl_context: ssl.SSLContext | None = None
- def _create_server_ssl_context(self) -> ssl.SSLContext:
- """Create SSL context for accepting client (slicer) connections."""
- ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
- ctx.load_cert_chain(self.server_cert_path, self.server_key_path)
- # Allow older TLS versions for compatibility with slicers
- ctx.minimum_version = ssl.TLSVersion.TLSv1_2
- # Don't require client certificates
- ctx.verify_mode = ssl.CERT_NONE
- return ctx
- def _create_client_ssl_context(self) -> ssl.SSLContext:
- """Create SSL context for connecting to printer."""
- ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
- # Don't verify printer's certificate (self-signed)
- ctx.check_hostname = False
- ctx.verify_mode = ssl.CERT_NONE
- ctx.minimum_version = ssl.TLSVersion.TLSv1_2
- return ctx
- async def start(self) -> None:
- """Start the TLS proxy server."""
- if self._running:
- return
- logger.info(
- f"Starting {self.name} TLS proxy: 0.0.0.0:{self.listen_port} → {self.target_host}:{self.target_port}"
- )
- try:
- self._running = True
- # Create SSL contexts
- self._server_ssl_context = self._create_server_ssl_context()
- self._client_ssl_context = self._create_client_ssl_context()
- # Start server with TLS
- self._server = await asyncio.start_server(
- self._handle_client,
- "0.0.0.0", # nosec B104 - virtual printer proxy
- self.listen_port,
- ssl=self._server_ssl_context,
- )
- logger.info("%s TLS proxy listening on port %s", self.name, self.listen_port)
- async with self._server:
- await self._server.serve_forever()
- except OSError as e:
- if e.errno == 98: # Address already in use
- logger.error("%s proxy port %s is already in use", self.name, self.listen_port)
- else:
- logger.error("%s proxy error: %s", self.name, e)
- except asyncio.CancelledError:
- logger.debug("%s proxy task cancelled", self.name)
- except Exception as e:
- logger.error("%s proxy error: %s", self.name, e)
- finally:
- await self.stop()
- async def stop(self) -> None:
- """Stop the TLS proxy server."""
- logger.info("Stopping %s proxy", self.name)
- self._running = False
- # Cancel all active connection tasks
- for client_id, (task1, task2) in list(self._active_connections.items()):
- task1.cancel()
- task2.cancel()
- if self.on_disconnect:
- try:
- self.on_disconnect(client_id)
- except Exception:
- pass # Ignore disconnect callback errors during shutdown
- self._active_connections.clear()
- if self._server:
- try:
- self._server.close()
- await self._server.wait_closed()
- except OSError as e:
- logger.debug("Error closing %s proxy server: %s", self.name, e)
- self._server = None
- async def _handle_client(
- self,
- client_reader: asyncio.StreamReader,
- client_writer: asyncio.StreamWriter,
- ) -> None:
- """Handle a new client connection by proxying to target."""
- peername = client_writer.get_extra_info("peername")
- client_id = f"{peername[0]}:{peername[1]}" if peername else "unknown"
- logger.info("%s proxy: client connected from %s", self.name, client_id)
- if self.on_connect:
- try:
- self.on_connect(client_id)
- except Exception:
- pass # Ignore connect callback errors; connection proceeds regardless
- # Connect to target printer with TLS
- try:
- printer_reader, printer_writer = await asyncio.wait_for(
- asyncio.open_connection(
- self.target_host,
- self.target_port,
- ssl=self._client_ssl_context,
- ),
- timeout=10.0,
- )
- logger.info("%s proxy: connected to printer %s:%s", self.name, self.target_host, self.target_port)
- except TimeoutError:
- logger.error("%s proxy: timeout connecting to %s:%s", self.name, self.target_host, self.target_port)
- client_writer.close()
- await client_writer.wait_closed()
- return
- except ssl.SSLError as e:
- logger.error(
- "%s proxy: SSL error connecting to %s:%s: %s", self.name, self.target_host, self.target_port, e
- )
- client_writer.close()
- await client_writer.wait_closed()
- return
- except OSError as e:
- logger.error("%s proxy: failed to connect to %s:%s: %s", self.name, self.target_host, self.target_port, e)
- client_writer.close()
- await client_writer.wait_closed()
- return
- # Create bidirectional forwarding tasks
- client_to_printer = asyncio.create_task(
- self._forward(client_reader, printer_writer, f"{client_id}→printer"),
- name=f"{self.name}_c2p_{client_id}",
- )
- printer_to_client = asyncio.create_task(
- self._forward(printer_reader, client_writer, f"printer→{client_id}"),
- name=f"{self.name}_p2c_{client_id}",
- )
- self._active_connections[client_id] = (client_to_printer, printer_to_client)
- try:
- # Wait for either direction to complete (connection closed)
- done, pending = await asyncio.wait(
- [client_to_printer, printer_to_client],
- return_when=asyncio.FIRST_COMPLETED,
- )
- # Cancel the other direction
- for task in pending:
- task.cancel()
- try:
- await task
- except asyncio.CancelledError:
- pass # Expected when cancelling the other forwarding direction
- except Exception as e:
- logger.debug("%s proxy connection error: %s", self.name, e)
- finally:
- # Clean up
- self._active_connections.pop(client_id, None)
- for writer in [client_writer, printer_writer]:
- try:
- writer.close()
- await writer.wait_closed()
- except OSError:
- pass # Best-effort connection cleanup; peer may have disconnected
- logger.info("%s proxy: client %s disconnected", self.name, client_id)
- if self.on_disconnect:
- try:
- self.on_disconnect(client_id)
- except Exception:
- pass # Ignore disconnect callback errors; cleanup continues
- async def _forward(
- self,
- reader: asyncio.StreamReader,
- writer: asyncio.StreamWriter,
- direction: str,
- ) -> None:
- """Forward data from reader to writer.
- Args:
- reader: Source stream (already TLS-decrypted)
- writer: Destination stream (will be TLS-encrypted by the stream)
- direction: Description for logging (e.g., "client→printer")
- """
- total_bytes = 0
- try:
- while self._running:
- # Read chunk - use reasonable buffer size
- data = await reader.read(65536)
- if not data:
- # Connection closed
- break
- # Forward to destination
- writer.write(data)
- await writer.drain()
- total_bytes += len(data)
- logger.debug("%s proxy %s: %s bytes", self.name, direction, len(data))
- except asyncio.CancelledError:
- pass # Expected when the other forwarding direction closes first
- except ConnectionResetError:
- logger.debug("%s proxy %s: connection reset", self.name, direction)
- except BrokenPipeError:
- logger.debug("%s proxy %s: broken pipe", self.name, direction)
- except OSError as e:
- logger.debug("%s proxy %s error: %s", self.name, direction, e)
- logger.debug("%s proxy %s: total %s bytes", self.name, direction, total_bytes)
- class SlicerProxyManager:
- """Manages FTP and MQTT TLS proxies for a single printer target."""
- # Bambu printer ports
- PRINTER_FTP_PORT = 990
- PRINTER_MQTT_PORT = 8883
- # Local listen ports - must match what Bambu Studio expects
- # Note: Port 990 requires root or CAP_NET_BIND_SERVICE capability
- LOCAL_FTP_PORT = 990
- LOCAL_MQTT_PORT = 8883
- def __init__(
- self,
- target_host: str,
- cert_path: Path,
- key_path: Path,
- on_activity: Callable[[str, str], None] | None = None,
- ):
- """Initialize the slicer proxy manager.
- Args:
- target_host: Target printer IP address
- cert_path: Path to server certificate
- key_path: Path to server private key
- on_activity: Optional callback for activity logging (name, message)
- """
- self.target_host = target_host
- self.cert_path = cert_path
- self.key_path = key_path
- self.on_activity = on_activity
- self._ftp_proxy: TLSProxy | None = None
- self._mqtt_proxy: TLSProxy | None = None
- self._tasks: list[asyncio.Task] = []
- async def start(self) -> None:
- """Start FTP and MQTT TLS proxies."""
- logger.info("Starting slicer TLS proxy to %s", self.target_host)
- # Create proxies with TLS
- self._ftp_proxy = TLSProxy(
- name="FTP",
- listen_port=self.LOCAL_FTP_PORT,
- target_host=self.target_host,
- target_port=self.PRINTER_FTP_PORT,
- server_cert_path=self.cert_path,
- server_key_path=self.key_path,
- on_connect=lambda cid: self._log_activity("FTP", f"connected: {cid}"),
- on_disconnect=lambda cid: self._log_activity("FTP", f"disconnected: {cid}"),
- )
- self._mqtt_proxy = TLSProxy(
- name="MQTT",
- listen_port=self.LOCAL_MQTT_PORT,
- target_host=self.target_host,
- target_port=self.PRINTER_MQTT_PORT,
- server_cert_path=self.cert_path,
- server_key_path=self.key_path,
- on_connect=lambda cid: self._log_activity("MQTT", f"connected: {cid}"),
- on_disconnect=lambda cid: self._log_activity("MQTT", f"disconnected: {cid}"),
- )
- # Start as background tasks
- async def run_with_logging(proxy: TLSProxy) -> None:
- try:
- await proxy.start()
- except Exception as e:
- logger.error("Slicer proxy %s failed: %s", proxy.name, e)
- self._tasks = [
- asyncio.create_task(
- run_with_logging(self._ftp_proxy),
- name="slicer_proxy_ftp",
- ),
- asyncio.create_task(
- run_with_logging(self._mqtt_proxy),
- name="slicer_proxy_mqtt",
- ),
- ]
- logger.info("Slicer TLS proxy started for %s", self.target_host)
- # Wait for tasks to complete (they run until cancelled)
- # This keeps the start() coroutine alive so the parent task doesn't complete
- try:
- await asyncio.gather(*self._tasks)
- except asyncio.CancelledError:
- logger.debug("Slicer proxy start cancelled")
- async def stop(self) -> None:
- """Stop all proxies."""
- logger.info("Stopping slicer proxy")
- # Stop proxies
- if self._ftp_proxy:
- await self._ftp_proxy.stop()
- self._ftp_proxy = None
- if self._mqtt_proxy:
- await self._mqtt_proxy.stop()
- self._mqtt_proxy = None
- # Cancel tasks
- for task in self._tasks:
- task.cancel()
- if self._tasks:
- try:
- await asyncio.wait_for(
- asyncio.gather(*self._tasks, return_exceptions=True),
- timeout=2.0,
- )
- except TimeoutError:
- logger.debug("Some proxy tasks didn't stop in time")
- self._tasks = []
- logger.info("Slicer proxy stopped")
- def _log_activity(self, name: str, message: str) -> None:
- """Log activity via callback if configured."""
- if self.on_activity:
- try:
- self.on_activity(name, message)
- except Exception:
- pass # Ignore activity callback errors; logging is non-critical
- @property
- def is_running(self) -> bool:
- """Check if proxies are running."""
- return len(self._tasks) > 0 and all(not t.done() for t in self._tasks)
- def get_status(self) -> dict:
- """Get proxy status."""
- return {
- "running": self.is_running,
- "target_host": self.target_host,
- "ftp_port": self.LOCAL_FTP_PORT,
- "mqtt_port": self.LOCAL_MQTT_PORT,
- "ftp_connections": (len(self._ftp_proxy._active_connections) if self._ftp_proxy else 0),
- "mqtt_connections": (len(self._mqtt_proxy._active_connections) if self._mqtt_proxy else 0),
- }
|