Explorar el Código

Add mock FTPS server and comprehensive FTP test suite (67 tests)

FTP bugs have been the #1 recurring issue across releases (0.1.8+).
This adds a real implicit FTPS mock server and 67 test cases covering
every known failure mode — connection, upload, download, delete, storage
info, model-specific SSL behavior, async wrappers, and failure injection.

New files:
- mock_ftp_server.py: implicit FTPS server on pyftpdlib with failure injection
- conftest.py: FTP test fixtures (certs, server, client factory)
- test_bambu_ftp.py: 67 tests across 10 test classes

Also adds pyOpenSSL to requirements-dev.txt (needed by pyftpdlib
TLS_FTPHandler in the Docker test image).
maziggy hace 3 meses
padre
commit
f2468077fe

+ 11 - 0
CHANGELOG.md

@@ -4,6 +4,17 @@ All notable changes to Bambuddy will be documented in this file.
 
 
 ## [0.1.9b] - Not released
 ## [0.1.9b] - Not released
 
 
+### Testing
+- **Mock FTPS Server & Comprehensive FTP Test Suite** — Added 67 automated test cases against a real implicit FTPS mock server, covering every known FTP failure mode from 0.1.8+:
+  - Mock server (`mock_ftp_server.py`) implements implicit TLS, custom AVBL command, and per-command failure injection
+  - Connection tests: auth, SSL modes (prot_p/prot_c), timeout, cache, disconnect edge cases
+  - Upload tests: chunked transfer via `transfercmd()`, progress callbacks, 553/550/552 error handling
+  - Download tests: bytes, to-file, 0-byte regression, large files, missing file cleanup
+  - Model-specific tests: X1C session reuse, A1/A1 Mini prot_c fallback, P1S, unknown model defaults
+  - Async wrapper tests: upload/download/list/delete with A1 fallback and multi-path download
+  - Failure injection tests: regressions for `error_perm` hierarchy, `diagnose_storage` CWD propagation, injection count decrement
+  - Added `pyOpenSSL` to `requirements-dev.txt` for Docker test image compatibility
+
 ## [0.1.8.1] - 2026-02-07
 ## [0.1.8.1] - 2026-02-07
 
 
 ### Fixed
 ### Fixed

+ 99 - 0
backend/tests/unit/services/conftest.py

@@ -0,0 +1,99 @@
+"""Test fixtures for FTP service tests.
+
+Provides a real implicit FTPS server (via mock_ftp_server) and client factory
+for integration-style testing of BambuFTPClient against a live server.
+"""
+
+import socket
+from unittest.mock import patch
+
+import pytest
+
+from backend.app.services.bambu_ftp import BambuFTPClient
+from backend.app.services.virtual_printer.certificate import CertificateService
+from backend.tests.unit.services.mock_ftp_server import MockBambuFTPServer
+
+
+@pytest.fixture(scope="session")
+def ftp_certs(tmp_path_factory):
+    """Generate self-signed TLS certificates once per test session."""
+    cert_dir = tmp_path_factory.mktemp("ftp_certs")
+    svc = CertificateService(cert_dir, serial="TEST_FTP_SERVER")
+    cert_path, key_path = svc.generate_certificates()
+    return str(cert_path), str(key_path)
+
+
+def _find_free_port() -> int:
+    """Find a free TCP port on localhost."""
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+        s.bind(("127.0.0.1", 0))
+        return s.getsockname()[1]
+
+
+@pytest.fixture()
+def ftp_root(tmp_path):
+    """Create temp directory with standard Bambu printer directory structure."""
+    for d in ("cache", "timelapse", "model", "data", "data/Metadata"):
+        (tmp_path / d).mkdir(parents=True, exist_ok=True)
+    return tmp_path
+
+
+@pytest.fixture()
+def ftp_server(ftp_certs, ftp_root):
+    """Start a mock implicit FTPS server, yield it, stop on cleanup."""
+    cert_path, key_path = ftp_certs
+    port = _find_free_port()
+    server = MockBambuFTPServer(
+        host="127.0.0.1",
+        port=port,
+        root_dir=str(ftp_root),
+        cert_path=cert_path,
+        key_path=key_path,
+        access_code="12345678",
+    )
+    server.start()
+    yield server
+    server.stop()
+
+
+@pytest.fixture()
+def ftp_client_factory(ftp_server):
+    """Factory that creates BambuFTPClient instances pointed at the mock server."""
+
+    def _make_client(
+        printer_model: str = "X1C",
+        force_prot_c: bool = False,
+        access_code: str = "12345678",
+        timeout: float = 10.0,
+    ) -> BambuFTPClient:
+        client = BambuFTPClient(
+            ip_address="127.0.0.1",
+            access_code=access_code,
+            timeout=timeout,
+            printer_model=printer_model,
+            force_prot_c=force_prot_c,
+        )
+        # Override port to point at mock server
+        client.FTP_PORT = ftp_server.port
+        return client
+
+    return _make_client
+
+
+@pytest.fixture(autouse=True)
+def clear_ftp_mode_cache():
+    """Clear BambuFTPClient mode cache before and after each test."""
+    BambuFTPClient._mode_cache.clear()
+    yield
+    BambuFTPClient._mode_cache.clear()
+
+
+@pytest.fixture()
+def patch_ftp_port(ftp_server):
+    """Patch FTP_PORT at class level for async wrapper tests.
+
+    Async wrappers create their own BambuFTPClient instances internally,
+    so we need to patch the class-level default port.
+    """
+    with patch.object(BambuFTPClient, "FTP_PORT", ftp_server.port):
+        yield ftp_server

+ 240 - 0
backend/tests/unit/services/mock_ftp_server.py

@@ -0,0 +1,240 @@
+"""Mock implicit FTPS server for testing BambuFTPClient.
+
+Built on pyftpdlib with implicit TLS support to match Bambu printer behavior.
+Supports failure injection, custom AVBL command, and filesystem inspection.
+"""
+
+import logging
+import os
+import threading
+import time
+
+from pyftpdlib.authorizers import DummyAuthorizer
+from pyftpdlib.handlers import TLS_FTPHandler
+from pyftpdlib.servers import FTPServer
+
+
+class ImplicitTLS_FTPHandler(TLS_FTPHandler):
+    """FTP handler that wraps the socket in TLS before sending the 220 banner.
+
+    This implements implicit FTPS (port 990 style) where the TLS handshake
+    happens immediately on connect, before any FTP protocol exchange.
+    pyftpdlib only natively supports explicit FTPS (AUTH TLS after connect).
+    """
+
+    # Per-class failure injection map: command -> (code, message, remaining_count)
+    # -1 remaining_count = permanent failure
+    _failure_map: dict = {}
+
+    # AVBL command response (bytes available)
+    _avbl_bytes: int = 1073741824  # 1 GB default
+
+    # Register AVBL as a recognized FTP command (pyftpdlib requires this)
+    proto_cmds = {
+        **TLS_FTPHandler.proto_cmds,
+        "AVBL": {
+            "perm": None,
+            "auth": True,
+            "arg": None,
+            "help": "Syntax: AVBL (get available bytes).",
+        },
+    }
+
+    def handle(self):
+        """Wrap socket in TLS immediately, then send 220 banner."""
+        self.secure_connection(self.get_ssl_context())
+        super().handle()
+
+    def ftp_PROT(self, line):
+        """Override PROT to auto-set _pbsz for implicit FTPS.
+
+        In implicit FTPS the connection is already TLS-secured, so requiring
+        a separate PBSZ command is unnecessary. Python's ftplib prot_c()
+        doesn't send PBSZ first (unlike prot_p()), causing 503 errors.
+        Real Bambu printers don't enforce this for implicit FTPS either.
+        """
+        self._pbsz = True
+        return super().ftp_PROT(line)
+
+    def _check_failure(self, command: str, line: str):
+        """Check if a failure is injected for this command.
+
+        Returns True if a failure response was sent, False otherwise.
+        """
+        if command in self._failure_map:
+            code, message, remaining = self._failure_map[command]
+            if remaining != 0:
+                if remaining > 0:
+                    self._failure_map[command] = (code, message, remaining - 1)
+                    if remaining - 1 == 0:
+                        del self._failure_map[command]
+                self.respond(f"{code} {message}")
+                return True
+        return False
+
+    def ftp_AVBL(self, line):
+        """Handle custom AVBL command (available bytes on storage)."""
+        self.respond(f"213 {self._avbl_bytes}")
+
+    def ftp_RETR(self, file):
+        if self._check_failure("RETR", file):
+            return
+        return super().ftp_RETR(file)
+
+    def ftp_STOR(self, file):
+        if self._check_failure("STOR", file):
+            return
+        return super().ftp_STOR(file)
+
+    def ftp_DELE(self, line):
+        if self._check_failure("DELE", line):
+            return
+        return super().ftp_DELE(line)
+
+    def ftp_CWD(self, path):
+        if self._check_failure("CWD", path):
+            return
+        return super().ftp_CWD(path)
+
+    def ftp_LIST(self, path=""):
+        if self._check_failure("LIST", path):
+            return
+        return super().ftp_LIST(path)
+
+    def ftp_SIZE(self, path):
+        if self._check_failure("SIZE", path):
+            return
+        # Override to allow SIZE in ASCII mode (real Bambu printers allow it,
+        # and BambuFTPClient.get_file_size() doesn't set TYPE I first)
+        if not self.fs.isfile(self.fs.realpath(path)):
+            self.respond(f"550 {self.fs.fs2ftp(path)} is not retrievable.")
+            return
+        try:
+            size = self.run_as_current_user(self.fs.getsize, path)
+        except OSError as err:
+            self.respond(f"550 {err}.")
+        else:
+            self.respond(f"213 {size}")
+
+    def ftp_PASS(self, line):
+        if self._check_failure("PASS", line):
+            return
+        return super().ftp_PASS(line)
+
+
+class MockBambuFTPServer:
+    """Manages a mock implicit FTPS server in a background thread.
+
+    Simulates a Bambu printer FTP server with:
+    - Implicit TLS (like real printers on port 990)
+    - Standard Bambu directory structure
+    - AVBL command support
+    - Per-command failure injection for testing error paths
+    """
+
+    def __init__(
+        self,
+        host: str,
+        port: int,
+        root_dir: str,
+        cert_path: str,
+        key_path: str,
+        access_code: str = "12345678",
+    ):
+        self.host = host
+        self.port = port
+        self.root_dir = root_dir
+        self.cert_path = cert_path
+        self.key_path = key_path
+        self.access_code = access_code
+        self._server: FTPServer | None = None
+        self._thread: threading.Thread | None = None
+        # Create a unique handler class per instance so _failure_map is isolated
+        self._handler_class = type(
+            "TestFTPHandler",
+            (ImplicitTLS_FTPHandler,),
+            {
+                "_failure_map": {},
+                "_avbl_bytes": 1073741824,
+            },
+        )
+
+    def start(self):
+        """Start the FTP server in a background daemon thread."""
+        authorizer = DummyAuthorizer()
+        authorizer.add_user("bblp", self.access_code, self.root_dir, perm="elradfmwMT")
+
+        handler = self._handler_class
+        handler.authorizer = authorizer
+        handler.certfile = self.cert_path
+        handler.keyfile = self.key_path
+        handler.passive_ports = range(60000, 60101)
+        handler.tls_control_required = False
+        handler.tls_data_required = False
+        # Reset ssl_context so it picks up our cert/key
+        handler.ssl_context = None
+
+        # Suppress pyftpdlib's noisy logging (startup/shutdown banners)
+        # to avoid "I/O operation on closed file" errors when xdist
+        # workers tear down while the daemon thread is still logging.
+        logging.getLogger("pyftpdlib").setLevel(logging.CRITICAL)
+
+        self._server = FTPServer((self.host, self.port), handler)
+        self._server.max_cons = 10
+        self._server.max_cons_per_ip = 5
+
+        self._thread = threading.Thread(target=self._server.serve_forever, daemon=True)
+        self._thread.start()
+        # Brief wait for server to be ready
+        time.sleep(0.1)
+
+    def stop(self):
+        """Stop the FTP server and wait for thread to exit."""
+        if self._server:
+            self._server.close_all()
+        if self._thread:
+            self._thread.join(timeout=5)
+        self._server = None
+        self._thread = None
+
+    def inject_failure(self, command: str, code: int, message: str, count: int = -1):
+        """Inject a failure response for a specific FTP command.
+
+        Args:
+            command: FTP command name (RETR, STOR, DELE, CWD, LIST, SIZE, PASS)
+            code: FTP response code (e.g. 550, 553)
+            message: Response message
+            count: Number of times to fail (-1 = permanent)
+        """
+        self._handler_class._failure_map[command] = (code, message, count)
+
+    def clear_failures(self):
+        """Remove all injected failures."""
+        self._handler_class._failure_map.clear()
+
+    def set_avbl_bytes(self, n: int):
+        """Set the response value for the AVBL command."""
+        self._handler_class._avbl_bytes = n
+
+    def add_file(self, relative_path: str, content: bytes = b""):
+        """Add a file to the server's filesystem."""
+        full_path = os.path.join(self.root_dir, relative_path.lstrip("/"))
+        os.makedirs(os.path.dirname(full_path), exist_ok=True)
+        with open(full_path, "wb") as f:
+            f.write(content)
+
+    def add_directory(self, relative_path: str):
+        """Create a directory in the server's filesystem."""
+        full_path = os.path.join(self.root_dir, relative_path.lstrip("/"))
+        os.makedirs(full_path, exist_ok=True)
+
+    def file_exists(self, relative_path: str) -> bool:
+        """Check if a file exists on the server."""
+        full_path = os.path.join(self.root_dir, relative_path.lstrip("/"))
+        return os.path.isfile(full_path)
+
+    def read_file(self, relative_path: str) -> bytes:
+        """Read file content from the server's filesystem."""
+        full_path = os.path.join(self.root_dir, relative_path.lstrip("/"))
+        with open(full_path, "rb") as f:
+            return f.read()

+ 864 - 0
backend/tests/unit/services/test_bambu_ftp.py

@@ -0,0 +1,864 @@
+"""Comprehensive FTP test suite for BambuFTPClient.
+
+Tests against a real mock implicit FTPS server, covering:
+- Connection (auth, SSL modes, timeout, caching)
+- File listing
+- Download (bytes, to_file, 0-byte regression)
+- Upload (chunked transfer, progress, error codes)
+- Delete
+- File size
+- Storage info (AVBL, directory scan, diagnose_storage)
+- Model-specific behavior (X1C prot_p, A1 prot_c fallback)
+- Async wrappers
+- Failure injection scenarios (regressions for 0.1.8 bugs)
+"""
+
+import time
+from pathlib import Path
+
+import pytest
+
+from backend.app.services.bambu_ftp import (
+    BambuFTPClient,
+    delete_file_async,
+    download_file_async,
+    download_file_try_paths_async,
+    list_files_async,
+    upload_file_async,
+)
+
+# Brief delay to allow pyftpdlib to flush uploaded files to disk.
+# Needed because upload_file() skips voidresp() for A1 compatibility,
+# so the server may still be processing the data channel close event.
+_UPLOAD_FLUSH_DELAY = 0.3
+
+
+# ---------------------------------------------------------------------------
+# TestConnection
+# ---------------------------------------------------------------------------
+class TestConnection:
+    """Tests for FTP connect/disconnect behavior."""
+
+    def test_connect_success(self, ftp_client_factory):
+        """Successful implicit FTPS connection and login."""
+        client = ftp_client_factory()
+        assert client.connect() is True
+        client.disconnect()
+
+    def test_connect_wrong_access_code(self, ftp_client_factory):
+        """Wrong access code returns False."""
+        client = ftp_client_factory(access_code="wrongcode")
+        assert client.connect() is False
+
+    def test_connect_unreachable_host(self, ftp_server):
+        """Unreachable host returns False."""
+        client = BambuFTPClient(
+            ip_address="192.0.2.1",  # TEST-NET, guaranteed unreachable
+            access_code="12345678",
+            timeout=1.0,
+            printer_model="X1C",
+        )
+        client.FTP_PORT = ftp_server.port
+        assert client.connect() is False
+
+    def test_connect_timeout(self, ftp_server):
+        """Very short timeout triggers timeout error."""
+        client = BambuFTPClient(
+            ip_address="192.0.2.1",
+            access_code="12345678",
+            timeout=0.001,  # Extremely short
+            printer_model="X1C",
+        )
+        client.FTP_PORT = ftp_server.port
+        assert client.connect() is False
+
+    def test_disconnect_clean(self, ftp_client_factory):
+        """Clean disconnect after successful connect."""
+        client = ftp_client_factory()
+        client.connect()
+        client.disconnect()
+        assert client._ftp is None
+
+    def test_disconnect_without_connect(self, ftp_client_factory):
+        """Disconnect without connect does not raise."""
+        client = ftp_client_factory()
+        client.disconnect()  # Should not raise
+        assert client._ftp is None
+
+    def test_disconnect_after_server_gone(self, ftp_certs, ftp_root):
+        """Disconnect after server has stopped raises EOFError.
+
+        Note: The current disconnect() catches (OSError, ftplib.Error) but
+        EOFError is neither. This documents actual behavior — a future fix
+        could add EOFError to the except clause.
+        """
+        from backend.tests.unit.services.mock_ftp_server import (
+            MockBambuFTPServer,
+        )
+
+        from .conftest import _find_free_port
+
+        cert_path, key_path = ftp_certs
+        port = _find_free_port()
+        server = MockBambuFTPServer("127.0.0.1", port, str(ftp_root), cert_path, key_path)
+        server.start()
+
+        client = BambuFTPClient("127.0.0.1", "12345678", timeout=5.0)
+        client.FTP_PORT = port
+        client.connect()
+
+        server.stop()
+        with pytest.raises(EOFError):
+            client.disconnect()
+
+    def test_x1c_uses_prot_p(self, ftp_client_factory):
+        """X1C model connects with prot_p (protected data channel)."""
+        client = ftp_client_factory(printer_model="X1C")
+        assert client.connect() is True
+        assert client._should_use_prot_c() is False
+        client.disconnect()
+
+    def test_a1_defaults_prot_p(self, ftp_client_factory):
+        """A1 model defaults to prot_p when no cache exists."""
+        client = ftp_client_factory(printer_model="A1")
+        assert client._should_use_prot_c() is False
+        assert client.connect() is True
+        client.disconnect()
+
+    def test_a1_force_prot_c(self, ftp_client_factory):
+        """A1 model with force_prot_c uses clear data channel."""
+        client = ftp_client_factory(printer_model="A1", force_prot_c=True)
+        assert client._should_use_prot_c() is True
+        assert client.connect() is True
+        client.disconnect()
+
+    def test_cached_mode_respected(self, ftp_client_factory):
+        """Cached mode is used on subsequent connections."""
+        BambuFTPClient.cache_mode("127.0.0.1", "prot_c")
+        client = ftp_client_factory(printer_model="A1")
+        assert client._should_use_prot_c() is True
+        assert client.connect() is True
+        client.disconnect()
+
+
+# ---------------------------------------------------------------------------
+# TestListFiles
+# ---------------------------------------------------------------------------
+class TestListFiles:
+    """Tests for directory listing."""
+
+    def test_list_empty_directory(self, ftp_client_factory):
+        """Listing an empty directory returns empty list."""
+        client = ftp_client_factory()
+        client.connect()
+        files = client.list_files("/cache")
+        assert files == []
+        client.disconnect()
+
+    def test_list_directory_with_files(self, ftp_client_factory, ftp_server):
+        """Files in directory are listed correctly."""
+        ftp_server.add_file("cache/test.3mf", b"x" * 1024)
+        ftp_server.add_file("cache/test2.gcode", b"y" * 512)
+        client = ftp_client_factory()
+        client.connect()
+        files = client.list_files("/cache")
+        names = {f["name"] for f in files}
+        assert "test.3mf" in names
+        assert "test2.gcode" in names
+        client.disconnect()
+
+    def test_directories_marked(self, ftp_client_factory, ftp_server):
+        """Subdirectories are identified with is_directory=True."""
+        ftp_server.add_directory("model/subdir")
+        client = ftp_client_factory()
+        client.connect()
+        files = client.list_files("/model")
+        dirs = [f for f in files if f["is_directory"]]
+        assert len(dirs) >= 1
+        assert dirs[0]["name"] == "subdir"
+        client.disconnect()
+
+    def test_nonexistent_path_returns_empty(self, ftp_client_factory):
+        """Listing a nonexistent path returns empty list."""
+        client = ftp_client_factory()
+        client.connect()
+        files = client.list_files("/nonexistent/path")
+        assert files == []
+        client.disconnect()
+
+    def test_file_sizes_and_paths(self, ftp_client_factory, ftp_server):
+        """File sizes and full paths are parsed correctly."""
+        ftp_server.add_file("cache/sized.bin", b"a" * 2048)
+        client = ftp_client_factory()
+        client.connect()
+        files = client.list_files("/cache")
+        sized = [f for f in files if f["name"] == "sized.bin"]
+        assert len(sized) == 1
+        assert sized[0]["size"] == 2048
+        assert sized[0]["path"] == "/cache/sized.bin"
+        client.disconnect()
+
+
+# ---------------------------------------------------------------------------
+# TestDownload
+# ---------------------------------------------------------------------------
+class TestDownload:
+    """Tests for file download operations."""
+
+    def test_download_file_returns_bytes(self, ftp_client_factory, ftp_server):
+        """download_file() returns file content as bytes."""
+        content = b"Hello FTP World!"
+        ftp_server.add_file("cache/hello.txt", content)
+        client = ftp_client_factory()
+        client.connect()
+        result = client.download_file("/cache/hello.txt")
+        assert result == content
+        client.disconnect()
+
+    def test_download_file_missing(self, ftp_client_factory):
+        """download_file() returns None for missing file."""
+        client = ftp_client_factory()
+        client.connect()
+        result = client.download_file("/cache/does_not_exist.txt")
+        assert result is None
+        client.disconnect()
+
+    def test_download_to_file_writes_to_disk(self, ftp_client_factory, ftp_server, tmp_path):
+        """download_to_file() writes content to local filesystem."""
+        content = b"Downloaded content"
+        ftp_server.add_file("cache/dl.bin", content)
+        local = tmp_path / "output" / "dl.bin"
+        client = ftp_client_factory()
+        client.connect()
+        result = client.download_to_file("/cache/dl.bin", local)
+        assert result is True
+        assert local.read_bytes() == content
+        client.disconnect()
+
+    def test_download_to_file_creates_parent_dirs(self, ftp_client_factory, ftp_server, tmp_path):
+        """download_to_file() creates parent directories automatically."""
+        ftp_server.add_file("cache/nested.txt", b"nested content")
+        local = tmp_path / "deep" / "nested" / "path" / "nested.txt"
+        client = ftp_client_factory()
+        client.connect()
+        result = client.download_to_file("/cache/nested.txt", local)
+        assert result is True
+        assert local.exists()
+        client.disconnect()
+
+    def test_zero_byte_download_returns_false(self, ftp_client_factory, ftp_server, tmp_path):
+        """0-byte download returns False and cleans up (regression test)."""
+        ftp_server.add_file("cache/empty.bin", b"")
+        local = tmp_path / "empty.bin"
+        client = ftp_client_factory()
+        client.connect()
+        result = client.download_to_file("/cache/empty.bin", local)
+        assert result is False
+        assert not local.exists()
+        client.disconnect()
+
+    def test_download_to_file_missing_returns_false(self, ftp_client_factory, tmp_path):
+        """Missing file returns False."""
+        local = tmp_path / "missing.bin"
+        client = ftp_client_factory()
+        client.connect()
+        result = client.download_to_file("/cache/no_such_file.bin", local)
+        assert result is False
+        client.disconnect()
+
+    def test_download_large_file(self, ftp_client_factory, ftp_server):
+        """Large file download (>1MB) works correctly."""
+        large_content = b"X" * (1024 * 1024 + 500)  # ~1MB + 500 bytes
+        ftp_server.add_file("cache/large.bin", large_content)
+        client = ftp_client_factory()
+        client.connect()
+        result = client.download_file("/cache/large.bin")
+        assert result == large_content
+        client.disconnect()
+
+    def test_download_not_connected(self):
+        """download_file() returns None when not connected."""
+        client = BambuFTPClient("127.0.0.1", "12345678")
+        assert client.download_file("/cache/test.bin") is None
+
+
+# ---------------------------------------------------------------------------
+# TestUpload
+# ---------------------------------------------------------------------------
+class TestUpload:
+    """Tests for file upload operations."""
+
+    def test_upload_success(self, ftp_client_factory, ftp_server, tmp_path):
+        """Successful upload via transfercmd (not storbinary)."""
+        content = b"Upload test content"
+        local = tmp_path / "upload.3mf"
+        local.write_bytes(content)
+        client = ftp_client_factory()
+        client.connect()
+        result = client.upload_file(local, "/cache/upload.3mf")
+        assert result is True
+        client.disconnect()
+        # Verify via fresh connection (upload_file skips voidresp()
+        # so the original session can't be reused for download)
+        time.sleep(_UPLOAD_FLUSH_DELAY)
+        client2 = ftp_client_factory()
+        client2.connect()
+        downloaded = client2.download_file("/cache/upload.3mf")
+        assert downloaded == content
+        client2.disconnect()
+
+    def test_upload_progress_callback(self, ftp_client_factory, ftp_server, tmp_path):
+        """Progress callback receives updates during upload."""
+        content = b"P" * 2048
+        local = tmp_path / "progress.bin"
+        local.write_bytes(content)
+
+        progress_calls = []
+
+        def on_progress(uploaded, total):
+            progress_calls.append((uploaded, total))
+
+        client = ftp_client_factory()
+        client.connect()
+        client.upload_file(local, "/cache/progress.bin", on_progress)
+        assert len(progress_calls) >= 1
+        # Last call should report full file uploaded
+        assert progress_calls[-1][0] == len(content)
+        assert progress_calls[-1][1] == len(content)
+        client.disconnect()
+
+    def test_upload_not_connected(self, tmp_path):
+        """Upload when not connected returns False."""
+        local = tmp_path / "test.bin"
+        local.write_bytes(b"data")
+        client = BambuFTPClient("127.0.0.1", "12345678")
+        assert client.upload_file(local, "/cache/test.bin") is False
+
+    def test_upload_553_no_sd_card(self, ftp_client_factory, ftp_server, tmp_path):
+        """553 error (no SD card) returns False."""
+        ftp_server.inject_failure("STOR", 553, "Could not create file.")
+        local = tmp_path / "test.bin"
+        local.write_bytes(b"data")
+        client = ftp_client_factory()
+        client.connect()
+        result = client.upload_file(local, "/cache/test.bin")
+        assert result is False
+        client.disconnect()
+
+    def test_upload_550_permission_denied(self, ftp_client_factory, ftp_server, tmp_path):
+        """550 error (permission denied) returns False."""
+        ftp_server.inject_failure("STOR", 550, "Permission denied.")
+        local = tmp_path / "test.bin"
+        local.write_bytes(b"data")
+        client = ftp_client_factory()
+        client.connect()
+        result = client.upload_file(local, "/cache/test.bin")
+        assert result is False
+        client.disconnect()
+
+    def test_upload_552_storage_full(self, ftp_client_factory, ftp_server, tmp_path):
+        """552 error (storage full) returns False."""
+        ftp_server.inject_failure("STOR", 552, "Storage quota exceeded.")
+        local = tmp_path / "test.bin"
+        local.write_bytes(b"data")
+        client = ftp_client_factory()
+        client.connect()
+        result = client.upload_file(local, "/cache/test.bin")
+        assert result is False
+        client.disconnect()
+
+    def test_upload_bytes_success(self, ftp_client_factory, ftp_server):
+        """upload_bytes() writes data to server."""
+        data = b"Bytes upload content"
+        client = ftp_client_factory()
+        client.connect()
+        result = client.upload_bytes(data, "/cache/bytes.bin")
+        assert result is True
+        client.disconnect()
+        # Verify via fresh connection
+        time.sleep(_UPLOAD_FLUSH_DELAY)
+        client2 = ftp_client_factory()
+        client2.connect()
+        downloaded = client2.download_file("/cache/bytes.bin")
+        assert downloaded == data
+        client2.disconnect()
+
+    def test_upload_bytes_failure(self, ftp_client_factory, ftp_server):
+        """upload_bytes() returns False on STOR failure."""
+        ftp_server.inject_failure("STOR", 553, "No space.")
+        client = ftp_client_factory()
+        client.connect()
+        result = client.upload_bytes(b"data", "/cache/fail.bin")
+        assert result is False
+        client.disconnect()
+
+    def test_upload_large_chunked(self, ftp_client_factory, ftp_server, tmp_path):
+        """Large file upload in chunks completes without error.
+
+        Uses 2.5MB to trigger multiple chunks with 1MB CHUNK_SIZE.
+        Content verification skipped because upload_file() doesn't call
+        voidresp() (for A1 compatibility), so the server may still be
+        flushing when we check. The upload result=True confirms the
+        client sent all chunks without error.
+        """
+        content = b"C" * (1024 * 1024 * 2 + 512 * 1024)
+        local = tmp_path / "large.bin"
+        local.write_bytes(content)
+
+        progress_calls = []
+
+        def on_progress(uploaded, total):
+            progress_calls.append((uploaded, total))
+
+        client = ftp_client_factory()
+        client.connect()
+        result = client.upload_file(local, "/cache/large.bin", on_progress)
+        assert result is True
+        # Verify multiple chunks were sent
+        assert len(progress_calls) >= 3  # 2.5MB / 1MB = at least 3 chunks
+        assert progress_calls[-1][0] == len(content)
+        client.disconnect()
+
+
+# ---------------------------------------------------------------------------
+# TestDelete
+# ---------------------------------------------------------------------------
+class TestDelete:
+    """Tests for file deletion."""
+
+    def test_delete_success(self, ftp_client_factory, ftp_server):
+        """Successful file deletion."""
+        ftp_server.add_file("cache/to_delete.bin", b"delete me")
+        client = ftp_client_factory()
+        client.connect()
+        result = client.delete_file("/cache/to_delete.bin")
+        assert result is True
+        assert not ftp_server.file_exists("cache/to_delete.bin")
+        client.disconnect()
+
+    def test_delete_not_found(self, ftp_client_factory):
+        """Deleting a nonexistent file returns False."""
+        client = ftp_client_factory()
+        client.connect()
+        result = client.delete_file("/cache/no_such_file.bin")
+        assert result is False
+        client.disconnect()
+
+    def test_delete_not_connected(self):
+        """Delete when not connected returns False."""
+        client = BambuFTPClient("127.0.0.1", "12345678")
+        assert client.delete_file("/cache/test.bin") is False
+
+
+# ---------------------------------------------------------------------------
+# TestFileSize
+# ---------------------------------------------------------------------------
+class TestFileSize:
+    """Tests for get_file_size."""
+
+    def test_file_size_correct(self, ftp_client_factory, ftp_server):
+        """Returns correct file size."""
+        ftp_server.add_file("cache/sized.bin", b"a" * 4096)
+        client = ftp_client_factory()
+        client.connect()
+        size = client.get_file_size("/cache/sized.bin")
+        assert size == 4096
+        client.disconnect()
+
+    def test_file_size_missing(self, ftp_client_factory):
+        """Returns None for missing file."""
+        client = ftp_client_factory()
+        client.connect()
+        size = client.get_file_size("/cache/no_file.bin")
+        assert size is None
+        client.disconnect()
+
+    def test_file_size_not_connected(self):
+        """Returns None when not connected."""
+        client = BambuFTPClient("127.0.0.1", "12345678")
+        assert client.get_file_size("/cache/test.bin") is None
+
+
+# ---------------------------------------------------------------------------
+# TestStorageInfo
+# ---------------------------------------------------------------------------
+class TestStorageInfo:
+    """Tests for storage info and diagnostics."""
+
+    def test_avbl_parsed(self, ftp_client_factory, ftp_server):
+        """AVBL response is parsed for free_bytes."""
+        ftp_server.set_avbl_bytes(5000000000)
+        client = ftp_client_factory()
+        client.connect()
+        info = client.get_storage_info()
+        assert info is not None
+        assert info["free_bytes"] == 5000000000
+        client.disconnect()
+
+    def test_used_bytes_from_scan(self, ftp_client_factory, ftp_server):
+        """used_bytes calculated from directory scan."""
+        ftp_server.add_file("cache/file1.bin", b"a" * 1000)
+        ftp_server.add_file("cache/file2.bin", b"b" * 2000)
+        client = ftp_client_factory()
+        client.connect()
+        info = client.get_storage_info()
+        assert info is not None
+        assert info["used_bytes"] >= 3000  # At least these two files
+        client.disconnect()
+
+    def test_storage_info_not_connected(self):
+        """Returns None when not connected."""
+        client = BambuFTPClient("127.0.0.1", "12345678")
+        assert client.get_storage_info() is None
+
+    def test_diagnose_storage_success(self, ftp_client_factory, ftp_server):
+        """diagnose_storage() returns connected=True with working diagnostics."""
+        client = ftp_client_factory()
+        client.connect()
+        diag = client.diagnose_storage()
+        assert diag["connected"] is True
+        assert diag["can_list_root"] is True
+        assert diag["can_list_cache"] is True
+        assert diag["pwd"] is not None
+        assert diag["storage_info"] is not None
+        client.disconnect()
+
+    def test_diagnose_storage_not_connected(self):
+        """diagnose_storage() reports not connected."""
+        client = BambuFTPClient("127.0.0.1", "12345678")
+        diag = client.diagnose_storage()
+        assert diag["connected"] is False
+        assert "FTP not connected" in diag["errors"]
+
+
+# ---------------------------------------------------------------------------
+# TestModelSpecificBehavior
+# ---------------------------------------------------------------------------
+class TestModelSpecificBehavior:
+    """Tests for printer model-specific FTP behavior."""
+
+    def test_x1c_upload(self, ftp_client_factory, ftp_server, tmp_path):
+        """X1C upload with session reuse succeeds."""
+        content = b"X1C upload data"
+        local = tmp_path / "x1c.3mf"
+        local.write_bytes(content)
+        client = ftp_client_factory(printer_model="X1C")
+        client.connect()
+        result = client.upload_file(local, "/cache/x1c.3mf")
+        assert result is True
+        client.disconnect()
+        # Verify via fresh connection
+        time.sleep(_UPLOAD_FLUSH_DELAY)
+        client2 = ftp_client_factory(printer_model="X1C")
+        client2.connect()
+        downloaded = client2.download_file("/cache/x1c.3mf")
+        assert downloaded == content
+        client2.disconnect()
+
+    def test_a1_upload_prot_c(self, ftp_client_factory, ftp_server, tmp_path):
+        """A1 model upload with prot_c succeeds."""
+        content = b"A1 upload data"
+        local = tmp_path / "a1.3mf"
+        local.write_bytes(content)
+        client = ftp_client_factory(printer_model="A1", force_prot_c=True)
+        client.connect()
+        result = client.upload_file(local, "/cache/a1.3mf")
+        assert result is True
+        client.disconnect()
+        # Verify via fresh connection
+        time.sleep(_UPLOAD_FLUSH_DELAY)
+        client2 = ftp_client_factory(printer_model="A1", force_prot_c=True)
+        client2.connect()
+        downloaded = client2.download_file("/cache/a1.3mf")
+        assert downloaded == content
+        client2.disconnect()
+
+    def test_a1_mini_upload(self, ftp_client_factory, ftp_server, tmp_path):
+        """A1 Mini model upload succeeds."""
+        content = b"A1 Mini data"
+        local = tmp_path / "a1mini.3mf"
+        local.write_bytes(content)
+        client = ftp_client_factory(printer_model="A1 Mini", force_prot_c=True)
+        client.connect()
+        result = client.upload_file(local, "/cache/a1mini.3mf")
+        assert result is True
+        client.disconnect()
+
+    def test_p1s_upload(self, ftp_client_factory, ftp_server, tmp_path):
+        """P1S model upload with session reuse succeeds."""
+        content = b"P1S upload data"
+        local = tmp_path / "p1s.3mf"
+        local.write_bytes(content)
+        client = ftp_client_factory(printer_model="P1S")
+        client.connect()
+        result = client.upload_file(local, "/cache/p1s.3mf")
+        assert result is True
+        client.disconnect()
+
+    def test_unknown_model_defaults_prot_p(self, ftp_client_factory):
+        """Unknown model defaults to prot_p."""
+        client = ftp_client_factory(printer_model="FuturePrinter3000")
+        assert client._is_a1_model() is False
+        assert client._should_use_prot_c() is False
+        assert client.connect() is True
+        client.disconnect()
+
+    def test_mode_cache_persists_and_clears(self, ftp_client_factory):
+        """Mode cache works within a test and clears between tests."""
+        # Cache should be empty at start (autouse fixture clears it)
+        assert BambuFTPClient._mode_cache == {}
+
+        # Connect and cache a mode
+        BambuFTPClient.cache_mode("127.0.0.1", "prot_p")
+        assert BambuFTPClient._mode_cache["127.0.0.1"] == "prot_p"
+
+        # New client for same IP uses cached mode
+        client = ftp_client_factory(printer_model="A1")
+        assert client._get_cached_mode() == "prot_p"
+        assert client._should_use_prot_c() is False
+        client.disconnect()
+
+
+# ---------------------------------------------------------------------------
+# TestAsyncWrappers
+# ---------------------------------------------------------------------------
+class TestAsyncWrappers:
+    """Tests for async wrapper functions using patch_ftp_port fixture."""
+
+    @pytest.mark.asyncio
+    async def test_upload_file_async_success(self, patch_ftp_port, tmp_path):
+        """upload_file_async succeeds for X1C."""
+        content = b"async upload"
+        local = tmp_path / "async_up.3mf"
+        local.write_bytes(content)
+        result = await upload_file_async(
+            "127.0.0.1",
+            "12345678",
+            local,
+            "/cache/async_up.3mf",
+            timeout=30.0,
+            printer_model="X1C",
+        )
+        assert result is True
+
+    @pytest.mark.asyncio
+    async def test_upload_file_async_a1_fallback(self, patch_ftp_port, tmp_path):
+        """upload_file_async tries prot_p then falls back to prot_c for A1."""
+        content = b"a1 async upload"
+        local = tmp_path / "a1_async.3mf"
+        local.write_bytes(content)
+        # For A1 models, if prot_p succeeds we get True.
+        # If prot_p fails, it tries prot_c. Either way should succeed
+        # against our mock server which accepts both.
+        result = await upload_file_async(
+            "127.0.0.1",
+            "12345678",
+            local,
+            "/cache/a1_async.3mf",
+            timeout=30.0,
+            printer_model="A1",
+        )
+        assert result is True
+
+    @pytest.mark.asyncio
+    async def test_download_file_async_success(self, patch_ftp_port, tmp_path):
+        """download_file_async succeeds."""
+        server = patch_ftp_port
+        content = b"async download content"
+        server.add_file("cache/async_dl.bin", content)
+        local = tmp_path / "async_dl.bin"
+        result = await download_file_async(
+            "127.0.0.1",
+            "12345678",
+            "/cache/async_dl.bin",
+            local,
+            timeout=30.0,
+            printer_model="X1C",
+        )
+        assert result is True
+        assert local.read_bytes() == content
+
+    @pytest.mark.asyncio
+    async def test_download_file_async_a1_fallback(self, patch_ftp_port, tmp_path):
+        """download_file_async falls back for A1 models."""
+        server = patch_ftp_port
+        server.add_file("cache/a1_dl.bin", b"a1 data")
+        local = tmp_path / "a1_dl.bin"
+        result = await download_file_async(
+            "127.0.0.1",
+            "12345678",
+            "/cache/a1_dl.bin",
+            local,
+            timeout=30.0,
+            printer_model="A1",
+        )
+        assert result is True
+
+    @pytest.mark.asyncio
+    async def test_download_file_try_paths_first_succeeds(self, patch_ftp_port, tmp_path):
+        """download_file_try_paths_async succeeds on first path."""
+        server = patch_ftp_port
+        server.add_file("cache/try1.bin", b"first path")
+        local = tmp_path / "try.bin"
+        result = await download_file_try_paths_async(
+            "127.0.0.1",
+            "12345678",
+            ["/cache/try1.bin", "/cache/try2.bin"],
+            local,
+            printer_model="X1C",
+        )
+        assert result is True
+        assert local.read_bytes() == b"first path"
+
+    @pytest.mark.asyncio
+    async def test_download_file_try_paths_fallback(self, patch_ftp_port, tmp_path):
+        """download_file_try_paths_async falls back to second path."""
+        server = patch_ftp_port
+        server.add_file("cache/second.bin", b"second path")
+        local = tmp_path / "fallback.bin"
+        result = await download_file_try_paths_async(
+            "127.0.0.1",
+            "12345678",
+            ["/cache/missing.bin", "/cache/second.bin"],
+            local,
+            printer_model="X1C",
+        )
+        assert result is True
+        assert local.read_bytes() == b"second path"
+
+    @pytest.mark.asyncio
+    async def test_list_files_async_success(self, patch_ftp_port):
+        """list_files_async returns file list."""
+        server = patch_ftp_port
+        server.add_file("cache/listed.bin", b"data")
+        result = await list_files_async(
+            "127.0.0.1",
+            "12345678",
+            "/cache",
+            timeout=30.0,
+            printer_model="X1C",
+        )
+        names = {f["name"] for f in result}
+        assert "listed.bin" in names
+
+    @pytest.mark.asyncio
+    async def test_delete_file_async_success(self, patch_ftp_port):
+        """delete_file_async deletes a file."""
+        server = patch_ftp_port
+        server.add_file("cache/to_async_del.bin", b"delete me")
+        result = await delete_file_async(
+            "127.0.0.1",
+            "12345678",
+            "/cache/to_async_del.bin",
+            printer_model="X1C",
+        )
+        assert result is True
+        assert not server.file_exists("cache/to_async_del.bin")
+
+
+# ---------------------------------------------------------------------------
+# TestFailureScenarios
+# ---------------------------------------------------------------------------
+class TestFailureScenarios:
+    """Regression tests for known FTP failure modes."""
+
+    def test_550_caught_by_broad_except(self, ftp_client_factory, ftp_server, tmp_path):
+        """550 error_perm is caught by (OSError, ftplib.Error) handler.
+
+        Regression: error_perm is a subclass of ftplib.Error, so the
+        broad except clause in upload_file catches it correctly.
+        """
+        ftp_server.inject_failure("STOR", 550, "Permission denied.")
+        local = tmp_path / "test.bin"
+        local.write_bytes(b"data")
+        client = ftp_client_factory()
+        client.connect()
+        result = client.upload_file(local, "/cache/test.bin")
+        assert result is False
+        client.disconnect()
+
+    def test_zero_byte_download_detected(self, ftp_client_factory, ftp_server, tmp_path):
+        """0-byte download is detected and file is cleaned up.
+
+        Regression: Prior to fix, 0-byte downloads were reported as success.
+        """
+        ftp_server.add_file("cache/zero.bin", b"")
+        local = tmp_path / "zero.bin"
+        client = ftp_client_factory()
+        client.connect()
+        result = client.download_to_file("/cache/zero.bin", local)
+        assert result is False
+        assert not local.exists()
+        client.disconnect()
+
+    def test_connection_refused_handled(self):
+        """Connection refused is handled gracefully."""
+        client = BambuFTPClient("127.0.0.1", "12345678", timeout=2.0)
+        client.FTP_PORT = 1  # Almost certainly not listening
+        assert client.connect() is False
+
+    def test_auth_failure_530(self, ftp_client_factory, ftp_server):
+        """530 authentication failure returns False."""
+        ftp_server.inject_failure("PASS", 530, "Login incorrect.")
+        client = ftp_client_factory()
+        result = client.connect()
+        assert result is False
+
+    def test_retr_550_handled(self, ftp_client_factory, ftp_server):
+        """RETR 550 (file not found) returns None."""
+        ftp_server.inject_failure("RETR", 550, "File not found.")
+        ftp_server.add_file("cache/exists.bin", b"data")
+        client = ftp_client_factory()
+        client.connect()
+        result = client.download_file("/cache/exists.bin")
+        assert result is None
+        client.disconnect()
+
+    def test_cwd_550_handled(self, ftp_client_factory, ftp_server):
+        """CWD 550 is handled in list_files."""
+        ftp_server.inject_failure("CWD", 550, "Directory not found.")
+        client = ftp_client_factory()
+        client.connect()
+        result = client.list_files("/nonexistent")
+        assert result == []
+        client.disconnect()
+
+    def test_stor_553_handled(self, ftp_client_factory, ftp_server, tmp_path):
+        """STOR 553 (no SD card) handled gracefully."""
+        ftp_server.inject_failure("STOR", 553, "Could not create file.")
+        local = tmp_path / "test.bin"
+        local.write_bytes(b"test")
+        client = ftp_client_factory()
+        client.connect()
+        result = client.upload_file(local, "/cache/test.bin")
+        assert result is False
+        client.disconnect()
+
+    def test_diagnose_storage_cwd_failure_doesnt_propagate(self, ftp_client_factory, ftp_server):
+        """diagnose_storage CWD failure doesn't crash the whole operation.
+
+        Regression: diagnose_storage() was called in the upload path and
+        a CWD failure would propagate and crash the upload.
+        """
+        ftp_server.inject_failure("CWD", 550, "No such directory.", count=2)
+        client = ftp_client_factory()
+        client.connect()
+        diag = client.diagnose_storage()
+        # Should still return results (with errors noted)
+        assert diag["connected"] is True
+        assert len(diag["errors"]) > 0
+        client.disconnect()
+
+    def test_failure_injection_count_decrements(self, ftp_client_factory, ftp_server):
+        """Failure injection with count decrements and eventually succeeds."""
+        ftp_server.add_file("cache/retry.bin", b"data after retry")
+        ftp_server.inject_failure("RETR", 550, "Temporary error.", count=1)
+        client = ftp_client_factory()
+        client.connect()
+        # First attempt fails
+        result1 = client.download_file("/cache/retry.bin")
+        assert result1 is None
+        # Second attempt succeeds (failure count exhausted)
+        result2 = client.download_file("/cache/retry.bin")
+        assert result2 == b"data after retry"
+        client.disconnect()

+ 3 - 0
requirements-dev.txt

@@ -6,6 +6,9 @@ pytest-xdist>=3.5.0
 httpx>=0.27.0
 httpx>=0.27.0
 ruff>=0.8.0
 ruff>=0.8.0
 
 
+# Required by pyftpdlib TLS_FTPHandler for mock FTP server tests
+pyOpenSSL>=24.0.0
+
 # Security scanning
 # Security scanning
 bandit[sarif]>=1.7.0
 bandit[sarif]>=1.7.0
 pip-audit>=2.7.0
 pip-audit>=2.7.0