| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224 |
- """Integration tests for Library API endpoints."""
- import io
- import tempfile
- import zipfile
- from pathlib import Path
- import pytest
- from httpx import AsyncClient
- class TestLibraryFoldersAPI:
- """Integration tests for library folders endpoints."""
- @pytest.fixture
- async def folder_factory(self, db_session):
- """Factory to create test folders."""
- _counter = [0]
- async def _create_folder(**kwargs):
- from backend.app.models.library import LibraryFolder
- _counter[0] += 1
- counter = _counter[0]
- defaults = {
- "name": f"Test Folder {counter}",
- }
- defaults.update(kwargs)
- folder = LibraryFolder(**defaults)
- db_session.add(folder)
- await db_session.commit()
- await db_session.refresh(folder)
- return folder
- return _create_folder
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_list_folders_empty(self, async_client: AsyncClient, db_session):
- """Verify empty folder list returns empty array."""
- response = await async_client.get("/api/v1/library/folders")
- assert response.status_code == 200
- assert response.json() == []
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_create_folder(self, async_client: AsyncClient, db_session):
- """Verify folder can be created."""
- data = {"name": "New Folder"}
- response = await async_client.post("/api/v1/library/folders", json=data)
- assert response.status_code == 200
- result = response.json()
- assert result["name"] == "New Folder"
- assert result["id"] is not None
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_create_nested_folder(self, async_client: AsyncClient, folder_factory, db_session):
- """Verify nested folder can be created."""
- parent = await folder_factory(name="Parent")
- data = {"name": "Child", "parent_id": parent.id}
- response = await async_client.post("/api/v1/library/folders", json=data)
- assert response.status_code == 200
- result = response.json()
- assert result["name"] == "Child"
- assert result["parent_id"] == parent.id
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_get_folder(self, async_client: AsyncClient, folder_factory, db_session):
- """Verify single folder can be retrieved."""
- folder = await folder_factory(name="Test Folder")
- response = await async_client.get(f"/api/v1/library/folders/{folder.id}")
- assert response.status_code == 200
- result = response.json()
- assert result["id"] == folder.id
- assert result["name"] == "Test Folder"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_get_folder_not_found(self, async_client: AsyncClient, db_session):
- """Verify 404 for non-existent folder."""
- response = await async_client.get("/api/v1/library/folders/9999")
- assert response.status_code == 404
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_update_folder(self, async_client: AsyncClient, folder_factory, db_session):
- """Verify folder can be updated."""
- folder = await folder_factory(name="Old Name")
- data = {"name": "New Name"}
- response = await async_client.put(f"/api/v1/library/folders/{folder.id}", json=data)
- assert response.status_code == 200
- result = response.json()
- assert result["name"] == "New Name"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_delete_folder(self, async_client: AsyncClient, folder_factory, db_session):
- """Verify folder can be deleted."""
- folder = await folder_factory()
- response = await async_client.delete(f"/api/v1/library/folders/{folder.id}")
- assert response.status_code == 200
- result = response.json()
- assert result.get("message") or result.get("success", True)
- class TestLibraryFilesAPI:
- """Integration tests for library files endpoints."""
- @pytest.fixture
- async def folder_factory(self, db_session):
- """Factory to create test folders."""
- _counter = [0]
- async def _create_folder(**kwargs):
- from backend.app.models.library import LibraryFolder
- _counter[0] += 1
- counter = _counter[0]
- defaults = {"name": f"Test Folder {counter}"}
- defaults.update(kwargs)
- folder = LibraryFolder(**defaults)
- db_session.add(folder)
- await db_session.commit()
- await db_session.refresh(folder)
- return folder
- return _create_folder
- @pytest.fixture
- async def file_factory(self, db_session):
- """Factory to create test files."""
- _counter = [0]
- async def _create_file(**kwargs):
- from backend.app.models.library import LibraryFile
- _counter[0] += 1
- counter = _counter[0]
- defaults = {
- "filename": f"test_file_{counter}.3mf",
- "file_path": f"/test/path/test_file_{counter}.3mf",
- "file_size": 1024,
- "file_type": "3mf",
- }
- defaults.update(kwargs)
- lib_file = LibraryFile(**defaults)
- db_session.add(lib_file)
- await db_session.commit()
- await db_session.refresh(lib_file)
- return lib_file
- return _create_file
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_list_files_empty(self, async_client: AsyncClient, db_session):
- """Verify empty file list returns empty array."""
- response = await async_client.get("/api/v1/library/files")
- assert response.status_code == 200
- assert response.json() == []
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_list_files_in_folder(self, async_client: AsyncClient, folder_factory, file_factory, db_session):
- """Verify files can be filtered by folder."""
- folder = await folder_factory()
- file1 = await file_factory(folder_id=folder.id)
- await file_factory() # File in root (no folder)
- response = await async_client.get(f"/api/v1/library/files?folder_id={folder.id}")
- assert response.status_code == 200
- result = response.json()
- assert len(result) == 1
- assert result[0]["id"] == file1.id
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_list_files_by_project_id(self, async_client: AsyncClient, folder_factory, file_factory, db_session):
- """#932: project_id filter returns files across all folders linked to the project.
- Replaces the prior N+1 pattern where the frontend fired one request per
- linked folder. A single JOIN query must return every file in folders whose
- project_id matches, while excluding files from unlinked folders.
- """
- from backend.app.models.project import Project
- project = Project(name="Test Project for Files", color="#00ff00")
- db_session.add(project)
- await db_session.commit()
- await db_session.refresh(project)
- folder_a = await folder_factory(name="Folder A", project_id=project.id)
- folder_b = await folder_factory(name="Folder B", project_id=project.id)
- other_folder = await folder_factory(name="Unlinked")
- linked_a = await file_factory(folder_id=folder_a.id, filename="a.3mf")
- linked_b = await file_factory(folder_id=folder_b.id, filename="b.3mf")
- await file_factory(folder_id=other_folder.id, filename="unlinked.3mf")
- await file_factory(filename="root.3mf") # no folder → not part of any project
- response = await async_client.get(f"/api/v1/library/files?project_id={project.id}")
- assert response.status_code == 200
- result = response.json()
- ids = {f["id"] for f in result}
- assert ids == {linked_a.id, linked_b.id}
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_list_files_folder_id_takes_precedence_over_project_id(
- self, async_client: AsyncClient, folder_factory, file_factory, db_session
- ):
- """When both folder_id and project_id are passed, folder_id wins.
- Documented precedence in list_files(): folder_id > project_id > include_root.
- This guards the behavior so a future refactor can't silently flip it.
- """
- from backend.app.models.project import Project
- project = Project(name="Precedence Project")
- db_session.add(project)
- await db_session.commit()
- await db_session.refresh(project)
- folder_linked = await folder_factory(name="Linked", project_id=project.id)
- folder_other = await folder_factory(name="Other")
- await file_factory(folder_id=folder_linked.id, filename="linked.3mf")
- other_file = await file_factory(folder_id=folder_other.id, filename="other.3mf")
- # folder_id points at a folder that is NOT in the project — must return
- # that folder's contents and ignore project_id entirely.
- response = await async_client.get(f"/api/v1/library/files?folder_id={folder_other.id}&project_id={project.id}")
- assert response.status_code == 200
- result = response.json()
- assert len(result) == 1
- assert result[0]["id"] == other_file.id
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_get_file(self, async_client: AsyncClient, file_factory, db_session):
- """Verify single file can be retrieved."""
- lib_file = await file_factory(filename="test.3mf")
- response = await async_client.get(f"/api/v1/library/files/{lib_file.id}")
- assert response.status_code == 200
- result = response.json()
- assert result["id"] == lib_file.id
- assert result["filename"] == "test.3mf"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_get_file_not_found(self, async_client: AsyncClient, db_session):
- """Verify 404 for non-existent file."""
- response = await async_client.get("/api/v1/library/files/9999")
- assert response.status_code == 404
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_delete_file(self, async_client: AsyncClient, file_factory, db_session):
- """Verify file can be deleted."""
- lib_file = await file_factory()
- response = await async_client.delete(f"/api/v1/library/files/{lib_file.id}")
- assert response.status_code == 200
- result = response.json()
- assert result.get("message") or result.get("success", True)
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_rename_file(self, async_client: AsyncClient, file_factory, db_session):
- """Verify file can be renamed."""
- lib_file = await file_factory(filename="old_name.3mf")
- data = {"filename": "new_name.3mf"}
- response = await async_client.put(f"/api/v1/library/files/{lib_file.id}", json=data)
- assert response.status_code == 200
- result = response.json()
- assert result["filename"] == "new_name.3mf"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_rename_file_invalid_path_separator(self, async_client: AsyncClient, file_factory, db_session):
- """Verify file rename fails with path separators."""
- lib_file = await file_factory(filename="test.3mf")
- data = {"filename": "path/to/file.3mf"}
- response = await async_client.put(f"/api/v1/library/files/{lib_file.id}", json=data)
- assert response.status_code == 400
- assert "path separator" in response.json()["detail"].lower()
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_rename_file_invalid_backslash(self, async_client: AsyncClient, file_factory, db_session):
- """Verify file rename fails with backslash."""
- lib_file = await file_factory(filename="test.3mf")
- data = {"filename": "path\\to\\file.3mf"}
- response = await async_client.put(f"/api/v1/library/files/{lib_file.id}", json=data)
- assert response.status_code == 400
- assert "path separator" in response.json()["detail"].lower()
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_library_stats(self, async_client: AsyncClient, folder_factory, file_factory, db_session):
- """Verify library stats endpoint returns counts."""
- await folder_factory()
- await folder_factory()
- await file_factory()
- response = await async_client.get("/api/v1/library/stats")
- assert response.status_code == 200
- result = response.json()
- assert result["total_folders"] == 2
- assert result["total_files"] == 1
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_file_list_includes_user_tracking_fields(self, async_client: AsyncClient, file_factory, db_session):
- """Verify file list response includes user tracking fields (Issue #206)."""
- lib_file = await file_factory(filename="test.3mf")
- response = await async_client.get("/api/v1/library/files?include_root=false")
- assert response.status_code == 200
- result = response.json()
- assert len(result) >= 1
- # Find our test file
- test_file = next((f for f in result if f["id"] == lib_file.id), None)
- assert test_file is not None
- # User tracking fields should be present (even if null)
- assert "created_by_id" in test_file
- assert "created_by_username" in test_file
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_file_detail_includes_user_tracking_fields(self, async_client: AsyncClient, file_factory, db_session):
- """Verify file detail response includes user tracking fields (Issue #206)."""
- lib_file = await file_factory(filename="test_detail.3mf")
- response = await async_client.get(f"/api/v1/library/files/{lib_file.id}")
- assert response.status_code == 200
- result = response.json()
- # User tracking fields should be present (even if null)
- assert "created_by_id" in result
- assert "created_by_username" in result
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_file_with_user_tracking(self, async_client: AsyncClient, db_session):
- """Verify file created with user shows username in response (Issue #206)."""
- from backend.app.models.library import LibraryFile
- from backend.app.models.user import User
- # Create a test user
- user = User(username="testuploader", password_hash="fakehash", role="user")
- db_session.add(user)
- await db_session.flush()
- # Create a file with created_by_id set
- lib_file = LibraryFile(
- filename="user_uploaded.3mf",
- file_path="/test/user_uploaded.3mf",
- file_size=2048,
- file_type="3mf",
- created_by_id=user.id,
- )
- db_session.add(lib_file)
- await db_session.commit()
- await db_session.refresh(lib_file)
- # Verify file detail shows username
- response = await async_client.get(f"/api/v1/library/files/{lib_file.id}")
- assert response.status_code == 200
- result = response.json()
- assert result["created_by_id"] == user.id
- assert result["created_by_username"] == "testuploader"
- # Verify file list also shows username
- response = await async_client.get("/api/v1/library/files?include_root=false")
- assert response.status_code == 200
- files = response.json()
- test_file = next((f for f in files if f["id"] == lib_file.id), None)
- assert test_file is not None
- assert test_file["created_by_id"] == user.id
- assert test_file["created_by_username"] == "testuploader"
- class TestLibraryAddToQueueAPI:
- """Integration tests for /api/v1/library/files/add-to-queue endpoint."""
- @pytest.fixture
- async def printer_factory(self, db_session):
- """Factory to create test printers."""
- _counter = [0]
- async def _create_printer(**kwargs):
- from backend.app.models.printer import Printer
- _counter[0] += 1
- counter = _counter[0]
- defaults = {
- "name": f"Test Printer {counter}",
- "ip_address": f"192.168.1.{100 + counter}",
- "serial_number": f"TESTSERIAL{counter:04d}",
- "access_code": "12345678",
- "model": "X1C",
- }
- defaults.update(kwargs)
- printer = Printer(**defaults)
- db_session.add(printer)
- await db_session.commit()
- await db_session.refresh(printer)
- return printer
- return _create_printer
- @pytest.fixture
- async def library_file_factory(self, db_session):
- """Factory to create test library files."""
- _counter = [0]
- async def _create_library_file(**kwargs):
- from backend.app.models.library import LibraryFile
- _counter[0] += 1
- counter = _counter[0]
- defaults = {
- "filename": f"test_file_{counter}.gcode.3mf",
- "file_path": f"/test/path/test_file_{counter}.gcode.3mf",
- "file_size": 1024,
- "file_type": "3mf",
- }
- defaults.update(kwargs)
- lib_file = LibraryFile(**defaults)
- db_session.add(lib_file)
- await db_session.commit()
- await db_session.refresh(lib_file)
- return lib_file
- return _create_library_file
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_add_to_queue_file_not_found(self, async_client: AsyncClient, printer_factory, db_session):
- """Verify error for non-existent file."""
- await printer_factory()
- data = {"file_ids": [9999]}
- response = await async_client.post("/api/v1/library/files/add-to-queue", json=data)
- assert response.status_code == 200
- result = response.json()
- assert len(result["added"]) == 0
- assert len(result["errors"]) == 1
- assert result["errors"][0]["file_id"] == 9999
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_add_non_sliced_file_to_queue_fails(
- self, async_client: AsyncClient, printer_factory, library_file_factory, db_session
- ):
- """Verify non-sliced file cannot be added to queue."""
- await printer_factory()
- lib_file = await library_file_factory(
- filename="model.stl",
- file_path="/test/path/model.stl",
- file_type="stl",
- )
- data = {"file_ids": [lib_file.id]}
- response = await async_client.post("/api/v1/library/files/add-to-queue", json=data)
- assert response.status_code == 200
- result = response.json()
- assert len(result["added"]) == 0
- assert len(result["errors"]) == 1
- assert "sliced" in result["errors"][0]["error"].lower()
- class TestLibraryZipExtractAPI:
- """Integration tests for ZIP extraction endpoint."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_extract_zip_invalid_file_type(self, async_client: AsyncClient, db_session):
- """Verify non-ZIP files are rejected."""
- # Create a fake file that's not a ZIP
- files = {"file": ("test.txt", b"This is not a zip file", "text/plain")}
- response = await async_client.post("/api/v1/library/files/extract-zip", files=files)
- assert response.status_code == 400
- assert "ZIP" in response.json()["detail"]
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_extract_zip_basic(self, async_client: AsyncClient, db_session):
- """Verify basic ZIP extraction works."""
- import io
- # Create a simple ZIP file in memory
- zip_buffer = io.BytesIO()
- with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
- zf.writestr("test1.txt", "Content of file 1")
- zf.writestr("test2.txt", "Content of file 2")
- zip_buffer.seek(0)
- files = {"file": ("test.zip", zip_buffer.read(), "application/zip")}
- response = await async_client.post("/api/v1/library/files/extract-zip", files=files)
- assert response.status_code == 200
- result = response.json()
- assert result["extracted"] == 2
- assert len(result["files"]) == 2
- assert len(result["errors"]) == 0
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_extract_zip_with_folders(self, async_client: AsyncClient, db_session):
- """Verify ZIP extraction preserves folder structure."""
- import io
- # Create a ZIP file with folder structure
- zip_buffer = io.BytesIO()
- with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
- zf.writestr("folder1/file1.txt", "Content 1")
- zf.writestr("folder1/subfolder/file2.txt", "Content 2")
- zf.writestr("folder2/file3.txt", "Content 3")
- zip_buffer.seek(0)
- files = {"file": ("test.zip", zip_buffer.read(), "application/zip")}
- params = {"preserve_structure": "true"}
- response = await async_client.post("/api/v1/library/files/extract-zip", files=files, params=params)
- assert response.status_code == 200
- result = response.json()
- assert result["extracted"] == 3
- assert result["folders_created"] >= 3 # folder1, folder1/subfolder, folder2
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_extract_zip_flat(self, async_client: AsyncClient, db_session):
- """Verify ZIP extraction can extract flat (no folders)."""
- import io
- # Create a ZIP file with folder structure
- zip_buffer = io.BytesIO()
- with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
- zf.writestr("folder/file1.txt", "Content 1")
- zf.writestr("folder/file2.txt", "Content 2")
- zip_buffer.seek(0)
- files = {"file": ("test.zip", zip_buffer.read(), "application/zip")}
- params = {"preserve_structure": "false"}
- response = await async_client.post("/api/v1/library/files/extract-zip", files=files, params=params)
- assert response.status_code == 200
- result = response.json()
- assert result["extracted"] == 2
- assert result["folders_created"] == 0 # No folders created when flat
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_extract_zip_skips_macos_files(self, async_client: AsyncClient, db_session):
- """Verify ZIP extraction skips __MACOSX and hidden files."""
- import io
- # Create a ZIP file with macOS junk files
- zip_buffer = io.BytesIO()
- with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
- zf.writestr("real_file.txt", "Real content")
- zf.writestr("__MACOSX/._real_file.txt", "macOS metadata")
- zf.writestr(".hidden_file", "Hidden content")
- zip_buffer.seek(0)
- files = {"file": ("test.zip", zip_buffer.read(), "application/zip")}
- response = await async_client.post("/api/v1/library/files/extract-zip", files=files)
- assert response.status_code == 200
- result = response.json()
- assert result["extracted"] == 1 # Only real_file.txt
- assert result["files"][0]["filename"] == "real_file.txt"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_extract_zip_create_folder_from_zip(self, async_client: AsyncClient, db_session):
- """Verify ZIP extraction creates a folder from the ZIP filename."""
- import io
- # Create a ZIP file with some files
- zip_buffer = io.BytesIO()
- with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
- zf.writestr("file1.txt", "Content 1")
- zf.writestr("file2.txt", "Content 2")
- zip_buffer.seek(0)
- files = {"file": ("MyProject.zip", zip_buffer.read(), "application/zip")}
- params = {"create_folder_from_zip": "true", "preserve_structure": "false"}
- response = await async_client.post("/api/v1/library/files/extract-zip", files=files, params=params)
- assert response.status_code == 200
- result = response.json()
- assert result["extracted"] == 2
- assert result["folders_created"] == 1 # MyProject folder created
- # Verify the files are in a folder
- assert result["files"][0]["folder_id"] is not None
- assert result["files"][1]["folder_id"] is not None
- # Both files should be in the same folder
- assert result["files"][0]["folder_id"] == result["files"][1]["folder_id"]
- # Verify the folder was created with the right name
- folder_response = await async_client.get(f"/api/v1/library/folders/{result['files'][0]['folder_id']}")
- assert folder_response.status_code == 200
- folder = folder_response.json()
- assert folder["name"] == "MyProject"
- class TestLibraryStlThumbnailAPI:
- """Integration tests for STL thumbnail generation endpoints."""
- @pytest.fixture
- async def file_factory(self, db_session):
- """Factory to create test files."""
- _counter = [0]
- async def _create_file(**kwargs):
- from backend.app.models.library import LibraryFile
- _counter[0] += 1
- counter = _counter[0]
- defaults = {
- "filename": f"test_model_{counter}.stl",
- "file_path": f"/test/path/test_model_{counter}.stl",
- "file_size": 1024,
- "file_type": "stl",
- }
- defaults.update(kwargs)
- lib_file = LibraryFile(**defaults)
- db_session.add(lib_file)
- await db_session.commit()
- await db_session.refresh(lib_file)
- return lib_file
- return _create_file
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_batch_generate_thumbnails_empty(self, async_client: AsyncClient, db_session):
- """Verify batch thumbnail generation with no files."""
- data = {"all_missing": True}
- response = await async_client.post("/api/v1/library/generate-stl-thumbnails", json=data)
- assert response.status_code == 200
- result = response.json()
- assert result["processed"] == 0
- assert result["succeeded"] == 0
- assert result["failed"] == 0
- assert result["results"] == []
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_batch_generate_thumbnails_no_criteria(self, async_client: AsyncClient, db_session):
- """Verify batch thumbnail generation with no criteria returns empty."""
- data = {}
- response = await async_client.post("/api/v1/library/generate-stl-thumbnails", json=data)
- assert response.status_code == 200
- result = response.json()
- assert result["processed"] == 0
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_batch_generate_thumbnails_file_not_on_disk(
- self, async_client: AsyncClient, file_factory, db_session
- ):
- """Verify batch thumbnail generation handles missing files gracefully."""
- # Create a file in DB but not on disk
- stl_file = await file_factory(
- filename="missing.stl",
- file_path="/nonexistent/path/missing.stl",
- thumbnail_path=None,
- )
- data = {"file_ids": [stl_file.id]}
- response = await async_client.post("/api/v1/library/generate-stl-thumbnails", json=data)
- assert response.status_code == 200
- result = response.json()
- assert result["processed"] == 1
- assert result["succeeded"] == 0
- assert result["failed"] == 1
- assert result["results"][0]["success"] is False
- assert "not found" in result["results"][0]["error"].lower()
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_batch_generate_thumbnails_with_real_stl(self, async_client: AsyncClient, db_session):
- """Verify batch thumbnail generation with a real STL file."""
- from backend.app.models.library import LibraryFile
- # Create a simple ASCII STL cube
- stl_content = """solid cube
- facet normal 0 0 -1
- outer loop
- vertex 0 0 0
- vertex 1 0 0
- vertex 1 1 0
- endloop
- endfacet
- facet normal 0 0 1
- outer loop
- vertex 0 0 1
- vertex 1 1 1
- vertex 1 0 1
- endloop
- endfacet
- endsolid cube"""
- with tempfile.NamedTemporaryFile(suffix=".stl", delete=False, mode="w") as f:
- f.write(stl_content)
- stl_path = f.name
- try:
- # Create file in DB pointing to real STL
- lib_file = LibraryFile(
- filename="test_cube.stl",
- file_path=stl_path,
- file_size=len(stl_content),
- file_type="stl",
- thumbnail_path=None,
- )
- db_session.add(lib_file)
- await db_session.commit()
- await db_session.refresh(lib_file)
- data = {"file_ids": [lib_file.id]}
- response = await async_client.post("/api/v1/library/generate-stl-thumbnails", json=data)
- assert response.status_code == 200
- result = response.json()
- assert result["processed"] == 1
- # Result depends on whether trimesh/matplotlib are installed
- # Either succeeds or fails gracefully
- assert result["succeeded"] + result["failed"] == 1
- finally:
- import os
- if os.path.exists(stl_path):
- os.unlink(stl_path)
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_upload_file_with_stl_thumbnail_param(self, async_client: AsyncClient, db_session):
- """Verify file upload accepts generate_stl_thumbnails parameter."""
- # Create a simple STL file
- stl_content = b"solid test\nendsolid test"
- files = {"file": ("test.stl", stl_content, "application/octet-stream")}
- params = {"generate_stl_thumbnails": "false"}
- response = await async_client.post("/api/v1/library/files", files=files, params=params)
- assert response.status_code == 200
- result = response.json()
- assert result["filename"] == "test.stl"
- assert result["file_type"] == "stl"
- # No thumbnail should be generated when disabled
- assert result["thumbnail_path"] is None
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_extract_zip_with_stl_thumbnail_param(self, async_client: AsyncClient, db_session):
- """Verify ZIP extraction accepts generate_stl_thumbnails parameter."""
- # Create a ZIP file containing an STL
- stl_content = b"solid test\nendsolid test"
- zip_buffer = io.BytesIO()
- with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
- zf.writestr("model.stl", stl_content)
- zip_buffer.seek(0)
- files = {"file": ("test.zip", zip_buffer.read(), "application/zip")}
- params = {"generate_stl_thumbnails": "false"}
- response = await async_client.post("/api/v1/library/files/extract-zip", files=files, params=params)
- assert response.status_code == 200
- result = response.json()
- assert result["extracted"] == 1
- assert result["files"][0]["filename"] == "model.stl"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_batch_generate_thumbnails_by_folder(self, async_client: AsyncClient, file_factory, db_session):
- """Verify batch thumbnail generation can filter by folder."""
- from backend.app.models.library import LibraryFolder
- # Create a folder
- folder = LibraryFolder(name="STL Folder")
- db_session.add(folder)
- await db_session.commit()
- await db_session.refresh(folder)
- # Create STL file in folder (no thumbnail)
- stl_in_folder = await file_factory(
- filename="in_folder.stl",
- folder_id=folder.id,
- thumbnail_path=None,
- )
- # Create STL file at root (no thumbnail)
- _stl_at_root = await file_factory(
- filename="at_root.stl",
- folder_id=None,
- thumbnail_path=None,
- )
- # Request thumbnails only for files in folder
- data = {"folder_id": folder.id, "all_missing": True}
- response = await async_client.post("/api/v1/library/generate-stl-thumbnails", json=data)
- assert response.status_code == 200
- result = response.json()
- # Should only process the file in the folder
- assert result["processed"] == 1
- assert result["results"][0]["file_id"] == stl_in_folder.id
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_batch_generate_thumbnails_all_missing(self, async_client: AsyncClient, file_factory, db_session):
- """Verify batch thumbnail generation finds all STL files missing thumbnails."""
- # Create files with and without thumbnails
- _stl_with_thumb = await file_factory(
- filename="with_thumb.stl",
- thumbnail_path="/some/path/thumb.png",
- )
- stl_without_thumb1 = await file_factory(
- filename="without_thumb1.stl",
- thumbnail_path=None,
- )
- stl_without_thumb2 = await file_factory(
- filename="without_thumb2.stl",
- thumbnail_path=None,
- )
- data = {"all_missing": True}
- response = await async_client.post("/api/v1/library/generate-stl-thumbnails", json=data)
- assert response.status_code == 200
- result = response.json()
- # Should only process files without thumbnails
- assert result["processed"] == 2
- file_ids = {r["file_id"] for r in result["results"]}
- assert stl_without_thumb1.id in file_ids
- assert stl_without_thumb2.id in file_ids
- class TestLibraryPathHelpers:
- """Tests for path handling utilities used for backup portability."""
- def test_to_relative_path_converts_absolute(self):
- """Verify absolute paths are converted to relative paths."""
- from backend.app.api.routes.library import to_relative_path
- from backend.app.core.config import settings
- base_dir = str(settings.base_dir)
- abs_path = f"{base_dir}/archive/library/files/test.3mf"
- rel_path = to_relative_path(abs_path)
- assert not rel_path.startswith("/")
- assert rel_path == "archive/library/files/test.3mf"
- def test_to_relative_path_handles_path_object(self):
- """Verify Path objects are handled correctly."""
- from pathlib import Path
- from backend.app.api.routes.library import to_relative_path
- from backend.app.core.config import settings
- abs_path = Path(settings.base_dir) / "archive" / "test.3mf"
- rel_path = to_relative_path(abs_path)
- assert not rel_path.startswith("/")
- assert rel_path == "archive/test.3mf"
- def test_to_relative_path_returns_empty_for_empty_input(self):
- """Verify empty input returns empty string."""
- from backend.app.api.routes.library import to_relative_path
- assert to_relative_path("") == ""
- assert to_relative_path(None) == ""
- def test_to_absolute_path_converts_relative(self):
- """Verify relative paths are converted to absolute paths."""
- from backend.app.api.routes.library import to_absolute_path
- from backend.app.core.config import settings
- rel_path = "archive/library/files/test.3mf"
- abs_path = to_absolute_path(rel_path)
- assert abs_path is not None
- assert abs_path.is_absolute()
- assert str(abs_path) == f"{settings.base_dir}/archive/library/files/test.3mf"
- def test_to_absolute_path_handles_already_absolute(self):
- """Verify already absolute paths are returned as-is (for backwards compatibility)."""
- from backend.app.api.routes.library import to_absolute_path
- abs_path_str = "/data/archive/test.3mf"
- result = to_absolute_path(abs_path_str)
- assert result is not None
- assert str(result) == abs_path_str
- def test_to_absolute_path_returns_none_for_empty(self):
- """Verify None/empty input returns None."""
- from backend.app.api.routes.library import to_absolute_path
- assert to_absolute_path(None) is None
- assert to_absolute_path("") is None
- class TestLibraryPermissions:
- """Tests for library permission enforcement."""
- @pytest.fixture
- async def auth_setup(self, db_session):
- """Set up auth with users of different permission levels."""
- from backend.app.core.auth import create_access_token, get_password_hash
- from backend.app.models.group import Group
- from backend.app.models.settings import Settings
- from backend.app.models.user import User
- # Enable auth
- settings = Settings(key="auth_enabled", value="true")
- db_session.add(settings)
- await db_session.commit()
- # Groups are auto-seeded during db init, but we need to commit them
- await db_session.commit()
- # Get groups
- from sqlalchemy import select
- admin_group = (await db_session.execute(select(Group).where(Group.name == "Administrators"))).scalar_one()
- operator_group = (await db_session.execute(select(Group).where(Group.name == "Operators"))).scalar_one()
- viewer_group = (await db_session.execute(select(Group).where(Group.name == "Viewers"))).scalar_one()
- password_hash = get_password_hash("password")
- # Create users
- admin_user = User(username="admin_lib", password_hash=password_hash, role="admin", is_active=True)
- admin_user.groups.append(admin_group)
- operator_user = User(username="operator_lib", password_hash=password_hash, is_active=True)
- operator_user.groups.append(operator_group)
- viewer_user = User(username="viewer_lib", password_hash=password_hash, is_active=True)
- viewer_user.groups.append(viewer_group)
- db_session.add_all([admin_user, operator_user, viewer_user])
- await db_session.commit()
- # Create tokens
- admin_token = create_access_token(data={"sub": admin_user.username})
- operator_token = create_access_token(data={"sub": operator_user.username})
- viewer_token = create_access_token(data={"sub": viewer_user.username})
- return {
- "admin_user": admin_user,
- "operator_user": operator_user,
- "viewer_user": viewer_user,
- "admin_token": admin_token,
- "operator_token": operator_token,
- "viewer_token": viewer_token,
- }
- @pytest.fixture
- async def test_file(self, db_session, auth_setup):
- """Create a test file owned by the operator user."""
- from backend.app.models.library import LibraryFile
- operator_user = auth_setup["operator_user"]
- lib_file = LibraryFile(
- filename="test.txt",
- file_path="data/archive/library/files/test.txt",
- file_type="txt",
- file_size=100,
- created_by_id=operator_user.id,
- )
- db_session.add(lib_file)
- await db_session.commit()
- await db_session.refresh(lib_file)
- return lib_file
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_list_files_requires_library_read(self, async_client: AsyncClient, db_session, auth_setup):
- """Verify list_files requires library:read permission."""
- viewer_token = auth_setup["viewer_token"]
- # Viewers have library:read, should succeed
- response = await async_client.get("/api/v1/library/files", headers={"Authorization": f"Bearer {viewer_token}"})
- assert response.status_code == 200
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_list_files_denied_without_permission(self, async_client: AsyncClient, db_session):
- """Verify list_files denied without auth when auth is enabled."""
- from backend.app.models.settings import Settings
- # Enable auth
- settings = Settings(key="auth_enabled", value="true")
- db_session.add(settings)
- await db_session.commit()
- # Request without token should fail
- response = await async_client.get("/api/v1/library/files")
- assert response.status_code == 401
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_delete_file_own_by_owner(self, async_client: AsyncClient, db_session, auth_setup, test_file):
- """Verify operator can delete their own files."""
- from pathlib import Path
- # Create actual file on disk so delete doesn't fail
- from backend.app.core.config import settings as app_settings
- file_path = Path(app_settings.base_dir) / test_file.file_path
- file_path.parent.mkdir(parents=True, exist_ok=True)
- file_path.write_text("test content")
- operator_token = auth_setup["operator_token"]
- response = await async_client.delete(
- f"/api/v1/library/files/{test_file.id}", headers={"Authorization": f"Bearer {operator_token}"}
- )
- assert response.status_code == 200
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_delete_file_own_denied_for_others_file(self, async_client: AsyncClient, db_session, auth_setup):
- """Verify operator cannot delete files owned by others."""
- # Create another operator user with a file
- from sqlalchemy import select
- from backend.app.core.auth import create_access_token
- from backend.app.models.group import Group
- from backend.app.models.library import LibraryFile
- from backend.app.models.user import User
- operator_group = (await db_session.execute(select(Group).where(Group.name == "Operators"))).scalar_one()
- from backend.app.core.auth import get_password_hash as get_pw_hash
- other_user = User(username="other_op", password_hash=get_pw_hash("password"), is_active=True)
- other_user.groups.append(operator_group)
- db_session.add(other_user)
- await db_session.commit()
- await db_session.refresh(other_user)
- # Create file owned by other user
- other_file = LibraryFile(
- filename="other.txt",
- file_path="data/archive/library/files/other.txt",
- file_type="txt",
- file_size=100,
- created_by_id=other_user.id,
- )
- db_session.add(other_file)
- await db_session.commit()
- await db_session.refresh(other_file)
- # Original operator should not be able to delete it
- operator_token = auth_setup["operator_token"]
- response = await async_client.delete(
- f"/api/v1/library/files/{other_file.id}", headers={"Authorization": f"Bearer {operator_token}"}
- )
- assert response.status_code == 403
- assert "your own files" in response.json()["detail"].lower()
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_delete_file_admin_can_delete_any(self, async_client: AsyncClient, db_session, auth_setup):
- """Verify admin can delete any file."""
- from pathlib import Path
- from backend.app.core.config import settings as app_settings
- from backend.app.models.library import LibraryFile
- # Create file owned by operator
- operator_user = auth_setup["operator_user"]
- lib_file = LibraryFile(
- filename="admin_can_delete.txt",
- file_path="data/archive/library/files/admin_can_delete.txt",
- file_type="txt",
- file_size=100,
- created_by_id=operator_user.id,
- )
- db_session.add(lib_file)
- await db_session.commit()
- await db_session.refresh(lib_file)
- # Create actual file on disk
- file_path = Path(app_settings.base_dir) / lib_file.file_path
- file_path.parent.mkdir(parents=True, exist_ok=True)
- file_path.write_text("test content")
- # Admin should be able to delete it
- admin_token = auth_setup["admin_token"]
- response = await async_client.delete(
- f"/api/v1/library/files/{lib_file.id}", headers={"Authorization": f"Bearer {admin_token}"}
- )
- assert response.status_code == 200
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_viewer_cannot_delete_files(self, async_client: AsyncClient, db_session, auth_setup, test_file):
- """Verify viewer cannot delete any files."""
- viewer_token = auth_setup["viewer_token"]
- response = await async_client.delete(
- f"/api/v1/library/files/{test_file.id}", headers={"Authorization": f"Bearer {viewer_token}"}
- )
- # Viewers don't have delete_own or delete_all permissions
- assert response.status_code == 403
- class TestPrintFileUploadValidation:
- """#1401: pre-flight rejection of unprintable uploads at the library +
- archive routes. Smoke tests the shared ``validate_print_file_upload``
- helper through both surfaces a user can reach with a drag-drop."""
- def _valid_3mf_bytes(self, name: str = "Metadata/plate_1.gcode") -> bytes:
- """Build a minimal-but-real zip with the gcode-3mf magic in it so
- the validator's ``startswith(b"PK\\x03\\x04")`` check passes."""
- buf = io.BytesIO()
- with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
- zf.writestr(name, "; G-code\nG28\n")
- return buf.getvalue()
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_library_rejects_raw_gcode_upload(self, async_client: AsyncClient, db_session):
- """``Foo.gcode`` direct uploads are blocked at the library route —
- the dispatcher would otherwise append ``.3mf`` and ship raw gcode
- to the printer as a fake 3MF."""
- files = {"file": ("plate_1.gcode", b"; raw gcode\nG28\n", "application/octet-stream")}
- response = await async_client.post("/api/v1/library/files", files=files)
- assert response.status_code == 400
- # Error message must name the actual remedy, not just say "invalid".
- assert "gcode.3mf" in response.json()["detail"]
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_library_rejects_non_zip_3mf_upload(self, async_client: AsyncClient, db_session):
- """A ``.3mf`` upload whose body isn't a zip is rejected — covers
- raw gcode renamed to .3mf, corrupted downloads, etc."""
- files = {"file": ("model.3mf", b"; raw gcode\nG28\n", "application/octet-stream")}
- response = await async_client.post("/api/v1/library/files", files=files)
- assert response.status_code == 400
- assert "ZIP container" in response.json()["detail"]
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_library_rejects_non_zip_gcode_3mf_upload(self, async_client: AsyncClient, db_session):
- """The compound-extension ``.gcode.3mf`` case is gated by the same
- zip-magic check — splitext returns just ``.3mf``, but the suffix
- match covers both."""
- files = {"file": ("plate_1.gcode.3mf", b"; raw gcode\nG28\n", "application/octet-stream")}
- response = await async_client.post("/api/v1/library/files", files=files)
- assert response.status_code == 400
- assert "ZIP container" in response.json()["detail"]
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_library_accepts_valid_gcode_3mf_upload(self, async_client: AsyncClient, db_session):
- """A real ``.gcode.3mf`` zip uploads successfully — the existing
- happy path is not regressed by the new validation."""
- files = {
- "file": (
- "plate_1.gcode.3mf",
- self._valid_3mf_bytes(),
- "application/zip",
- )
- }
- response = await async_client.post("/api/v1/library/files", files=files)
- assert response.status_code == 200
- result = response.json()
- assert result["filename"] == "plate_1.gcode.3mf"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_library_still_accepts_non_print_extensions(self, async_client: AsyncClient, db_session):
- """STL / image / other non-print uploads bypass the validator
- entirely — Bambuddy is also a library, not just a print dispatcher."""
- files = {"file": ("model.stl", b"solid test\nendsolid test", "application/octet-stream")}
- response = await async_client.post(
- "/api/v1/library/files", files=files, params={"generate_stl_thumbnails": "false"}
- )
- assert response.status_code == 200
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_archive_upload_rejects_non_zip(self, async_client: AsyncClient, db_session):
- """``POST /archives/upload`` shares the same validator — covers the
- manual archive-upload entry point too."""
- files = {"file": ("model.3mf", b"; raw gcode\nG28\n", "application/octet-stream")}
- response = await async_client.post("/api/v1/archives/upload", files=files)
- assert response.status_code == 400
- assert "ZIP container" in response.json()["detail"]
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_archive_bulk_upload_collects_per_file_errors(self, async_client: AsyncClient, db_session):
- """The bulk-archive route reports validation failures per file and
- continues processing the remaining items — one bad upload in a
- 10-file drag-drop must not abort the whole batch."""
- good = self._valid_3mf_bytes()
- bad = b"; raw gcode\nG28\n"
- # httpx multipart with a list-of-tuples preserves order + same field name.
- files = [
- ("files", ("good.3mf", good, "application/zip")),
- ("files", ("bad.3mf", bad, "application/octet-stream")),
- ]
- response = await async_client.post("/api/v1/archives/upload-bulk", files=files)
- assert response.status_code == 200
- body = response.json()
- # The bulk route's archive_print may still reject the "good" file
- # downstream (no printer match, etc.) — we don't care about that
- # here; what matters is the bad file lands in `errors` with the
- # validator's message and the route didn't 500.
- assert body["failed"] >= 1
- bad_errors = [e for e in body["errors"] if e["filename"] == "bad.3mf"]
- assert bad_errors, body
- assert "ZIP container" in bad_errors[0]["error"]
|