|
|
@@ -55,6 +55,7 @@ class FTPSession:
|
|
|
self._data_reader: asyncio.StreamReader | None = None
|
|
|
self._data_writer: asyncio.StreamWriter | None = None
|
|
|
self._data_connected = asyncio.Event()
|
|
|
+ self._transfer_done = asyncio.Event()
|
|
|
|
|
|
peername = writer.get_extra_info("peername")
|
|
|
self.remote_ip = peername[0] if peername else "unknown"
|
|
|
@@ -118,6 +119,9 @@ class FTPSession:
|
|
|
|
|
|
async def _cleanup(self) -> None:
|
|
|
"""Clean up session resources."""
|
|
|
+ # Release any waiting data connection callback
|
|
|
+ self._transfer_done.set()
|
|
|
+
|
|
|
if self.data_server:
|
|
|
self.data_server.close()
|
|
|
try:
|
|
|
@@ -233,10 +237,11 @@ class FTPSession:
|
|
|
# Close any existing data connection/server
|
|
|
await self._close_data_connection()
|
|
|
|
|
|
- # Reset connection state
|
|
|
+ # Reset connection state for the new transfer
|
|
|
self._data_connected.clear()
|
|
|
self._data_reader = None
|
|
|
self._data_writer = None
|
|
|
+ self._transfer_done = asyncio.Event()
|
|
|
|
|
|
if await self._bind_passive_port():
|
|
|
# EPSV response format: 229 Entering Extended Passive Mode (|||port|)
|
|
|
@@ -255,10 +260,11 @@ class FTPSession:
|
|
|
# Close any existing data connection/server
|
|
|
await self._close_data_connection()
|
|
|
|
|
|
- # Reset connection state
|
|
|
+ # Reset connection state for the new transfer
|
|
|
self._data_connected.clear()
|
|
|
self._data_reader = None
|
|
|
self._data_writer = None
|
|
|
+ self._transfer_done = asyncio.Event()
|
|
|
|
|
|
if await self._bind_passive_port():
|
|
|
# Determine the IP to advertise in PASV response
|
|
|
@@ -288,7 +294,24 @@ class FTPSession:
|
|
|
await self.send(425, "Cannot open data connection")
|
|
|
|
|
|
async def _handle_data_connection(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
|
|
|
- """Handle incoming data connection (used by PASV)."""
|
|
|
+ """Handle incoming data connection (used by PASV/EPSV).
|
|
|
+
|
|
|
+ This callback stays alive until the transfer completes to ensure the
|
|
|
+ asyncio task holds strong references to the reader/writer throughout
|
|
|
+ the data transfer. If the callback returned immediately, the task
|
|
|
+ would complete and the StreamReaderProtocol could release its strong
|
|
|
+ reader reference, potentially destabilising the connection.
|
|
|
+ """
|
|
|
+ # Reject duplicate connections — only one data connection per transfer
|
|
|
+ if self._data_reader is not None:
|
|
|
+ logger.warning("FTP rejecting duplicate data connection from %s", self.remote_ip)
|
|
|
+ try:
|
|
|
+ writer.close()
|
|
|
+ await writer.wait_closed()
|
|
|
+ except OSError:
|
|
|
+ pass
|
|
|
+ return
|
|
|
+
|
|
|
# Log TLS details for debugging
|
|
|
ssl_obj = writer.get_extra_info("ssl_object")
|
|
|
if ssl_obj:
|
|
|
@@ -302,13 +325,26 @@ class FTPSession:
|
|
|
logger.info("FTP data connection established from %s", self.remote_ip)
|
|
|
self._data_reader = reader
|
|
|
self._data_writer = writer
|
|
|
+
|
|
|
+ # Stop accepting further connections on the passive port
|
|
|
+ if self.data_server:
|
|
|
+ self.data_server.close()
|
|
|
+
|
|
|
self._data_connected.set()
|
|
|
- # Don't close - let the transfer command handle it
|
|
|
+
|
|
|
+ # Keep this callback alive until the transfer command (STOR/RETR)
|
|
|
+ # finishes. This ensures the asyncio server-handler task holds strong
|
|
|
+ # references to reader/writer for the entire transfer lifetime.
|
|
|
+ await self._transfer_done.wait()
|
|
|
|
|
|
async def _close_data_connection(self) -> None:
|
|
|
"""Close the data connection and server."""
|
|
|
had_connection = self._data_writer is not None or self.data_server is not None
|
|
|
|
|
|
+ # Signal the _handle_data_connection callback to return, allowing
|
|
|
+ # its asyncio task to complete cleanly.
|
|
|
+ self._transfer_done.set()
|
|
|
+
|
|
|
if self._data_writer:
|
|
|
try:
|
|
|
self._data_writer.close()
|
|
|
@@ -336,7 +372,7 @@ class FTPSession:
|
|
|
await self.send(530, "Not logged in")
|
|
|
return
|
|
|
|
|
|
- if not self.data_server:
|
|
|
+ if not self.data_server and not self._data_connected.is_set():
|
|
|
await self.send(425, "Use PASV first")
|
|
|
return
|
|
|
|
|
|
@@ -363,20 +399,28 @@ class FTPSession:
|
|
|
|
|
|
# Receive data
|
|
|
data_content: list[bytes] = []
|
|
|
+ total_received = 0
|
|
|
try:
|
|
|
while True:
|
|
|
chunk = await asyncio.wait_for(self._data_reader.read(65536), timeout=60)
|
|
|
if not chunk:
|
|
|
break
|
|
|
data_content.append(chunk)
|
|
|
- logger.debug("FTP received chunk: %s bytes", len(chunk))
|
|
|
+ total_received += len(chunk)
|
|
|
+ logger.debug("FTP received chunk: %s bytes (total: %s)", len(chunk), total_received)
|
|
|
except TimeoutError:
|
|
|
- logger.error("FTP data transfer timeout")
|
|
|
+ logger.error("FTP data transfer timeout after %s bytes for %s", total_received, filename)
|
|
|
await self.send(426, "Transfer timeout")
|
|
|
await self._close_data_connection()
|
|
|
return
|
|
|
except Exception as e:
|
|
|
- logger.error("FTP data transfer error: %s", e)
|
|
|
+ logger.error(
|
|
|
+ "FTP data transfer error after %s bytes for %s: %s(%s)",
|
|
|
+ total_received,
|
|
|
+ filename,
|
|
|
+ type(e).__name__,
|
|
|
+ e,
|
|
|
+ )
|
|
|
await self.send(426, f"Transfer failed: {e}")
|
|
|
await self._close_data_connection()
|
|
|
return
|