| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403 |
- """Comprehensive FTP test suite for BambuFTPClient.
- Tests against a real mock implicit FTPS server, covering:
- - Connection (auth, SSL modes, timeout, caching)
- - File listing
- - Download (bytes, to_file, 0-byte regression)
- - Upload (chunked transfer, progress, error codes)
- - Delete
- - File size
- - Storage info (AVBL, directory scan, diagnose_storage)
- - Model-specific behavior (X1C prot_p, A1 prot_c fallback)
- - Async wrappers
- - Failure injection scenarios (regressions for 0.1.8 bugs)
- """
- import time
- from pathlib import Path
- import pytest
- from backend.app.services.bambu_ftp import (
- BambuFTPClient,
- FileNotOnPrinterError,
- cache_3mf_download,
- clear_3mf_cache,
- delete_file_async,
- download_file_async,
- download_file_try_paths_async,
- get_cached_3mf,
- list_files_async,
- normalize_3mf_name,
- upload_file_async,
- with_ftp_retry,
- )
- # Brief delay to allow pyftpdlib to flush uploaded files to disk.
- # Needed because upload_file() skips voidresp() for all models,
- # so the server may still be processing the data channel close event.
- # The value is in seconds: it is passed to time.sleep() between an upload
- # and the fresh-connection download that verifies the uploaded content.
- # NOTE(review): the 426 tests in TestUpload patch client._ftp.voidresp and
- # expect upload_file()/upload_bytes() to react to it, which suggests
- # voidresp() may no longer be skipped — confirm this rationale is current.
- _UPLOAD_FLUSH_DELAY = 0.3
- # ---------------------------------------------------------------------------
- # TestConnection
- # ---------------------------------------------------------------------------
class TestConnection:
    """Connect/disconnect behaviour of ``BambuFTPClient``.

    Tests that open a connection release it in a ``finally`` block, so a
    failed assertion cannot leave a session open for the tests that follow.
    """

    def test_connect_success(self, ftp_client_factory):
        """A client built with the factory defaults connects and logs in."""
        client = ftp_client_factory()
        try:
            assert client.connect() is True
        finally:
            client.disconnect()

    def test_connect_wrong_access_code(self, ftp_client_factory):
        """connect() returns False when the access code is wrong."""
        client = ftp_client_factory(access_code="wrongcode")
        assert client.connect() is False

    def test_connect_unreachable_host(self, ftp_server):
        """connect() returns False when the host cannot be reached."""
        client = BambuFTPClient(
            ip_address="192.0.2.1",  # TEST-NET-1 (RFC 5737): documentation range, not routed
            access_code="12345678",
            timeout=1.0,
            printer_model="X1C",
        )
        client.FTP_PORT = ftp_server.port
        assert client.connect() is False

    def test_connect_timeout(self, ftp_server):
        """connect() returns False when the timeout is far too short to connect."""
        client = BambuFTPClient(
            ip_address="192.0.2.1",
            access_code="12345678",
            timeout=0.001,  # Extremely short
            printer_model="X1C",
        )
        client.FTP_PORT = ftp_server.port
        assert client.connect() is False

    def test_disconnect_clean(self, ftp_client_factory):
        """disconnect() after a successful connect() clears the ``_ftp`` handle."""
        client = ftp_client_factory()
        try:
            # Without this check a failed connect would make the test pass vacuously.
            assert client.connect() is True
        finally:
            client.disconnect()
        assert client._ftp is None

    def test_disconnect_without_connect(self, ftp_client_factory):
        """disconnect() on a never-connected client does not raise."""
        client = ftp_client_factory()
        client.disconnect()  # Should not raise
        assert client._ftp is None

    def test_x1c_uses_prot_p(self, ftp_client_factory):
        """An X1C client connects and does not select prot_c."""
        client = ftp_client_factory(printer_model="X1C")
        try:
            assert client.connect() is True
            assert client._should_use_prot_c() is False
        finally:
            client.disconnect()

    def test_a1_defaults_prot_p(self, ftp_client_factory):
        """An A1 client does not select prot_c when no mode is cached or forced."""
        client = ftp_client_factory(printer_model="A1")
        assert client._should_use_prot_c() is False
        try:
            assert client.connect() is True
        finally:
            client.disconnect()

    def test_a1_force_prot_c(self, ftp_client_factory):
        """An A1 client created with ``force_prot_c=True`` selects prot_c and connects."""
        client = ftp_client_factory(printer_model="A1", force_prot_c=True)
        assert client._should_use_prot_c() is True
        try:
            assert client.connect() is True
        finally:
            client.disconnect()

    def test_cached_mode_respected(self, ftp_client_factory):
        """A mode cached for the IP is picked up by a client created afterwards."""
        BambuFTPClient.cache_mode("127.0.0.1", "prot_c")
        client = ftp_client_factory(printer_model="A1")
        assert client._should_use_prot_c() is True
        try:
            assert client.connect() is True
        finally:
            client.disconnect()
- # ---------------------------------------------------------------------------
- # TestDisconnectServerGone — isolated class because server.stop() calls
- # close_all() which nukes all asyncore sockets globally.
- # ---------------------------------------------------------------------------
class TestDisconnectServerGone:
    """Disconnect behaviour when the server has stopped.

    Uses its own server instance rather than the shared fixture, because
    stopping a server tears down sockets beyond this one connection (see the
    section comment above this class).
    """

    def test_disconnect_after_server_gone(self, ftp_certs, tmp_path):
        """Disconnect after server has stopped does not raise.

        disconnect() catches OSError, ftplib.Error, and EOFError so that
        best-effort cleanup never propagates exceptions to the caller.
        """
        from backend.tests.unit.services.mock_ftp_server import (
            MockBambuFTPServer,
        )

        from .conftest import _find_free_port

        cert_path, key_path = ftp_certs
        port = _find_free_port()
        server = MockBambuFTPServer("127.0.0.1", port, str(tmp_path), cert_path, key_path)
        server.start()
        server_running = True
        try:
            client = BambuFTPClient("127.0.0.1", "12345678", timeout=5.0)
            client.FTP_PORT = port
            # The scenario is only exercised if a live session exists first;
            # ignoring the result would let the test pass without one.
            assert client.connect() is True

            server_running = False
            server.stop()

            # Should not raise — disconnect() catches all connection errors
            client.disconnect()
            assert client._ftp is None
        finally:
            # Do not leak the private server when a step above fails early.
            if server_running:
                server.stop()
- # ---------------------------------------------------------------------------
- # TestListFiles
- # ---------------------------------------------------------------------------
class TestListFiles:
    """Directory listing through ``list_files()``.

    Each test disconnects in a ``finally`` block so a failed assertion does
    not leave the session open.
    """

    def test_list_empty_directory(self, ftp_client_factory):
        """Listing ``/cache`` with nothing added to it returns an empty list."""
        client = ftp_client_factory()
        client.connect()
        try:
            assert client.list_files("/cache") == []
        finally:
            client.disconnect()

    def test_list_directory_with_files(self, ftp_client_factory, ftp_server):
        """Files added to the server appear by name in the listing."""
        ftp_server.add_file("cache/test.3mf", b"x" * 1024)
        ftp_server.add_file("cache/test2.gcode", b"y" * 512)
        client = ftp_client_factory()
        client.connect()
        try:
            names = {entry["name"] for entry in client.list_files("/cache")}
            assert "test.3mf" in names
            assert "test2.gcode" in names
        finally:
            client.disconnect()

    def test_directories_marked(self, ftp_client_factory, ftp_server):
        """A subdirectory is listed with ``is_directory`` set."""
        ftp_server.add_directory("model/subdir")
        client = ftp_client_factory()
        client.connect()
        try:
            entries = client.list_files("/model")
            # Membership rather than dirs[0]: the check must not depend on the
            # order in which the server happens to list directories.
            dir_names = {entry["name"] for entry in entries if entry["is_directory"]}
            assert "subdir" in dir_names
        finally:
            client.disconnect()

    def test_nonexistent_path_returns_empty(self, ftp_client_factory):
        """Listing a path that does not exist returns an empty list."""
        client = ftp_client_factory()
        client.connect()
        try:
            assert client.list_files("/nonexistent/path") == []
        finally:
            client.disconnect()

    def test_file_sizes_and_paths(self, ftp_client_factory, ftp_server):
        """Each entry reports the file's size and its full remote path."""
        ftp_server.add_file("cache/sized.bin", b"a" * 2048)
        client = ftp_client_factory()
        client.connect()
        try:
            matches = [entry for entry in client.list_files("/cache") if entry["name"] == "sized.bin"]
            assert len(matches) == 1
            assert matches[0]["size"] == 2048
            assert matches[0]["path"] == "/cache/sized.bin"
        finally:
            client.disconnect()
- # ---------------------------------------------------------------------------
- # TestDownload
- # ---------------------------------------------------------------------------
class TestDownload:
    """Download behaviour of ``download_file()`` and ``download_to_file()``.

    Each test disconnects in a ``finally`` block so a failed assertion does
    not leave the session open.
    """

    def test_download_file_returns_bytes(self, ftp_client_factory, ftp_server):
        """download_file() returns the remote file's content as bytes."""
        content = b"Hello FTP World!"
        ftp_server.add_file("cache/hello.txt", content)
        client = ftp_client_factory()
        client.connect()
        try:
            assert client.download_file("/cache/hello.txt") == content
        finally:
            client.disconnect()

    def test_download_file_missing(self, ftp_client_factory):
        """download_file() returns None for a file that is not on the server."""
        client = ftp_client_factory()
        client.connect()
        try:
            assert client.download_file("/cache/does_not_exist.txt") is None
        finally:
            client.disconnect()

    def test_download_to_file_writes_to_disk(self, ftp_client_factory, ftp_server, tmp_path):
        """download_to_file() returns True and writes the content to the local path."""
        content = b"Downloaded content"
        ftp_server.add_file("cache/dl.bin", content)
        local = tmp_path / "output" / "dl.bin"
        client = ftp_client_factory()
        client.connect()
        try:
            assert client.download_to_file("/cache/dl.bin", local) is True
            assert local.read_bytes() == content
        finally:
            client.disconnect()

    def test_download_to_file_creates_parent_dirs(self, ftp_client_factory, ftp_server, tmp_path):
        """download_to_file() creates missing parent directories of the target."""
        content = b"nested content"
        ftp_server.add_file("cache/nested.txt", content)
        local = tmp_path / "deep" / "nested" / "path" / "nested.txt"
        client = ftp_client_factory()
        client.connect()
        try:
            assert client.download_to_file("/cache/nested.txt", local) is True
            assert local.exists()
            # An existing-but-wrong file must not count as success.
            assert local.read_bytes() == content
        finally:
            client.disconnect()

    def test_zero_byte_download_returns_false(self, ftp_client_factory, ftp_server, tmp_path):
        """A 0-byte download returns False and leaves no local file (regression test)."""
        ftp_server.add_file("cache/empty.bin", b"")
        local = tmp_path / "empty.bin"
        client = ftp_client_factory()
        client.connect()
        try:
            assert client.download_to_file("/cache/empty.bin", local) is False
            assert not local.exists()
        finally:
            client.disconnect()

    def test_download_to_file_missing_raises_not_on_printer(self, ftp_client_factory, tmp_path):
        """Missing file raises FileNotOnPrinterError so callers can short-circuit
        the retry loop — 550 means the file isn't there and retrying won't help."""
        # FileNotOnPrinterError comes from the module-level import; no local
        # re-import is needed.
        local = tmp_path / "missing.bin"
        client = ftp_client_factory()
        client.connect()
        try:
            with pytest.raises(FileNotOnPrinterError):
                client.download_to_file("/cache/no_such_file.bin", local)
        finally:
            client.disconnect()

    def test_download_large_file(self, ftp_client_factory, ftp_server):
        """A file slightly over 1 MiB downloads intact."""
        large_content = b"X" * (1024 * 1024 + 500)  # ~1MB + 500 bytes
        ftp_server.add_file("cache/large.bin", large_content)
        client = ftp_client_factory()
        client.connect()
        try:
            assert client.download_file("/cache/large.bin") == large_content
        finally:
            client.disconnect()

    def test_download_not_connected(self):
        """download_file() returns None when connect() was never called."""
        client = BambuFTPClient("127.0.0.1", "12345678")
        assert client.download_file("/cache/test.bin") is None
- # ---------------------------------------------------------------------------
- # TestUpload
- # ---------------------------------------------------------------------------
class TestUpload:
    """Upload behaviour of ``upload_file()`` and ``upload_bytes()``.

    Each test disconnects in a ``finally`` block so a failed assertion does
    not leave the session open. The 426 scenarios share ``_inject_426`` rather
    than repeating the same monkeypatch in every test.
    """

    @staticmethod
    def _inject_426(client, size_reply):
        """Patch the client's live ftplib session to model a 426 after the transfer.

        ``voidresp()`` is replaced with a function that raises
        ``ftplib.error_temp`` (426). ``size()`` is replaced with a function
        that returns ``size_reply``, or raises it when it is an exception
        instance.
        """
        import ftplib  # nosec B402 — tests need the real ftplib to construct mock 426 responses

        def raise_426():
            raise ftplib.error_temp("426 Failure reading network stream.")

        def fake_size(_path):
            if isinstance(size_reply, Exception):
                raise size_reply
            return size_reply

        client._ftp.voidresp = raise_426
        client._ftp.size = fake_size

    def test_upload_success(self, ftp_client_factory, ftp_server, tmp_path):
        """upload_file() returns True and the content can be read back afterwards."""
        content = b"Upload test content"
        local = tmp_path / "upload.3mf"
        local.write_bytes(content)

        client = ftp_client_factory()
        client.connect()
        try:
            assert client.upload_file(local, "/cache/upload.3mf") is True
        finally:
            client.disconnect()

        # Verify via fresh connection (upload_file skips voidresp() for all
        # models, so the original session can't be reused for download)
        time.sleep(_UPLOAD_FLUSH_DELAY)
        verifier = ftp_client_factory()
        verifier.connect()
        try:
            assert verifier.download_file("/cache/upload.3mf") == content
        finally:
            verifier.disconnect()

    def test_upload_progress_callback(self, ftp_client_factory, ftp_server, tmp_path):
        """The progress callback is invoked, ending with (total, total)."""
        content = b"P" * 2048
        local = tmp_path / "progress.bin"
        local.write_bytes(content)
        progress_calls = []

        def on_progress(uploaded, total):
            progress_calls.append((uploaded, total))

        client = ftp_client_factory()
        client.connect()
        try:
            result = client.upload_file(local, "/cache/progress.bin", on_progress)
        finally:
            client.disconnect()

        # Progress from an upload that itself failed would be meaningless.
        assert result is True
        assert len(progress_calls) >= 1
        # Last call should report full file uploaded
        assert progress_calls[-1] == (len(content), len(content))

    def test_upload_not_connected(self, tmp_path):
        """upload_file() returns False when connect() was never called."""
        local = tmp_path / "test.bin"
        local.write_bytes(b"data")
        client = BambuFTPClient("127.0.0.1", "12345678")
        assert client.upload_file(local, "/cache/test.bin") is False

    def test_upload_553_no_sd_card(self, ftp_client_factory, ftp_server, tmp_path):
        """A 553 reply to STOR (no SD card) makes upload_file() return False."""
        ftp_server.inject_failure("STOR", 553, "Could not create file.")
        local = tmp_path / "test.bin"
        local.write_bytes(b"data")
        client = ftp_client_factory()
        client.connect()
        try:
            assert client.upload_file(local, "/cache/test.bin") is False
        finally:
            client.disconnect()

    def test_upload_550_permission_denied(self, ftp_client_factory, ftp_server, tmp_path):
        """A 550 reply to STOR (permission denied) makes upload_file() return False."""
        ftp_server.inject_failure("STOR", 550, "Permission denied.")
        local = tmp_path / "test.bin"
        local.write_bytes(b"data")
        client = ftp_client_factory()
        client.connect()
        try:
            assert client.upload_file(local, "/cache/test.bin") is False
        finally:
            client.disconnect()

    def test_upload_552_storage_full(self, ftp_client_factory, ftp_server, tmp_path):
        """A 552 reply to STOR (storage full) makes upload_file() return False."""
        ftp_server.inject_failure("STOR", 552, "Storage quota exceeded.")
        local = tmp_path / "test.bin"
        local.write_bytes(b"data")
        client = ftp_client_factory()
        client.connect()
        try:
            assert client.upload_file(local, "/cache/test.bin") is False
        finally:
            client.disconnect()

    def test_upload_426_with_intact_file_proceeds(self, ftp_client_factory, ftp_server, tmp_path):
        """Some P2S firmware revisions return 426 on voidresp() even when the
        file landed fully (TLS data-channel close races the 226). #1417
        follow-up — verify via SIZE: when server size matches, proceed with
        a warning instead of failing the dispatch.

        Pre-#1417 the catch raised unconditionally and the reporter saw 11
        retries fail in a row even though every upload was actually
        succeeding on the printer side (v0.2.4.1 worked because the prior
        proceed-with-warning branch tolerated the noise).
        """
        payload = b"data" * 256  # 1024 bytes
        local = tmp_path / "test.bin"
        local.write_bytes(payload)
        client = ftp_client_factory()
        client.connect()
        try:
            # Real P2S firmware: voidresp returns 426 but the file IS on
            # the SD card at its full size. Mock can't reproduce both
            # halves naturally because pyftpdlib only flushes on a clean
            # voidresp, so we inject SIZE explicitly to model the
            # printer-side state the user observes.
            self._inject_426(client, len(payload))
            result = client.upload_file(local, "/cache/test.bin")
            assert result is True, "intact file (SIZE match) tolerates 426 noise"
        finally:
            client.disconnect()

    def test_upload_426_with_truncated_file_returns_false(self, ftp_client_factory, ftp_server, tmp_path):
        """The original #1401 fix is preserved: when SIZE confirms the file
        isn't on the server at full size (or SIZE itself fails), the upload
        must fail so the dispatcher doesn't send a print command for a
        partial 3MF."""
        local = tmp_path / "test.bin"
        local.write_bytes(b"data" * 256)
        client = ftp_client_factory()
        client.connect()
        try:
            # Make SIZE report a smaller value — file is genuinely truncated.
            self._inject_426(client, 100)
            result = client.upload_file(local, "/cache/test.bin")
            assert result is False, "truncated file (SIZE mismatch) must fail"
        finally:
            client.disconnect()

    def test_upload_426_with_size_check_failing_returns_false(self, ftp_client_factory, ftp_server, tmp_path):
        """If SIZE itself fails (e.g. server too broken to answer), assume
        the worst and fail — better a retry than a print on a partial file.
        """
        import ftplib  # nosec B402 — tests need the real ftplib to construct mock 426 responses

        local = tmp_path / "test.bin"
        local.write_bytes(b"data" * 256)
        client = ftp_client_factory()
        client.connect()
        try:
            self._inject_426(client, ftplib.error_perm("550 File not found."))
            assert client.upload_file(local, "/cache/test.bin") is False
        finally:
            client.disconnect()

    def test_upload_bytes_426_with_intact_file_proceeds(self, ftp_client_factory, ftp_server):
        """upload_bytes() mirrors the same SIZE-verify logic as upload_file."""
        data = b"x" * 1024
        client = ftp_client_factory()
        client.connect()
        try:
            self._inject_426(client, len(data))  # printer-side file matches expected size
            assert client.upload_bytes(data, "/cache/bytes.bin") is True
        finally:
            client.disconnect()

    def test_upload_bytes_426_with_truncated_file_returns_false(self, ftp_client_factory, ftp_server):
        """The truncated branch for upload_bytes()."""
        data = b"x" * 1024
        client = ftp_client_factory()
        client.connect()
        try:
            self._inject_426(client, 100)
            assert client.upload_bytes(data, "/cache/bytes.bin") is False
        finally:
            client.disconnect()

    def test_upload_bytes_success(self, ftp_client_factory, ftp_server):
        """upload_bytes() returns True and the data can be read back afterwards."""
        data = b"Bytes upload content"
        client = ftp_client_factory()
        client.connect()
        try:
            assert client.upload_bytes(data, "/cache/bytes.bin") is True
        finally:
            client.disconnect()

        # Verify via fresh connection
        time.sleep(_UPLOAD_FLUSH_DELAY)
        verifier = ftp_client_factory()
        verifier.connect()
        try:
            assert verifier.download_file("/cache/bytes.bin") == data
        finally:
            verifier.disconnect()

    def test_upload_bytes_failure(self, ftp_client_factory, ftp_server):
        """upload_bytes() returns False when STOR is rejected."""
        ftp_server.inject_failure("STOR", 553, "No space.")
        client = ftp_client_factory()
        client.connect()
        try:
            assert client.upload_bytes(b"data", "/cache/fail.bin") is False
        finally:
            client.disconnect()

    def test_upload_large_chunked(self, ftp_client_factory, ftp_server, tmp_path):
        """Large file upload in chunks completes without error.

        Uses 2.5MB to trigger multiple chunks with 64KB CHUNK_SIZE.
        Content verification skipped because upload_file() skips
        voidresp() for all models, so the server may still be flushing
        when we check. The upload result=True confirms the client sent
        all chunks without error.
        """
        content = b"C" * (1024 * 1024 * 2 + 512 * 1024)
        local = tmp_path / "large.bin"
        local.write_bytes(content)
        progress_calls = []

        def on_progress(uploaded, total):
            progress_calls.append((uploaded, total))

        client = ftp_client_factory()
        client.connect()
        try:
            result = client.upload_file(local, "/cache/large.bin", on_progress)
        finally:
            client.disconnect()

        assert result is True
        # Verify many chunks were sent (2.5MB / 64KB = 40 chunks)
        assert len(progress_calls) >= 38
        assert progress_calls[-1][0] == len(content)
- # ---------------------------------------------------------------------------
- # TestDelete
- # ---------------------------------------------------------------------------
class TestDelete:
    """File deletion through ``delete_file()``.

    Connected tests disconnect in a ``finally`` block so a failed assertion
    does not leave the session open.
    """

    def test_delete_success(self, ftp_client_factory, ftp_server):
        """delete_file() returns True and the file is gone from the server."""
        ftp_server.add_file("cache/to_delete.bin", b"delete me")
        client = ftp_client_factory()
        client.connect()
        try:
            assert client.delete_file("/cache/to_delete.bin") is True
            assert not ftp_server.file_exists("cache/to_delete.bin")
        finally:
            client.disconnect()

    def test_delete_not_found(self, ftp_client_factory):
        """delete_file() returns False for a file that does not exist."""
        client = ftp_client_factory()
        client.connect()
        try:
            assert client.delete_file("/cache/no_such_file.bin") is False
        finally:
            client.disconnect()

    def test_delete_not_connected(self):
        """delete_file() returns False when connect() was never called."""
        client = BambuFTPClient("127.0.0.1", "12345678")
        assert client.delete_file("/cache/test.bin") is False
- # ---------------------------------------------------------------------------
- # TestFileSize
- # ---------------------------------------------------------------------------
class TestFileSize:
    """Remote size lookup through ``get_file_size()``.

    Connected tests disconnect in a ``finally`` block so a failed assertion
    does not leave the session open.
    """

    def test_file_size_correct(self, ftp_client_factory, ftp_server):
        """get_file_size() returns the byte count of an existing file."""
        ftp_server.add_file("cache/sized.bin", b"a" * 4096)
        client = ftp_client_factory()
        client.connect()
        try:
            assert client.get_file_size("/cache/sized.bin") == 4096
        finally:
            client.disconnect()

    def test_file_size_missing(self, ftp_client_factory):
        """get_file_size() returns None for a file that does not exist."""
        client = ftp_client_factory()
        client.connect()
        try:
            assert client.get_file_size("/cache/no_file.bin") is None
        finally:
            client.disconnect()

    def test_file_size_not_connected(self):
        """get_file_size() returns None when connect() was never called."""
        client = BambuFTPClient("127.0.0.1", "12345678")
        assert client.get_file_size("/cache/test.bin") is None
- # ---------------------------------------------------------------------------
- # TestStorageInfo
- # ---------------------------------------------------------------------------
class TestStorageInfo:
    """Storage reporting through ``get_storage_info()`` and ``diagnose_storage()``.

    Connected tests disconnect in a ``finally`` block so a failed assertion
    does not leave the session open.
    """

    def test_avbl_parsed(self, ftp_client_factory, ftp_server):
        """``free_bytes`` equals the value configured via set_avbl_bytes()."""
        ftp_server.set_avbl_bytes(5000000000)
        client = ftp_client_factory()
        client.connect()
        try:
            info = client.get_storage_info()
            assert info is not None
            assert info["free_bytes"] == 5000000000
        finally:
            client.disconnect()

    def test_used_bytes_from_scan(self, ftp_client_factory, ftp_server):
        """``used_bytes`` covers at least the files added for this test."""
        ftp_server.add_file("cache/file1.bin", b"a" * 1000)
        ftp_server.add_file("cache/file2.bin", b"b" * 2000)
        client = ftp_client_factory()
        client.connect()
        try:
            info = client.get_storage_info()
            assert info is not None
            assert info["used_bytes"] >= 3000  # At least these two files
        finally:
            client.disconnect()

    def test_storage_info_not_connected(self):
        """get_storage_info() returns None when connect() was never called."""
        client = BambuFTPClient("127.0.0.1", "12345678")
        assert client.get_storage_info() is None

    def test_diagnose_storage_success(self, ftp_client_factory, ftp_server):
        """diagnose_storage() on a live session reports every probe as working."""
        client = ftp_client_factory()
        client.connect()
        try:
            diag = client.diagnose_storage()
            assert diag["connected"] is True
            assert diag["can_list_root"] is True
            assert diag["can_list_cache"] is True
            assert diag["pwd"] is not None
            assert diag["storage_info"] is not None
        finally:
            client.disconnect()

    def test_diagnose_storage_not_connected(self):
        """diagnose_storage() without a session reports connected=False and an error entry."""
        client = BambuFTPClient("127.0.0.1", "12345678")
        diag = client.diagnose_storage()
        assert diag["connected"] is False
        assert "FTP not connected" in diag["errors"]
- # ---------------------------------------------------------------------------
- # TestModelSpecificBehavior
- # ---------------------------------------------------------------------------
class TestModelSpecificBehavior:
    """Behaviour that varies with the ``printer_model`` given to the client.

    The connect / transfer / disconnect sequence lives in two private helpers
    that always disconnect, so a failing transfer or assertion cannot leave a
    session open.
    """

    @staticmethod
    def _upload(client, local, remote):
        """Connect, upload ``local`` to ``remote``, always disconnect; return upload_file()'s result."""
        client.connect()
        try:
            return client.upload_file(local, remote)
        finally:
            client.disconnect()

    @staticmethod
    def _download(client, remote):
        """Connect, download ``remote``, always disconnect; return download_file()'s result."""
        client.connect()
        try:
            return client.download_file(remote)
        finally:
            client.disconnect()

    def test_x1c_upload(self, ftp_client_factory, ftp_server, tmp_path):
        """An X1C upload succeeds and the content can be read back."""
        content = b"X1C upload data"
        local = tmp_path / "x1c.3mf"
        local.write_bytes(content)

        assert self._upload(ftp_client_factory(printer_model="X1C"), local, "/cache/x1c.3mf") is True

        # Verify via fresh connection
        time.sleep(_UPLOAD_FLUSH_DELAY)
        assert self._download(ftp_client_factory(printer_model="X1C"), "/cache/x1c.3mf") == content

    def test_a1_upload_prot_c(self, ftp_client_factory, ftp_server, tmp_path):
        """An A1 upload with ``force_prot_c=True`` succeeds and the content can be read back."""
        content = b"A1 upload data"
        local = tmp_path / "a1.3mf"
        local.write_bytes(content)

        uploader = ftp_client_factory(printer_model="A1", force_prot_c=True)
        assert self._upload(uploader, local, "/cache/a1.3mf") is True

        # Verify via fresh connection
        time.sleep(_UPLOAD_FLUSH_DELAY)
        verifier = ftp_client_factory(printer_model="A1", force_prot_c=True)
        assert self._download(verifier, "/cache/a1.3mf") == content

    def test_a1_mini_upload(self, ftp_client_factory, ftp_server, tmp_path):
        """An A1 Mini upload with ``force_prot_c=True`` reports success (content is not read back)."""
        local = tmp_path / "a1mini.3mf"
        local.write_bytes(b"A1 Mini data")

        uploader = ftp_client_factory(printer_model="A1 Mini", force_prot_c=True)
        assert self._upload(uploader, local, "/cache/a1mini.3mf") is True

    def test_p1s_upload(self, ftp_client_factory, ftp_server, tmp_path):
        """A P1S upload reports success (content is not read back)."""
        local = tmp_path / "p1s.3mf"
        local.write_bytes(b"P1S upload data")

        assert self._upload(ftp_client_factory(printer_model="P1S"), local, "/cache/p1s.3mf") is True

    def test_unknown_model_defaults_prot_p(self, ftp_client_factory):
        """An unrecognised model is not treated as A1, does not select prot_c, and connects."""
        client = ftp_client_factory(printer_model="FuturePrinter3000")
        assert client._is_a1_model() is False
        assert client._should_use_prot_c() is False
        try:
            assert client.connect() is True
        finally:
            client.disconnect()

    def test_mode_cache_persists_and_clears(self, ftp_client_factory):
        """Mode cache works within a test and clears between tests."""
        # Cache should be empty at start (autouse fixture clears it)
        assert BambuFTPClient._mode_cache == {}

        # Cache a mode for the IP
        BambuFTPClient.cache_mode("127.0.0.1", "prot_p")
        assert BambuFTPClient._mode_cache["127.0.0.1"] == "prot_p"

        # New client for same IP uses cached mode
        client = ftp_client_factory(printer_model="A1")
        try:
            assert client._get_cached_mode() == "prot_p"
            assert client._should_use_prot_c() is False
        finally:
            client.disconnect()
- # ---------------------------------------------------------------------------
- # TestAsyncWrappers
- # ---------------------------------------------------------------------------
- class TestAsyncWrappers:
- """Tests for async wrapper functions using patch_ftp_port fixture."""
@pytest.mark.asyncio
async def test_upload_file_async_success(self, patch_ftp_port, tmp_path):
    """An X1C upload through ``upload_file_async`` reports success."""
    payload = b"async upload"
    source = tmp_path / "async_up.3mf"
    source.write_bytes(payload)

    uploaded = await upload_file_async(
        "127.0.0.1",
        "12345678",
        source,
        "/cache/async_up.3mf",
        timeout=30.0,
        printer_model="X1C",
    )

    assert uploaded is True
- @pytest.mark.asyncio
- async def test_upload_file_async_a1_fallback(self, patch_ftp_port, tmp_path):
- """upload_file_async tries prot_p then falls back to prot_c for A1."""
- content = b"a1 async upload"
- local = tmp_path / "a1_async.3mf"
- local.write_bytes(content)
- # For A1 models, if prot_p succeeds we get True.
- # If prot_p fails, it tries prot_c. Either way should succeed
- # against our mock server which accepts both.
- result = await upload_file_async(
- "127.0.0.1",
- "12345678",
- local,
- "/cache/a1_async.3mf",
- timeout=30.0,
- printer_model="A1",
- )
- assert result is True
- @pytest.mark.asyncio
- async def test_download_file_async_success(self, patch_ftp_port, tmp_path):
- """download_file_async succeeds."""
- server = patch_ftp_port
- content = b"async download content"
- server.add_file("cache/async_dl.bin", content)
- local = tmp_path / "async_dl.bin"
- result = await download_file_async(
- "127.0.0.1",
- "12345678",
- "/cache/async_dl.bin",
- local,
- timeout=30.0,
- printer_model="X1C",
- )
- assert result is True
- assert local.read_bytes() == content
- @pytest.mark.asyncio
- async def test_download_file_async_a1_fallback(self, patch_ftp_port, tmp_path):
- """download_file_async falls back for A1 models."""
- server = patch_ftp_port
- server.add_file("cache/a1_dl.bin", b"a1 data")
- local = tmp_path / "a1_dl.bin"
- result = await download_file_async(
- "127.0.0.1",
- "12345678",
- "/cache/a1_dl.bin",
- local,
- timeout=30.0,
- printer_model="A1",
- )
- assert result is True
- @pytest.mark.asyncio
- async def test_download_file_async_timeout_salvages_completed_zombie(self, tmp_path, monkeypatch):
- """Executor thread that completes after wait_for timeout is salvaged.
- asyncio.wait_for cannot cancel run_in_executor threads, so the FTP
- download may still complete after we give up waiting. If the thread
- genuinely finished (signalled via completion["success"] and the file
- is on disk), download_file_async should return True rather than False.
- Regression for #972: A1 user with 14 MB 3MF hit the hardcoded 60s
- timeout, but the download thread finished ~45s later. The successful
- file was written to disk but the async wrapper returned False, so the
- archive was created as a fallback with no 3MF data.
- """
- from backend.app.services import bambu_ftp
- # Clear mode cache so prot_p path is exercised.
- bambu_ftp.BambuFTPClient._mode_cache.pop("127.0.0.1", None)
- local = tmp_path / "zombie.bin"
- expected_content = b"late arrival but complete"
- class FakeClient:
- """Connects instantly, download_to_file sleeps past wait_for's
- timeout then writes the file and returns True."""
- def __init__(self, *args, **kwargs):
- pass
- def connect(self):
- return True
- def download_to_file(self, remote_path, local_path):
- time.sleep(0.4) # longer than wait_for timeout=0.1
- local_path.write_bytes(expected_content)
- return True
- def disconnect(self):
- pass
- monkeypatch.setattr(bambu_ftp, "BambuFTPClient", FakeClient)
- monkeypatch.setattr(FakeClient, "_mode_cache", {}, raising=False)
- monkeypatch.setattr(FakeClient, "A1_MODELS", {"A1"}, raising=False)
- def _noop_cache(ip, mode):
- pass
- monkeypatch.setattr(FakeClient, "cache_mode", staticmethod(_noop_cache), raising=False)
- result = await download_file_async(
- "127.0.0.1",
- "12345678",
- "/cache/zombie.bin",
- local,
- timeout=0.1,
- printer_model="X1C",
- )
- assert result is True
- assert local.read_bytes() == expected_content
- @pytest.mark.asyncio
- async def test_download_file_async_timeout_no_salvage_when_incomplete(self, tmp_path, monkeypatch):
- """Timeout returns False when thread has not signalled success.
- A partial file on disk (mid-retrbinary) must NOT be mistaken for a
- completed download — only the thread's explicit success flag permits
- salvage.
- """
- from backend.app.services import bambu_ftp
- bambu_ftp.BambuFTPClient._mode_cache.pop("127.0.0.1", None)
- local = tmp_path / "partial.bin"
- class FakeClient:
- def __init__(self, *args, **kwargs):
- pass
- def connect(self):
- return True
- def download_to_file(self, remote_path, local_path):
- # Simulate an in-progress partial write that never completes
- # within the salvage grace period.
- local_path.write_bytes(b"partial...")
- time.sleep(2.0)
- return True # would complete eventually, but too late
- def disconnect(self):
- pass
- monkeypatch.setattr(bambu_ftp, "BambuFTPClient", FakeClient)
- monkeypatch.setattr(FakeClient, "_mode_cache", {}, raising=False)
- monkeypatch.setattr(FakeClient, "A1_MODELS", set(), raising=False)
- monkeypatch.setattr(FakeClient, "cache_mode", staticmethod(lambda ip, mode: None), raising=False)
- result = await download_file_async(
- "127.0.0.1",
- "12345678",
- "/cache/partial.bin",
- local,
- timeout=0.1,
- printer_model="X1C",
- )
- assert result is False
- @pytest.mark.asyncio
- async def test_download_file_async_timeout_waits_for_slow_zombie(self, tmp_path, monkeypatch):
- """A zombie that completes within the 30s grace window is salvaged.
- Regression for #1014: on slow WiFi, download_to_file can overshoot the
- user's ftp_timeout by 10–30 s without being stuck. The old fixed 0.5 s
- post-timeout sleep was too short — it gave up and started attempt 2
- while attempt 1's zombie thread kept running, and by the time the zombie
- wrote the file to disk with a success flag, attempt 2 had already
- reported failure (its own completion dict was still False). The async
- wrapper now waits up to min(timeout, 30 s) for the worker thread to
- finish before returning, so a slow-but-progressing download salvages.
- """
- from backend.app.services import bambu_ftp
- bambu_ftp.BambuFTPClient._mode_cache.pop("127.0.0.1", None)
- local = tmp_path / "slow_zombie.bin"
- expected_content = b"finished during grace window"
- class FakeClient:
- """Mimics a slow FTP: wait_for gives up at 1.0 s but RETR takes
- 1.5 s total. Old 0.5 s fixed sleep would have bailed (0.5 < 0.5
- extra); new grace = max(min(1.0, 30), 0.5) = 1.0 s covers the
- remaining 0.5 s so salvage succeeds."""
- def __init__(self, *args, **kwargs):
- pass
- def connect(self):
- return True
- def download_to_file(self, remote_path, local_path):
- time.sleep(1.5) # wait_for times out at 1.0 s; zombie finishes 0.5 s later
- local_path.write_bytes(expected_content)
- return True
- def disconnect(self):
- pass
- monkeypatch.setattr(bambu_ftp, "BambuFTPClient", FakeClient)
- monkeypatch.setattr(FakeClient, "_mode_cache", {}, raising=False)
- monkeypatch.setattr(FakeClient, "A1_MODELS", set(), raising=False)
- monkeypatch.setattr(FakeClient, "cache_mode", staticmethod(lambda ip, mode: None), raising=False)
- result = await download_file_async(
- "127.0.0.1",
- "12345678",
- "/cache/slow_zombie.bin",
- local,
- timeout=1.0,
- printer_model="X1C",
- )
- assert result is True
- assert local.read_bytes() == expected_content
- @pytest.mark.asyncio
- async def test_download_file_try_paths_first_succeeds(self, patch_ftp_port, tmp_path):
- """download_file_try_paths_async succeeds on first path."""
- server = patch_ftp_port
- server.add_file("cache/try1.bin", b"first path")
- local = tmp_path / "try.bin"
- result = await download_file_try_paths_async(
- "127.0.0.1",
- "12345678",
- ["/cache/try1.bin", "/cache/try2.bin"],
- local,
- printer_model="X1C",
- )
- assert result is True
- assert local.read_bytes() == b"first path"
- @pytest.mark.asyncio
- async def test_download_file_try_paths_fallback(self, patch_ftp_port, tmp_path):
- """download_file_try_paths_async falls back to second path."""
- server = patch_ftp_port
- server.add_file("cache/second.bin", b"second path")
- local = tmp_path / "fallback.bin"
- result = await download_file_try_paths_async(
- "127.0.0.1",
- "12345678",
- ["/cache/missing.bin", "/cache/second.bin"],
- local,
- printer_model="X1C",
- )
- assert result is True
- assert local.read_bytes() == b"second path"
- @pytest.mark.asyncio
- async def test_list_files_async_success(self, patch_ftp_port):
- """list_files_async returns file list."""
- server = patch_ftp_port
- server.add_file("cache/listed.bin", b"data")
- result = await list_files_async(
- "127.0.0.1",
- "12345678",
- "/cache",
- timeout=30.0,
- printer_model="X1C",
- )
- names = {f["name"] for f in result}
- assert "listed.bin" in names
- @pytest.mark.asyncio
- async def test_delete_file_async_success(self, patch_ftp_port):
- """delete_file_async deletes a file."""
- server = patch_ftp_port
- server.add_file("cache/to_async_del.bin", b"delete me")
- result = await delete_file_async(
- "127.0.0.1",
- "12345678",
- "/cache/to_async_del.bin",
- printer_model="X1C",
- )
- assert result is True
- assert not server.file_exists("cache/to_async_del.bin")
- # ---------------------------------------------------------------------------
- # TestFailureScenarios
- # ---------------------------------------------------------------------------
class TestFailureScenarios:
    """Regression coverage for FTP failure modes seen in practice."""

    def test_550_caught_by_broad_except(self, ftp_client_factory, ftp_server, tmp_path):
        """A 550 on STOR makes upload_file return False instead of raising.

        Original rationale: error_perm is a subclass of ftplib.Error, so the
        (OSError, ftplib.Error) handler in upload_file catches it.
        """
        ftp_server.inject_failure("STOR", 550, "Permission denied.")
        source_file = tmp_path / "test.bin"
        source_file.write_bytes(b"data")

        ftp = ftp_client_factory()
        ftp.connect()
        uploaded = ftp.upload_file(source_file, "/cache/test.bin")

        assert uploaded is False
        ftp.disconnect()

    def test_zero_byte_download_detected(self, ftp_client_factory, ftp_server, tmp_path):
        """A 0-byte download returns False and leaves no local file behind.

        Regression: empty downloads used to be reported as success.
        """
        ftp_server.add_file("cache/zero.bin", b"")
        destination = tmp_path / "zero.bin"

        ftp = ftp_client_factory()
        ftp.connect()
        downloaded = ftp.download_to_file("/cache/zero.bin", destination)

        assert downloaded is False
        assert not destination.exists()
        ftp.disconnect()

    def test_connection_refused_handled(self):
        """connect() returns False when nothing listens on the target port."""
        unreachable = BambuFTPClient("127.0.0.1", "12345678", timeout=2.0)
        unreachable.FTP_PORT = 1  # a port that is almost certainly closed
        assert unreachable.connect() is False

    def test_auth_failure_530(self, ftp_client_factory, ftp_server):
        """A 530 reply to PASS makes connect() return False."""
        ftp_server.inject_failure("PASS", 530, "Login incorrect.")

        ftp = ftp_client_factory()
        connected = ftp.connect()

        assert connected is False

    def test_retr_550_handled(self, ftp_client_factory, ftp_server):
        """A 550 on RETR makes download_file return None, even for an existing file."""
        ftp_server.inject_failure("RETR", 550, "File not found.")
        ftp_server.add_file("cache/exists.bin", b"data")

        ftp = ftp_client_factory()
        ftp.connect()
        fetched = ftp.download_file("/cache/exists.bin")

        assert fetched is None
        ftp.disconnect()

    def test_cwd_550_handled(self, ftp_client_factory, ftp_server):
        """A 550 on CWD makes list_files return an empty list."""
        ftp_server.inject_failure("CWD", 550, "Directory not found.")

        ftp = ftp_client_factory()
        ftp.connect()
        listing = ftp.list_files("/nonexistent")

        assert listing == []
        ftp.disconnect()

    def test_stor_553_handled(self, ftp_client_factory, ftp_server, tmp_path):
        """A 553 on STOR (the "no SD card" case) makes upload_file return False."""
        ftp_server.inject_failure("STOR", 553, "Could not create file.")
        source_file = tmp_path / "test.bin"
        source_file.write_bytes(b"test")

        ftp = ftp_client_factory()
        ftp.connect()
        uploaded = ftp.upload_file(source_file, "/cache/test.bin")

        assert uploaded is False
        ftp.disconnect()

    def test_diagnose_storage_cwd_failure_doesnt_propagate(self, ftp_client_factory, ftp_server):
        """diagnose_storage reports CWD failures instead of raising.

        Regression: diagnose_storage() ran inside the upload path, and a CWD
        failure there used to propagate and abort the upload.
        """
        ftp_server.inject_failure("CWD", 550, "No such directory.", count=2)

        ftp = ftp_client_factory()
        ftp.connect()
        report = ftp.diagnose_storage()

        # A report is still produced, with the failures recorded in it.
        assert report["connected"] is True
        assert len(report["errors"]) > 0
        ftp.disconnect()

    def test_failure_injection_count_decrements(self, ftp_client_factory, ftp_server):
        """An injected failure with count=1 affects only the first RETR."""
        ftp_server.add_file("cache/retry.bin", b"data after retry")
        ftp_server.inject_failure("RETR", 550, "Temporary error.", count=1)

        ftp = ftp_client_factory()
        ftp.connect()

        # The injected failure is consumed by the first request ...
        first_try = ftp.download_file("/cache/retry.bin")
        assert first_try is None

        # ... so the second request is served normally.
        second_try = ftp.download_file("/cache/retry.bin")
        assert second_try == b"data after retry"

        ftp.disconnect()

    def test_upload_skips_voidresp(self, ftp_client_factory, ftp_server, tmp_path):
        """Uploads return True for every model and the bytes reach the server.

        Background from the original author: voidresp() is skipped for all
        models because A1 printers hang on it, H2D printers delay the 226
        response by 30+ seconds, and X1C/P1S gain nothing from waiting; the
        file is on the SD card once sendall() returns.
        """
        payload = b"voidresp test data"
        source_file = tmp_path / "voidresp_test.3mf"
        source_file.write_bytes(payload)
        remote_path = "/cache/voidresp_test.3mf"

        for model in ("X1C", "A1", "H2D", None):
            uploader = ftp_client_factory(printer_model=model)
            uploader.connect()
            uploaded = uploader.upload_file(source_file, remote_path)
            assert uploaded is True, f"Upload failed for model={model}"
            uploader.disconnect()

            # Read the file back through a second client to confirm it is
            # really stored on the server.
            time.sleep(_UPLOAD_FLUSH_DELAY)
            verifier = ftp_client_factory()
            verifier.connect()
            read_back = verifier.download_file(remote_path)
            assert read_back == payload, f"Content mismatch for model={model}"
            verifier.disconnect()
- # ---------------------------------------------------------------------------
- # Short-circuit retries on 550 (#972)
- # ---------------------------------------------------------------------------
class TestFileNotOnPrinterShortCircuit:
    """FileNotOnPrinterError must bypass the retry budget.

    Before this fix, a 3MF path that wasn't on the printer (550) cost
    `ftp_retry_count + 1` attempts × `ftp_retry_delay` seconds per candidate
    path. With ftp_retry_count=10 and four candidate paths, that's ~22 min
    of dead retries before the real path is tried. #972 in the wild showed
    48 min of retrying paths that didn't exist.
    """

    # The coroutine tests carry an explicit asyncio marker, like every other
    # async test in this file, so they are actually awaited when
    # pytest-asyncio runs in strict mode.
    @pytest.mark.asyncio
    async def test_with_ftp_retry_propagates_file_not_on_printer_without_retrying(self):
        """with_ftp_retry raises FileNotOnPrinterError on the first attempt.

        Verifies that non_retry_exceptions short-circuits before the retry
        loop has a chance to sleep and try again.
        """
        attempts = {"n": 0}

        async def always_missing(*_args, **_kwargs):
            attempts["n"] += 1
            raise FileNotOnPrinterError("/cache/absent.3mf: 550")

        with pytest.raises(FileNotOnPrinterError):
            await with_ftp_retry(
                always_missing,
                max_retries=10,
                retry_delay=0.01,
                operation_name="test 550 short-circuit",
                non_retry_exceptions=(FileNotOnPrinterError,),
            )

        # Exactly one call: the error must propagate without any retry.
        assert attempts["n"] == 1, "550 must not trigger any retry"

    @pytest.mark.asyncio
    async def test_with_ftp_retry_still_retries_transient_errors(self):
        """Exceptions outside non_retry_exceptions still use the full retry budget.

        With max_retries=2 the operation is attempted three times and
        with_ftp_retry returns None once the budget is exhausted.
        """
        attempts = {"n": 0}

        async def flaky(*_args, **_kwargs):
            attempts["n"] += 1
            raise TimeoutError("transient")

        result = await with_ftp_retry(
            flaky,
            max_retries=2,
            retry_delay=0.01,
            operation_name="test transient retries",
            non_retry_exceptions=(FileNotOnPrinterError,),
        )

        assert result is None
        assert attempts["n"] == 3, "Transient errors should retry to exhaustion"

    def test_download_to_file_raises_on_missing_path(self, ftp_client_factory, tmp_path):
        """download_to_file surfaces a missing remote path as FileNotOnPrinterError.

        Runs end-to-end through the client returned by ``ftp_client_factory``
        rather than a hand-rolled mock, and checks that no local file is
        left behind afterwards.
        """
        local = tmp_path / "never_downloaded.3mf"
        client = ftp_client_factory()
        client.connect()
        try:
            with pytest.raises(FileNotOnPrinterError):
                client.download_to_file("/cache/does_not_exist.3mf", local)
        finally:
            # Always release the connection, even if the expectation fails.
            client.disconnect()

        assert not local.exists(), "Partial file must be cleaned up on 550"
- # ---------------------------------------------------------------------------
- # 3MF download cache (#972)
- # ---------------------------------------------------------------------------
class TestThreeMFCache:
    """Tests for the shared 3MF download cache (#972).

    Per the original notes, the cover endpoint and the archive flow share
    downloaded 3MF bytes through this cache. Each test is bracketed by
    ``clear_3mf_cache(delete_files=False)`` so entries do not leak from one
    test into the next.
    """

    def setup_method(self):
        """Start every test with an empty cache, leaving files untouched."""
        clear_3mf_cache(delete_files=False)

    def teardown_method(self):
        """Empty the cache again after every test, leaving files untouched."""
        clear_3mf_cache(delete_files=False)

    def test_normalize_collapses_filename_variants(self):
        """All spellings of one job name normalize to the same cache key."""
        canonical = normalize_3mf_name("Broly_Legendary.gcode.3mf")
        variants = (
            "Broly_Legendary.3mf",  # plain .3mf suffix
            "Broly_Legendary",  # no suffix at all
            "Broly Legendary",  # space instead of underscore (Bambu Studio rewrites these on upload)
            "BROLY_LEGENDARY.3MF",  # different capitalisation
        )
        for variant in variants:
            assert normalize_3mf_name(variant) == canonical

    def test_cache_hit_returns_stored_path(self, tmp_path):
        """A lookup with the same name returns the Path that was cached."""
        stored = tmp_path / "Broly.gcode.3mf"
        stored.write_bytes(b"fake 3mf content")

        cache_3mf_download(1, "Broly.gcode.3mf", stored)

        assert get_cached_3mf(1, "Broly.gcode.3mf") == stored

    def test_cache_lookup_uses_normalized_name(self, tmp_path):
        """An entry cached as .gcode.3mf is found via shorter name variants."""
        stored = tmp_path / "Broly.gcode.3mf"
        stored.write_bytes(b"x")

        cache_3mf_download(1, "Broly.gcode.3mf", stored)

        for query in ("Broly.3mf", "Broly"):
            assert get_cached_3mf(1, query) == stored

    def test_cache_miss_on_different_printer(self, tmp_path):
        """The printer id is part of the key, so another printer misses."""
        stored = tmp_path / "A.3mf"
        stored.write_bytes(b"x")

        cache_3mf_download(1, "A.3mf", stored)

        assert get_cached_3mf(2, "A.3mf") is None

    def test_cache_evicts_when_file_deleted(self, tmp_path):
        """A cached path whose file has vanished is reported as a miss."""
        stored = tmp_path / "A.3mf"
        stored.write_bytes(b"x")
        cache_3mf_download(1, "A.3mf", stored)

        stored.unlink()
        assert get_cached_3mf(1, "A.3mf") is None

        # Caching the same key again after the miss works normally.
        stored.write_bytes(b"y")
        cache_3mf_download(1, "A.3mf", stored)
        assert get_cached_3mf(1, "A.3mf") == stored

    def test_clear_by_printer_scoped(self, tmp_path, monkeypatch):
        """Clearing printer 1 removes only printer 1's entry and temp file."""
        from backend.app.core import config as _config

        monkeypatch.setattr(_config.settings, "archive_dir", tmp_path)
        scratch_dir = tmp_path / "temp"
        scratch_dir.mkdir()

        first_file = scratch_dir / "one.3mf"
        first_file.write_bytes(b"1")
        second_file = scratch_dir / "two.3mf"
        second_file.write_bytes(b"2")
        cache_3mf_download(1, "one.3mf", first_file)
        cache_3mf_download(2, "two.3mf", second_file)

        clear_3mf_cache(1)

        assert get_cached_3mf(1, "one.3mf") is None
        assert get_cached_3mf(2, "two.3mf") == second_file
        # No delete_files argument was passed, and printer 1's temp file is
        # expected to be removed while printer 2's stays.
        assert not first_file.exists()
        assert second_file.exists()

    def test_clear_without_deleting_files(self, tmp_path, monkeypatch):
        """delete_files=False drops the entry but keeps the file on disk."""
        from backend.app.core import config as _config

        monkeypatch.setattr(_config.settings, "archive_dir", tmp_path)
        scratch_dir = tmp_path / "temp"
        scratch_dir.mkdir()
        kept_file = scratch_dir / "keep.3mf"
        kept_file.write_bytes(b"x")
        cache_3mf_download(1, "keep.3mf", kept_file)

        clear_3mf_cache(1, delete_files=False)

        assert get_cached_3mf(1, "keep.3mf") is None
        assert kept_file.exists()

    def test_clear_does_not_delete_persistent_files(self, tmp_path, monkeypatch):
        """Cache cleanup unlinks temp files only, never archive or library files.

        Regression for #1212 / "file disappeared overnight" reports, as
        described by the original author: dispatch sites added in #1166
        cache the live archive copy and library file bytes (paths outside
        ``archive_dir/temp``) so /cover can skip FTP. Those files are user
        data. Before the fix, ``clear_3mf_cache(printer_id,
        delete_files=True)`` ran on every ``on_print_complete`` and removed
        them, leaving a DB row whose ``file_path`` pointed at nothing and
        breaking Reprint and View G-code with a 404.
        """
        from backend.app.core import config as _config

        archive_root = tmp_path / "archive"
        monkeypatch.setattr(_config.settings, "archive_dir", archive_root)
        (archive_root / "temp").mkdir(parents=True)

        # Three cached paths: an archived print, a library file, a temp file.
        archived = archive_root / "1" / "20260504_wallhooks" / "wallhooks.gcode.3mf"
        archived.parent.mkdir(parents=True)
        archived.write_bytes(b"archive bytes")

        in_library = tmp_path / "library_files" / "abcd.3mf"
        in_library.parent.mkdir(parents=True)
        in_library.write_bytes(b"library bytes")

        in_temp = archive_root / "temp" / "cover_1_x.3mf"
        in_temp.write_bytes(b"temp bytes")

        cache_3mf_download(1, "wallhooks.gcode.3mf", archived)
        cache_3mf_download(1, "library.3mf", in_library)
        cache_3mf_download(1, "cover_1_x.3mf", in_temp)

        clear_3mf_cache(1)

        # Every entry for printer 1 is gone from the cache ...
        for cached_name in ("wallhooks.gcode.3mf", "library.3mf", "cover_1_x.3mf"):
            assert get_cached_3mf(1, cached_name) is None

        # ... but on disk only the temp file was removed.
        assert archived.exists(), "archive 3mf must not be deleted by cache cleanup"
        assert in_library.exists(), "library 3mf must not be deleted by cache cleanup"
        assert not in_temp.exists(), "temp file should still be cleaned up"
|