"""Integration tests for Library API endpoints."""
- import io
- import tempfile
- import zipfile
- from pathlib import Path
- import pytest
- from httpx import AsyncClient
class TestLibraryFoldersAPI:
    """Integration tests exercising the library folder endpoints."""

    @pytest.fixture
    async def folder_factory(self, db_session):
        """Provide an async callable that persists ``LibraryFolder`` rows.

        Every call receives a unique default name (``Test Folder <n>``);
        keyword arguments override or extend the model fields.
        """
        created = 0

        async def _create_folder(**kwargs):
            nonlocal created
            # Model import is kept function-local, as elsewhere in this module.
            from backend.app.models.library import LibraryFolder

            created += 1
            fields = {"name": f"Test Folder {created}", **kwargs}
            folder = LibraryFolder(**fields)
            db_session.add(folder)
            await db_session.commit()
            await db_session.refresh(folder)
            return folder

        return _create_folder

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_list_folders_empty(self, async_client: AsyncClient, db_session):
        """Listing folders with nothing stored yields an empty array."""
        resp = await async_client.get("/api/v1/library/folders")

        assert resp.status_code == 200
        assert resp.json() == []

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_create_folder(self, async_client: AsyncClient, db_session):
        """POSTing a name creates a folder and returns it with an id."""
        resp = await async_client.post("/api/v1/library/folders", json={"name": "New Folder"})

        assert resp.status_code == 200
        body = resp.json()
        assert body["name"] == "New Folder"
        assert body["id"] is not None

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_create_nested_folder(self, async_client: AsyncClient, folder_factory, db_session):
        """A folder can be created underneath an existing parent."""
        parent = await folder_factory(name="Parent")
        payload = {"name": "Child", "parent_id": parent.id}

        resp = await async_client.post("/api/v1/library/folders", json=payload)

        assert resp.status_code == 200
        body = resp.json()
        assert body["name"] == "Child"
        assert body["parent_id"] == parent.id

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_get_folder(self, async_client: AsyncClient, folder_factory, db_session):
        """A stored folder is retrievable by id."""
        folder = await folder_factory(name="Test Folder")

        resp = await async_client.get(f"/api/v1/library/folders/{folder.id}")

        assert resp.status_code == 200
        body = resp.json()
        assert body["id"] == folder.id
        assert body["name"] == "Test Folder"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_get_folder_not_found(self, async_client: AsyncClient, db_session):
        """Requesting an unknown folder id answers 404."""
        resp = await async_client.get("/api/v1/library/folders/9999")

        assert resp.status_code == 404

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_update_folder(self, async_client: AsyncClient, folder_factory, db_session):
        """PUT with a new name renames the folder."""
        folder = await folder_factory(name="Old Name")

        resp = await async_client.put(f"/api/v1/library/folders/{folder.id}", json={"name": "New Name"})

        assert resp.status_code == 200
        assert resp.json()["name"] == "New Name"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_delete_folder(self, async_client: AsyncClient, folder_factory, db_session):
        """DELETE on an existing folder succeeds."""
        folder = await folder_factory()

        resp = await async_client.delete(f"/api/v1/library/folders/{folder.id}")

        assert resp.status_code == 200
        body = resp.json()
        # Accept either a message or a (possibly implicit) success flag.
        acknowledged = body.get("message") or body.get("success", True)
        assert acknowledged
class TestLibraryFilesAPI:
    """Integration tests exercising the library file endpoints."""

    @pytest.fixture
    async def folder_factory(self, db_session):
        """Provide an async callable that persists ``LibraryFolder`` rows.

        Default name is ``Test Folder <n>``; keyword arguments override it or
        add further model fields.
        """
        created = 0

        async def _create_folder(**kwargs):
            nonlocal created
            from backend.app.models.library import LibraryFolder

            created += 1
            fields = {"name": f"Test Folder {created}", **kwargs}
            folder = LibraryFolder(**fields)
            db_session.add(folder)
            await db_session.commit()
            await db_session.refresh(folder)
            return folder

        return _create_folder

    @pytest.fixture
    async def file_factory(self, db_session):
        """Provide an async callable that persists ``LibraryFile`` rows.

        Defaults describe a 1024-byte ``.3mf`` file with a numbered name and
        path; keyword arguments override or extend them.
        """
        created = 0

        async def _create_file(**kwargs):
            nonlocal created
            from backend.app.models.library import LibraryFile

            created += 1
            fields = {
                "filename": f"test_file_{created}.3mf",
                "file_path": f"/test/path/test_file_{created}.3mf",
                "file_size": 1024,
                "file_type": "3mf",
                **kwargs,
            }
            lib_file = LibraryFile(**fields)
            db_session.add(lib_file)
            await db_session.commit()
            await db_session.refresh(lib_file)
            return lib_file

        return _create_file

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_list_files_empty(self, async_client: AsyncClient, db_session):
        """Listing files with nothing stored yields an empty array."""
        resp = await async_client.get("/api/v1/library/files")

        assert resp.status_code == 200
        assert resp.json() == []

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_list_files_in_folder(self, async_client: AsyncClient, folder_factory, file_factory, db_session):
        """The ``folder_id`` query parameter restricts the listing to that folder."""
        folder = await folder_factory()
        in_folder = await file_factory(folder_id=folder.id)
        await file_factory()  # File in root (no folder)

        resp = await async_client.get(f"/api/v1/library/files?folder_id={folder.id}")

        assert resp.status_code == 200
        listed = resp.json()
        assert len(listed) == 1
        assert listed[0]["id"] == in_folder.id

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_list_files_by_project_id(self, async_client: AsyncClient, folder_factory, file_factory, db_session):
        """#932: project_id filter returns files across all folders linked to the project.

        This replaces the earlier N+1 pattern in which the frontend issued one
        request per linked folder. A single JOIN query must return every file
        in folders whose project_id matches, while excluding files from
        unlinked folders.
        """
        from backend.app.models.project import Project

        project = Project(name="Test Project for Files", color="#00ff00")
        db_session.add(project)
        await db_session.commit()
        await db_session.refresh(project)

        first_linked = await folder_factory(name="Folder A", project_id=project.id)
        second_linked = await folder_factory(name="Folder B", project_id=project.id)
        unlinked = await folder_factory(name="Unlinked")

        file_a = await file_factory(folder_id=first_linked.id, filename="a.3mf")
        file_b = await file_factory(folder_id=second_linked.id, filename="b.3mf")
        await file_factory(folder_id=unlinked.id, filename="unlinked.3mf")
        await file_factory(filename="root.3mf")  # no folder → not part of any project

        resp = await async_client.get(f"/api/v1/library/files?project_id={project.id}")

        assert resp.status_code == 200
        returned_ids = {entry["id"] for entry in resp.json()}
        assert returned_ids == {file_a.id, file_b.id}

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_list_files_folder_id_takes_precedence_over_project_id(
        self, async_client: AsyncClient, folder_factory, file_factory, db_session
    ):
        """When both folder_id and project_id are passed, folder_id wins.

        Documented precedence in list_files(): folder_id > project_id > include_root.
        This pins the behavior so a later refactor cannot silently flip it.
        """
        from backend.app.models.project import Project

        project = Project(name="Precedence Project")
        db_session.add(project)
        await db_session.commit()
        await db_session.refresh(project)

        linked = await folder_factory(name="Linked", project_id=project.id)
        other = await folder_factory(name="Other")
        await file_factory(folder_id=linked.id, filename="linked.3mf")
        expected = await file_factory(folder_id=other.id, filename="other.3mf")

        # folder_id names a folder that is NOT in the project: the response
        # must be that folder's contents, with project_id ignored entirely.
        resp = await async_client.get(f"/api/v1/library/files?folder_id={other.id}&project_id={project.id}")

        assert resp.status_code == 200
        listed = resp.json()
        assert len(listed) == 1
        assert listed[0]["id"] == expected.id

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_get_file(self, async_client: AsyncClient, file_factory, db_session):
        """A stored file is retrievable by id."""
        lib_file = await file_factory(filename="test.3mf")

        resp = await async_client.get(f"/api/v1/library/files/{lib_file.id}")

        assert resp.status_code == 200
        body = resp.json()
        assert body["id"] == lib_file.id
        assert body["filename"] == "test.3mf"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_get_file_not_found(self, async_client: AsyncClient, db_session):
        """Requesting an unknown file id answers 404."""
        resp = await async_client.get("/api/v1/library/files/9999")

        assert resp.status_code == 404

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_delete_file(self, async_client: AsyncClient, file_factory, db_session):
        """DELETE on an existing file succeeds."""
        lib_file = await file_factory()

        resp = await async_client.delete(f"/api/v1/library/files/{lib_file.id}")

        assert resp.status_code == 200
        body = resp.json()
        # Accept either a message or a (possibly implicit) success flag.
        acknowledged = body.get("message") or body.get("success", True)
        assert acknowledged

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_rename_file(self, async_client: AsyncClient, file_factory, db_session):
        """PUT with a new filename renames the file."""
        lib_file = await file_factory(filename="old_name.3mf")

        resp = await async_client.put(f"/api/v1/library/files/{lib_file.id}", json={"filename": "new_name.3mf"})

        assert resp.status_code == 200
        assert resp.json()["filename"] == "new_name.3mf"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_rename_file_invalid_path_separator(self, async_client: AsyncClient, file_factory, db_session):
        """A new filename containing forward slashes is rejected with 400."""
        lib_file = await file_factory(filename="test.3mf")

        resp = await async_client.put(f"/api/v1/library/files/{lib_file.id}", json={"filename": "path/to/file.3mf"})

        assert resp.status_code == 400
        assert "path separator" in resp.json()["detail"].lower()

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_rename_file_invalid_backslash(self, async_client: AsyncClient, file_factory, db_session):
        """A new filename containing backslashes is rejected with 400."""
        lib_file = await file_factory(filename="test.3mf")

        resp = await async_client.put(f"/api/v1/library/files/{lib_file.id}", json={"filename": "path\\to\\file.3mf"})

        assert resp.status_code == 400
        assert "path separator" in resp.json()["detail"].lower()

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_library_stats(self, async_client: AsyncClient, folder_factory, file_factory, db_session):
        """The stats endpoint reports folder and file totals."""
        for _ in range(2):
            await folder_factory()
        await file_factory()

        resp = await async_client.get("/api/v1/library/stats")

        assert resp.status_code == 200
        stats = resp.json()
        assert stats["total_folders"] == 2
        assert stats["total_files"] == 1

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_file_list_includes_user_tracking_fields(self, async_client: AsyncClient, file_factory, db_session):
        """File list entries expose the user tracking fields (Issue #206)."""
        lib_file = await file_factory(filename="test.3mf")

        resp = await async_client.get("/api/v1/library/files?include_root=false")

        assert resp.status_code == 200
        listed = resp.json()
        assert len(listed) >= 1

        # Locate the file created above among the returned entries.
        matches = (entry for entry in listed if entry["id"] == lib_file.id)
        entry = next(matches, None)
        assert entry is not None

        # The keys must exist even when their values are null.
        assert "created_by_id" in entry
        assert "created_by_username" in entry

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_file_detail_includes_user_tracking_fields(self, async_client: AsyncClient, file_factory, db_session):
        """The file detail response exposes the user tracking fields (Issue #206)."""
        lib_file = await file_factory(filename="test_detail.3mf")

        resp = await async_client.get(f"/api/v1/library/files/{lib_file.id}")

        assert resp.status_code == 200
        detail = resp.json()
        # The keys must exist even when their values are null.
        assert "created_by_id" in detail
        assert "created_by_username" in detail

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_file_with_user_tracking(self, async_client: AsyncClient, db_session):
        """A file stored with ``created_by_id`` reports the creator's username (Issue #206)."""
        from backend.app.models.library import LibraryFile
        from backend.app.models.user import User

        # Flush (not commit) so the user receives an id before the file row
        # that references it is built.
        uploader = User(username="testuploader", password_hash="fakehash", role="user")
        db_session.add(uploader)
        await db_session.flush()

        lib_file = LibraryFile(
            filename="user_uploaded.3mf",
            file_path="/test/user_uploaded.3mf",
            file_size=2048,
            file_type="3mf",
            created_by_id=uploader.id,
        )
        db_session.add(lib_file)
        await db_session.commit()
        await db_session.refresh(lib_file)

        # Detail endpoint carries the username.
        detail_resp = await async_client.get(f"/api/v1/library/files/{lib_file.id}")
        assert detail_resp.status_code == 200
        detail = detail_resp.json()
        assert detail["created_by_id"] == uploader.id
        assert detail["created_by_username"] == "testuploader"

        # List endpoint carries it too.
        list_resp = await async_client.get("/api/v1/library/files?include_root=false")
        assert list_resp.status_code == 200
        entry = next((item for item in list_resp.json() if item["id"] == lib_file.id), None)
        assert entry is not None
        assert entry["created_by_id"] == uploader.id
        assert entry["created_by_username"] == "testuploader"
class TestLibraryAddToQueueAPI:
    """Integration tests for /api/v1/library/files/add-to-queue endpoint."""

    @pytest.fixture
    async def printer_factory(self, db_session):
        """Provide an async callable that persists ``Printer`` rows.

        Defaults give each printer a numbered name, IP address and serial
        number; keyword arguments override or extend them.
        """
        created = 0

        async def _create_printer(**kwargs):
            nonlocal created
            from backend.app.models.printer import Printer

            created += 1
            fields = {
                "name": f"Test Printer {created}",
                "ip_address": f"192.168.1.{100 + created}",
                "serial_number": f"TESTSERIAL{created:04d}",
                "access_code": "12345678",
                "model": "X1C",
                **kwargs,
            }
            printer = Printer(**fields)
            db_session.add(printer)
            await db_session.commit()
            await db_session.refresh(printer)
            return printer

        return _create_printer

    @pytest.fixture
    async def library_file_factory(self, db_session):
        """Provide an async callable that persists ``LibraryFile`` rows.

        Defaults describe a 1024-byte ``.gcode.3mf`` file with a numbered name
        and path; keyword arguments override or extend them.
        """
        created = 0

        async def _create_library_file(**kwargs):
            nonlocal created
            from backend.app.models.library import LibraryFile

            created += 1
            fields = {
                "filename": f"test_file_{created}.gcode.3mf",
                "file_path": f"/test/path/test_file_{created}.gcode.3mf",
                "file_size": 1024,
                "file_type": "3mf",
                **kwargs,
            }
            lib_file = LibraryFile(**fields)
            db_session.add(lib_file)
            await db_session.commit()
            await db_session.refresh(lib_file)
            return lib_file

        return _create_library_file

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_add_to_queue_file_not_found(self, async_client: AsyncClient, printer_factory, db_session):
        """An unknown file id is reported in ``errors`` and nothing is added."""
        await printer_factory()

        resp = await async_client.post("/api/v1/library/files/add-to-queue", json={"file_ids": [9999]})

        assert resp.status_code == 200
        outcome = resp.json()
        assert len(outcome["added"]) == 0
        assert len(outcome["errors"]) == 1
        assert outcome["errors"][0]["file_id"] == 9999

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_add_non_sliced_file_to_queue_fails(
        self, async_client: AsyncClient, printer_factory, library_file_factory, db_session
    ):
        """An STL (non-sliced) file is rejected with an error mentioning "sliced"."""
        await printer_factory()
        stl_file = await library_file_factory(
            filename="model.stl",
            file_path="/test/path/model.stl",
            file_type="stl",
        )

        resp = await async_client.post("/api/v1/library/files/add-to-queue", json={"file_ids": [stl_file.id]})

        assert resp.status_code == 200
        outcome = resp.json()
        assert len(outcome["added"]) == 0
        assert len(outcome["errors"]) == 1
        assert "sliced" in outcome["errors"][0]["error"].lower()
class TestLibraryZipExtractAPI:
    """Integration tests for ZIP extraction endpoint."""

    @staticmethod
    def _zip_upload(entries, upload_name="test.zip"):
        """Build the multipart ``files`` payload for an in-memory ZIP archive.

        Args:
            entries: Mapping of archive member name to its content, written in
                iteration order.
            upload_name: Filename reported for the uploaded archive.

        Returns:
            A dict suitable for the client's ``files=`` argument, keyed by the
            form field name ``"file"``.
        """
        buffer = io.BytesIO()
        with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as archive:
            for member, content in entries.items():
                archive.writestr(member, content)
        # Read the bytes only after the archive is closed, so the ZIP's
        # trailing central directory is included.
        return {"file": (upload_name, buffer.getvalue(), "application/zip")}

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_extract_zip_invalid_file_type(self, async_client: AsyncClient, db_session):
        """Verify non-ZIP files are rejected."""
        # A plain-text upload that is not a ZIP archive.
        files = {"file": ("test.txt", b"This is not a zip file", "text/plain")}

        response = await async_client.post("/api/v1/library/files/extract-zip", files=files)

        assert response.status_code == 400
        assert "ZIP" in response.json()["detail"]

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_extract_zip_basic(self, async_client: AsyncClient, db_session):
        """Verify basic ZIP extraction works."""
        files = self._zip_upload(
            {
                "test1.txt": "Content of file 1",
                "test2.txt": "Content of file 2",
            }
        )

        response = await async_client.post("/api/v1/library/files/extract-zip", files=files)

        assert response.status_code == 200
        result = response.json()
        assert result["extracted"] == 2
        assert len(result["files"]) == 2
        assert len(result["errors"]) == 0

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_extract_zip_with_folders(self, async_client: AsyncClient, db_session):
        """Verify ZIP extraction preserves folder structure."""
        files = self._zip_upload(
            {
                "folder1/file1.txt": "Content 1",
                "folder1/subfolder/file2.txt": "Content 2",
                "folder2/file3.txt": "Content 3",
            }
        )
        params = {"preserve_structure": "true"}

        response = await async_client.post("/api/v1/library/files/extract-zip", files=files, params=params)

        assert response.status_code == 200
        result = response.json()
        assert result["extracted"] == 3
        assert result["folders_created"] >= 3  # folder1, folder1/subfolder, folder2

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_extract_zip_flat(self, async_client: AsyncClient, db_session):
        """Verify ZIP extraction can extract flat (no folders)."""
        # The archive has a folder, but preserve_structure is switched off.
        files = self._zip_upload(
            {
                "folder/file1.txt": "Content 1",
                "folder/file2.txt": "Content 2",
            }
        )
        params = {"preserve_structure": "false"}

        response = await async_client.post("/api/v1/library/files/extract-zip", files=files, params=params)

        assert response.status_code == 200
        result = response.json()
        assert result["extracted"] == 2
        assert result["folders_created"] == 0  # No folders created when flat

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_extract_zip_skips_macos_files(self, async_client: AsyncClient, db_session):
        """Verify ZIP extraction skips __MACOSX and hidden files."""
        # One real file plus macOS metadata and a dotfile that must be ignored.
        files = self._zip_upload(
            {
                "real_file.txt": "Real content",
                "__MACOSX/._real_file.txt": "macOS metadata",
                ".hidden_file": "Hidden content",
            }
        )

        response = await async_client.post("/api/v1/library/files/extract-zip", files=files)

        assert response.status_code == 200
        result = response.json()
        assert result["extracted"] == 1  # Only real_file.txt
        assert result["files"][0]["filename"] == "real_file.txt"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_extract_zip_create_folder_from_zip(self, async_client: AsyncClient, db_session):
        """Verify ZIP extraction creates a folder from the ZIP filename."""
        files = self._zip_upload(
            {
                "file1.txt": "Content 1",
                "file2.txt": "Content 2",
            },
            upload_name="MyProject.zip",
        )
        params = {"create_folder_from_zip": "true", "preserve_structure": "false"}

        response = await async_client.post("/api/v1/library/files/extract-zip", files=files, params=params)

        assert response.status_code == 200
        result = response.json()
        assert result["extracted"] == 2
        assert result["folders_created"] == 1  # MyProject folder created

        # Both extracted files must sit in a folder, and in the same one.
        first, second = result["files"][0], result["files"][1]
        assert first["folder_id"] is not None
        assert second["folder_id"] is not None
        assert first["folder_id"] == second["folder_id"]

        # That folder is named after the uploaded archive.
        folder_response = await async_client.get(f"/api/v1/library/folders/{first['folder_id']}")
        assert folder_response.status_code == 200
        folder = folder_response.json()
        assert folder["name"] == "MyProject"
- class TestLibraryStlThumbnailAPI:
- """Integration tests for STL thumbnail generation endpoints."""
- @pytest.fixture
- async def file_factory(self, db_session):
- """Factory to create test files."""
- _counter = [0]
- async def _create_file(**kwargs):
- from backend.app.models.library import LibraryFile
- _counter[0] += 1
- counter = _counter[0]
- defaults = {
- "filename": f"test_model_{counter}.stl",
- "file_path": f"/test/path/test_model_{counter}.stl",
- "file_size": 1024,
- "file_type": "stl",
- }
- defaults.update(kwargs)
- lib_file = LibraryFile(**defaults)
- db_session.add(lib_file)
- await db_session.commit()
- await db_session.refresh(lib_file)
- return lib_file
- return _create_file
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_batch_generate_thumbnails_empty(self, async_client: AsyncClient, db_session):
- """Verify batch thumbnail generation with no files."""
- data = {"all_missing": True}
- response = await async_client.post("/api/v1/library/generate-stl-thumbnails", json=data)
- assert response.status_code == 200
- result = response.json()
- assert result["processed"] == 0
- assert result["succeeded"] == 0
- assert result["failed"] == 0
- assert result["results"] == []
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_batch_generate_thumbnails_no_criteria(self, async_client: AsyncClient, db_session):
- """Verify batch thumbnail generation with no criteria returns empty."""
- data = {}
- response = await async_client.post("/api/v1/library/generate-stl-thumbnails", json=data)
- assert response.status_code == 200
- result = response.json()
- assert result["processed"] == 0
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_batch_generate_thumbnails_file_not_on_disk(
- self, async_client: AsyncClient, file_factory, db_session
- ):
- """Verify batch thumbnail generation handles missing files gracefully."""
- # Create a file in DB but not on disk
- stl_file = await file_factory(
- filename="missing.stl",
- file_path="/nonexistent/path/missing.stl",
- thumbnail_path=None,
- )
- data = {"file_ids": [stl_file.id]}
- response = await async_client.post("/api/v1/library/generate-stl-thumbnails", json=data)
- assert response.status_code == 200
- result = response.json()
- assert result["processed"] == 1
- assert result["succeeded"] == 0
- assert result["failed"] == 1
- assert result["results"][0]["success"] is False
- assert "not found" in result["results"][0]["error"].lower()
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_batch_generate_thumbnails_with_real_stl(self, async_client: AsyncClient, db_session):
- """Verify batch thumbnail generation with a real STL file."""
- from backend.app.models.library import LibraryFile
- # Create a simple ASCII STL cube
- stl_content = """solid cube
- facet normal 0 0 -1
- outer loop
- vertex 0 0 0
- vertex 1 0 0
- vertex 1 1 0
- endloop
- endfacet
- facet normal 0 0 1
- outer loop
- vertex 0 0 1
- vertex 1 1 1
- vertex 1 0 1
- endloop
- endfacet
- endsolid cube"""
- with tempfile.NamedTemporaryFile(suffix=".stl", delete=False, mode="w") as f:
- f.write(stl_content)
- stl_path = f.name
- try:
- # Create file in DB pointing to real STL
- lib_file = LibraryFile(
- filename="test_cube.stl",
- file_path=stl_path,
- file_size=len(stl_content),
- file_type="stl",
- thumbnail_path=None,
- )
- db_session.add(lib_file)
- await db_session.commit()
- await db_session.refresh(lib_file)
- data = {"file_ids": [lib_file.id]}
- response = await async_client.post("/api/v1/library/generate-stl-thumbnails", json=data)
- assert response.status_code == 200
- result = response.json()
- assert result["processed"] == 1
- # Result depends on whether trimesh/matplotlib are installed
- # Either succeeds or fails gracefully
- assert result["succeeded"] + result["failed"] == 1
- finally:
- import os
- if os.path.exists(stl_path):
- os.unlink(stl_path)
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_upload_file_with_stl_thumbnail_param(self, async_client: AsyncClient, db_session):
- """Verify file upload accepts generate_stl_thumbnails parameter."""
- # Create a simple STL file
- stl_content = b"solid test\nendsolid test"
- files = {"file": ("test.stl", stl_content, "application/octet-stream")}
- params = {"generate_stl_thumbnails": "false"}
- response = await async_client.post("/api/v1/library/files", files=files, params=params)
- assert response.status_code == 200
- result = response.json()
- assert result["filename"] == "test.stl"
- assert result["file_type"] == "stl"
- # No thumbnail should be generated when disabled
- assert result["thumbnail_path"] is None
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_extract_zip_with_stl_thumbnail_param(self, async_client: AsyncClient, db_session):
- """Verify ZIP extraction accepts generate_stl_thumbnails parameter."""
- # Create a ZIP file containing an STL
- stl_content = b"solid test\nendsolid test"
- zip_buffer = io.BytesIO()
- with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
- zf.writestr("model.stl", stl_content)
- zip_buffer.seek(0)
- files = {"file": ("test.zip", zip_buffer.read(), "application/zip")}
- params = {"generate_stl_thumbnails": "false"}
- response = await async_client.post("/api/v1/library/files/extract-zip", files=files, params=params)
- assert response.status_code == 200
- result = response.json()
- assert result["extracted"] == 1
- assert result["files"][0]["filename"] == "model.stl"
@pytest.mark.asyncio
@pytest.mark.integration
async def test_batch_generate_thumbnails_by_folder(self, async_client: AsyncClient, file_factory, db_session):
    """Check that batch STL thumbnail generation can be restricted to one folder.

    Two thumbnail-less STL files are created, one inside a folder and one at
    the library root; only the one inside the folder is expected in the result.
    """
    from backend.app.models.library import LibraryFolder

    target_folder = LibraryFolder(name="STL Folder")
    db_session.add(target_folder)
    await db_session.commit()
    await db_session.refresh(target_folder)

    # The file the folder filter is expected to pick up.
    expected_file = await file_factory(
        filename="in_folder.stl",
        folder_id=target_folder.id,
        thumbnail_path=None,
    )
    # A root-level file that the folder filter must leave out.
    await file_factory(
        filename="at_root.stl",
        folder_id=None,
        thumbnail_path=None,
    )

    response = await async_client.post(
        "/api/v1/library/generate-stl-thumbnails",
        json={"folder_id": target_folder.id, "all_missing": True},
    )

    assert response.status_code == 200
    body = response.json()
    assert body["processed"] == 1
    assert body["results"][0]["file_id"] == expected_file.id
@pytest.mark.asyncio
@pytest.mark.integration
async def test_batch_generate_thumbnails_all_missing(self, async_client: AsyncClient, file_factory, db_session):
    """Check that ``all_missing`` selects every STL file that has no thumbnail.

    One file already has a thumbnail path and two do not; the endpoint is
    expected to report two processed files covering both thumbnail-less ones.
    """
    # This file already has a thumbnail, so it should not be processed.
    await file_factory(
        filename="with_thumb.stl",
        thumbnail_path="/some/path/thumb.png",
    )
    # These two have none and should both show up in the results.
    lacking_thumbnails = [
        await file_factory(filename="without_thumb1.stl", thumbnail_path=None),
        await file_factory(filename="without_thumb2.stl", thumbnail_path=None),
    ]

    response = await async_client.post(
        "/api/v1/library/generate-stl-thumbnails",
        json={"all_missing": True},
    )

    assert response.status_code == 200
    body = response.json()
    assert body["processed"] == 2
    processed_ids = {entry["file_id"] for entry in body["results"]}
    for stl_file in lacking_thumbnails:
        assert stl_file.id in processed_ids
class TestLibraryPathHelpers:
    """Unit tests for the relative/absolute path helpers of the library routes.

    The helpers are exercised against ``settings.base_dir``; the original
    suite describes them as supporting backup portability.
    """

    def test_to_relative_path_converts_absolute(self):
        """An absolute string path under the base dir comes back relative to it."""
        from backend.app.api.routes.library import to_relative_path
        from backend.app.core.config import settings

        root = str(settings.base_dir)
        relative = to_relative_path(f"{root}/archive/library/files/test.3mf")

        assert not relative.startswith("/")
        assert relative == "archive/library/files/test.3mf"

    def test_to_relative_path_handles_path_object(self):
        """A ``Path`` input is accepted and converted like a string input."""
        from pathlib import Path

        from backend.app.api.routes.library import to_relative_path
        from backend.app.core.config import settings

        absolute = Path(settings.base_dir).joinpath("archive", "test.3mf")
        relative = to_relative_path(absolute)

        assert not relative.startswith("/")
        assert relative == "archive/test.3mf"

    def test_to_relative_path_returns_empty_for_empty_input(self):
        """Empty string and ``None`` both map to the empty string."""
        from backend.app.api.routes.library import to_relative_path

        for blank in ("", None):
            assert to_relative_path(blank) == ""

    def test_to_absolute_path_converts_relative(self):
        """A relative path is anchored at the configured base directory."""
        from backend.app.api.routes.library import to_absolute_path
        from backend.app.core.config import settings

        resolved = to_absolute_path("archive/library/files/test.3mf")

        assert resolved is not None
        assert resolved.is_absolute()
        assert str(resolved) == f"{settings.base_dir}/archive/library/files/test.3mf"

    def test_to_absolute_path_handles_already_absolute(self):
        """An already-absolute path is passed through unchanged (backwards compatibility)."""
        from backend.app.api.routes.library import to_absolute_path

        already_absolute = "/data/archive/test.3mf"
        resolved = to_absolute_path(already_absolute)

        assert resolved is not None
        assert str(resolved) == already_absolute

    def test_to_absolute_path_returns_none_for_empty(self):
        """``None`` and the empty string both map to ``None``."""
        from backend.app.api.routes.library import to_absolute_path

        for blank in (None, ""):
            assert to_absolute_path(blank) is None
class TestLibraryPermissions:
    """Integration tests for permission checks on the library file endpoints.

    Each test enables authentication through a ``Settings`` row and then calls
    the API with bearer tokens for admin, operator and viewer users.
    """

    @staticmethod
    def _bearer(token):
        """Return the Authorization header dict for *token*."""
        return {"Authorization": f"Bearer {token}"}

    @pytest.fixture
    async def auth_setup(self, db_session):
        """Enable auth and create one admin, one operator and one viewer user.

        Returns:
            A dict with the three user objects (``*_user``) and an access
            token for each of them (``*_token``).
        """
        from sqlalchemy import select

        from backend.app.core.auth import create_access_token, get_password_hash
        from backend.app.models.group import Group
        from backend.app.models.settings import Settings
        from backend.app.models.user import User

        # Switch authentication on for the duration of the test.
        db_session.add(Settings(key="auth_enabled", value="true"))
        await db_session.commit()

        # Groups are auto-seeded during db init, but still need committing.
        await db_session.commit()

        async def group_named(name):
            """Fetch the single group whose name equals *name*."""
            found = await db_session.execute(select(Group).where(Group.name == name))
            return found.scalar_one()

        admin_group = await group_named("Administrators")
        operator_group = await group_named("Operators")
        viewer_group = await group_named("Viewers")

        password_hash = get_password_hash("password")

        admin_user = User(username="admin_lib", password_hash=password_hash, role="admin", is_active=True)
        admin_user.groups.append(admin_group)

        operator_user = User(username="operator_lib", password_hash=password_hash, is_active=True)
        operator_user.groups.append(operator_group)

        viewer_user = User(username="viewer_lib", password_hash=password_hash, is_active=True)
        viewer_user.groups.append(viewer_group)

        db_session.add_all([admin_user, operator_user, viewer_user])
        await db_session.commit()

        return {
            "admin_user": admin_user,
            "operator_user": operator_user,
            "viewer_user": viewer_user,
            "admin_token": create_access_token(data={"sub": admin_user.username}),
            "operator_token": create_access_token(data={"sub": operator_user.username}),
            "viewer_token": create_access_token(data={"sub": viewer_user.username}),
        }

    @pytest.fixture
    async def test_file(self, db_session, auth_setup):
        """Persist and return a library file record owned by the operator user."""
        from backend.app.models.library import LibraryFile

        owner = auth_setup["operator_user"]
        record = LibraryFile(
            filename="test.txt",
            file_path="data/archive/library/files/test.txt",
            file_type="txt",
            file_size=100,
            created_by_id=owner.id,
        )
        db_session.add(record)
        await db_session.commit()
        await db_session.refresh(record)
        return record

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_list_files_requires_library_read(self, async_client: AsyncClient, db_session, auth_setup):
        """Listing files succeeds for a viewer, who is expected to hold library:read."""
        response = await async_client.get(
            "/api/v1/library/files",
            headers=self._bearer(auth_setup["viewer_token"]),
        )
        assert response.status_code == 200

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_list_files_denied_without_permission(self, async_client: AsyncClient, db_session):
        """Listing files without a token is rejected with 401 once auth is enabled."""
        from backend.app.models.settings import Settings

        # Switch authentication on; no users or tokens are needed here.
        db_session.add(Settings(key="auth_enabled", value="true"))
        await db_session.commit()

        response = await async_client.get("/api/v1/library/files")
        assert response.status_code == 401

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_delete_file_own_by_owner(self, async_client: AsyncClient, db_session, auth_setup, test_file):
        """An operator can delete a file they own."""
        from pathlib import Path

        from backend.app.core.config import settings as app_settings

        # Put a real file on disk so the delete has something to remove.
        on_disk = Path(app_settings.base_dir) / test_file.file_path
        on_disk.parent.mkdir(parents=True, exist_ok=True)
        on_disk.write_text("test content")

        response = await async_client.delete(
            f"/api/v1/library/files/{test_file.id}",
            headers=self._bearer(auth_setup["operator_token"]),
        )
        assert response.status_code == 200

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_delete_file_own_denied_for_others_file(self, async_client: AsyncClient, db_session, auth_setup):
        """An operator gets 403 when deleting a file owned by another operator."""
        from sqlalchemy import select

        from backend.app.core.auth import create_access_token
        from backend.app.core.auth import get_password_hash as get_pw_hash
        from backend.app.models.group import Group
        from backend.app.models.library import LibraryFile
        from backend.app.models.user import User

        lookup = await db_session.execute(select(Group).where(Group.name == "Operators"))
        operator_group = lookup.scalar_one()

        # A second operator who will own the file under test.
        other_user = User(username="other_op", password_hash=get_pw_hash("password"), is_active=True)
        other_user.groups.append(operator_group)
        db_session.add(other_user)
        await db_session.commit()
        await db_session.refresh(other_user)

        other_file = LibraryFile(
            filename="other.txt",
            file_path="data/archive/library/files/other.txt",
            file_type="txt",
            file_size=100,
            created_by_id=other_user.id,
        )
        db_session.add(other_file)
        await db_session.commit()
        await db_session.refresh(other_file)

        # The fixture's operator is not the owner and must be refused.
        response = await async_client.delete(
            f"/api/v1/library/files/{other_file.id}",
            headers=self._bearer(auth_setup["operator_token"]),
        )
        assert response.status_code == 403
        assert "your own files" in response.json()["detail"].lower()

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_delete_file_admin_can_delete_any(self, async_client: AsyncClient, db_session, auth_setup):
        """An admin can delete a file owned by a different user."""
        from pathlib import Path

        from backend.app.core.config import settings as app_settings
        from backend.app.models.library import LibraryFile

        # The record belongs to the operator, not to the admin.
        owner = auth_setup["operator_user"]
        record = LibraryFile(
            filename="admin_can_delete.txt",
            file_path="data/archive/library/files/admin_can_delete.txt",
            file_type="txt",
            file_size=100,
            created_by_id=owner.id,
        )
        db_session.add(record)
        await db_session.commit()
        await db_session.refresh(record)

        # Put a real file on disk so the delete has something to remove.
        on_disk = Path(app_settings.base_dir) / record.file_path
        on_disk.parent.mkdir(parents=True, exist_ok=True)
        on_disk.write_text("test content")

        response = await async_client.delete(
            f"/api/v1/library/files/{record.id}",
            headers=self._bearer(auth_setup["admin_token"]),
        )
        assert response.status_code == 200

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_viewer_cannot_delete_files(self, async_client: AsyncClient, db_session, auth_setup, test_file):
        """A viewer gets 403 when trying to delete a file."""
        response = await async_client.delete(
            f"/api/v1/library/files/{test_file.id}",
            headers=self._bearer(auth_setup["viewer_token"]),
        )
        # Viewers hold neither the delete_own nor the delete_all permission.
        assert response.status_code == 403
|