mock_ftp_server.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. """Mock implicit FTPS server for testing BambuFTPClient.
  2. Built on pyftpdlib with implicit TLS support to match Bambu printer behavior.
  3. Supports failure injection, custom AVBL command, and filesystem inspection.
  4. """
  5. import logging
  6. import os
  7. import threading
  8. import time
  9. from pyftpdlib.authorizers import DummyAuthorizer
  10. from pyftpdlib.handlers import TLS_FTPHandler
  11. from pyftpdlib.servers import FTPServer
  12. class ImplicitTLS_FTPHandler(TLS_FTPHandler):
  13. """FTP handler that wraps the socket in TLS before sending the 220 banner.
  14. This implements implicit FTPS (port 990 style) where the TLS handshake
  15. happens immediately on connect, before any FTP protocol exchange.
  16. pyftpdlib only natively supports explicit FTPS (AUTH TLS after connect).
  17. """
  18. # Per-class failure injection map: command -> (code, message, remaining_count)
  19. # -1 remaining_count = permanent failure
  20. _failure_map: dict = {}
  21. # AVBL command response (bytes available)
  22. _avbl_bytes: int = 1073741824 # 1 GB default
  23. # Register AVBL as a recognized FTP command (pyftpdlib requires this)
  24. proto_cmds = {
  25. **TLS_FTPHandler.proto_cmds,
  26. "AVBL": {
  27. "perm": None,
  28. "auth": True,
  29. "arg": None,
  30. "help": "Syntax: AVBL (get available bytes).",
  31. },
  32. }
  33. def handle(self):
  34. """Wrap socket in TLS immediately, then send 220 banner."""
  35. self.secure_connection(self.get_ssl_context())
  36. super().handle()
  37. def ftp_PROT(self, line):
  38. """Override PROT to auto-set _pbsz for implicit FTPS.
  39. In implicit FTPS the connection is already TLS-secured, so requiring
  40. a separate PBSZ command is unnecessary. Python's ftplib prot_c()
  41. doesn't send PBSZ first (unlike prot_p()), causing 503 errors.
  42. Real Bambu printers don't enforce this for implicit FTPS either.
  43. """
  44. self._pbsz = True
  45. return super().ftp_PROT(line)
  46. def _check_failure(self, command: str, line: str):
  47. """Check if a failure is injected for this command.
  48. Returns True if a failure response was sent, False otherwise.
  49. """
  50. if command in self._failure_map:
  51. code, message, remaining = self._failure_map[command]
  52. if remaining != 0:
  53. if remaining > 0:
  54. self._failure_map[command] = (code, message, remaining - 1)
  55. if remaining - 1 == 0:
  56. del self._failure_map[command]
  57. self.respond(f"{code} {message}")
  58. return True
  59. return False
  60. def ftp_AVBL(self, line):
  61. """Handle custom AVBL command (available bytes on storage)."""
  62. self.respond(f"213 {self._avbl_bytes}")
  63. def ftp_RETR(self, file):
  64. if self._check_failure("RETR", file):
  65. return
  66. return super().ftp_RETR(file)
  67. def ftp_STOR(self, file):
  68. if self._check_failure("STOR", file):
  69. return
  70. return super().ftp_STOR(file)
  71. def ftp_DELE(self, line):
  72. if self._check_failure("DELE", line):
  73. return
  74. return super().ftp_DELE(line)
  75. def ftp_CWD(self, path):
  76. if self._check_failure("CWD", path):
  77. return
  78. return super().ftp_CWD(path)
  79. def ftp_LIST(self, path=""):
  80. if self._check_failure("LIST", path):
  81. return
  82. return super().ftp_LIST(path)
  83. def ftp_SIZE(self, path):
  84. if self._check_failure("SIZE", path):
  85. return
  86. # Override to allow SIZE in ASCII mode (real Bambu printers allow it,
  87. # and BambuFTPClient.get_file_size() doesn't set TYPE I first)
  88. if not self.fs.isfile(self.fs.realpath(path)):
  89. self.respond(f"550 {self.fs.fs2ftp(path)} is not retrievable.")
  90. return
  91. try:
  92. size = self.run_as_current_user(self.fs.getsize, path)
  93. except OSError as err:
  94. self.respond(f"550 {err}.")
  95. else:
  96. self.respond(f"213 {size}")
  97. def ftp_PASS(self, line):
  98. if self._check_failure("PASS", line):
  99. return
  100. return super().ftp_PASS(line)
  101. class MockBambuFTPServer:
  102. """Manages a mock implicit FTPS server in a background thread.
  103. Simulates a Bambu printer FTP server with:
  104. - Implicit TLS (like real printers on port 990)
  105. - Standard Bambu directory structure
  106. - AVBL command support
  107. - Per-command failure injection for testing error paths
  108. """
  109. def __init__(
  110. self,
  111. host: str,
  112. port: int,
  113. root_dir: str,
  114. cert_path: str,
  115. key_path: str,
  116. access_code: str = "12345678",
  117. ):
  118. self.host = host
  119. self.port = port
  120. self.root_dir = root_dir
  121. self.cert_path = cert_path
  122. self.key_path = key_path
  123. self.access_code = access_code
  124. self._server: FTPServer | None = None
  125. self._thread: threading.Thread | None = None
  126. # Create a unique handler class per instance so _failure_map is isolated
  127. self._handler_class = type(
  128. "TestFTPHandler",
  129. (ImplicitTLS_FTPHandler,),
  130. {
  131. "_failure_map": {},
  132. "_avbl_bytes": 1073741824,
  133. },
  134. )
  135. def start(self):
  136. """Start the FTP server in a background daemon thread."""
  137. authorizer = DummyAuthorizer()
  138. authorizer.add_user("bblp", self.access_code, self.root_dir, perm="elradfmwMT")
  139. handler = self._handler_class
  140. handler.authorizer = authorizer
  141. handler.certfile = self.cert_path
  142. handler.keyfile = self.key_path
  143. handler.passive_ports = range(60000, 60101)
  144. handler.tls_control_required = False
  145. handler.tls_data_required = False
  146. # Reset ssl_context so it picks up our cert/key
  147. handler.ssl_context = None
  148. # Suppress pyftpdlib's noisy logging (startup/shutdown banners)
  149. # to avoid "I/O operation on closed file" errors when xdist
  150. # workers tear down while the daemon thread is still logging.
  151. logging.getLogger("pyftpdlib").setLevel(logging.CRITICAL)
  152. self._server = FTPServer((self.host, self.port), handler)
  153. self._server.max_cons = 10
  154. self._server.max_cons_per_ip = 5
  155. self._thread = threading.Thread(target=self._server.serve_forever, daemon=True)
  156. self._thread.start()
  157. # Brief wait for server to be ready
  158. time.sleep(0.1)
  159. def stop(self):
  160. """Stop the FTP server and wait for thread to exit."""
  161. if self._server:
  162. self._server.close_all()
  163. if self._thread:
  164. self._thread.join(timeout=5)
  165. self._server = None
  166. self._thread = None
  167. def inject_failure(self, command: str, code: int, message: str, count: int = -1):
  168. """Inject a failure response for a specific FTP command.
  169. Args:
  170. command: FTP command name (RETR, STOR, DELE, CWD, LIST, SIZE, PASS)
  171. code: FTP response code (e.g. 550, 553)
  172. message: Response message
  173. count: Number of times to fail (-1 = permanent)
  174. """
  175. self._handler_class._failure_map[command] = (code, message, count)
  176. def clear_failures(self):
  177. """Remove all injected failures."""
  178. self._handler_class._failure_map.clear()
  179. def set_avbl_bytes(self, n: int):
  180. """Set the response value for the AVBL command."""
  181. self._handler_class._avbl_bytes = n
  182. def add_file(self, relative_path: str, content: bytes = b""):
  183. """Add a file to the server's filesystem."""
  184. full_path = os.path.join(self.root_dir, relative_path.lstrip("/"))
  185. os.makedirs(os.path.dirname(full_path), exist_ok=True)
  186. with open(full_path, "wb") as f:
  187. f.write(content)
  188. def add_directory(self, relative_path: str):
  189. """Create a directory in the server's filesystem."""
  190. full_path = os.path.join(self.root_dir, relative_path.lstrip("/"))
  191. os.makedirs(full_path, exist_ok=True)
  192. def file_exists(self, relative_path: str) -> bool:
  193. """Check if a file exists on the server."""
  194. full_path = os.path.join(self.root_dir, relative_path.lstrip("/"))
  195. return os.path.isfile(full_path)
  196. def read_file(self, relative_path: str) -> bytes:
  197. """Read file content from the server's filesystem."""
  198. full_path = os.path.join(self.root_dir, relative_path.lstrip("/"))
  199. with open(full_path, "rb") as f:
  200. return f.read()