| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226 |
- """Integration tests for Library API endpoints."""
- import io
- import tempfile
- import zipfile
- from pathlib import Path
- import pytest
- from httpx import AsyncClient
- class TestLibraryFoldersAPI:
- """Integration tests for library folders endpoints."""
- @pytest.fixture
- async def folder_factory(self, db_session):
- """Factory to create test folders."""
- _counter = [0]
- async def _create_folder(**kwargs):
- from backend.app.models.library import LibraryFolder
- _counter[0] += 1
- counter = _counter[0]
- defaults = {
- "name": f"Test Folder {counter}",
- }
- defaults.update(kwargs)
- folder = LibraryFolder(**defaults)
- db_session.add(folder)
- await db_session.commit()
- await db_session.refresh(folder)
- return folder
- return _create_folder
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_list_folders_empty(self, async_client: AsyncClient, db_session):
- """Verify empty folder list returns empty array."""
- response = await async_client.get("/api/v1/library/folders")
- assert response.status_code == 200
- assert response.json() == []
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_create_folder(self, async_client: AsyncClient, db_session):
- """Verify folder can be created."""
- data = {"name": "New Folder"}
- response = await async_client.post("/api/v1/library/folders", json=data)
- assert response.status_code == 200
- result = response.json()
- assert result["name"] == "New Folder"
- assert result["id"] is not None
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_create_nested_folder(self, async_client: AsyncClient, folder_factory, db_session):
- """Verify nested folder can be created."""
- parent = await folder_factory(name="Parent")
- data = {"name": "Child", "parent_id": parent.id}
- response = await async_client.post("/api/v1/library/folders", json=data)
- assert response.status_code == 200
- result = response.json()
- assert result["name"] == "Child"
- assert result["parent_id"] == parent.id
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_get_folder(self, async_client: AsyncClient, folder_factory, db_session):
- """Verify single folder can be retrieved."""
- folder = await folder_factory(name="Test Folder")
- response = await async_client.get(f"/api/v1/library/folders/{folder.id}")
- assert response.status_code == 200
- result = response.json()
- assert result["id"] == folder.id
- assert result["name"] == "Test Folder"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_get_folder_not_found(self, async_client: AsyncClient, db_session):
- """Verify 404 for non-existent folder."""
- response = await async_client.get("/api/v1/library/folders/9999")
- assert response.status_code == 404
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_update_folder(self, async_client: AsyncClient, folder_factory, db_session):
- """Verify folder can be updated."""
- folder = await folder_factory(name="Old Name")
- data = {"name": "New Name"}
- response = await async_client.put(f"/api/v1/library/folders/{folder.id}", json=data)
- assert response.status_code == 200
- result = response.json()
- assert result["name"] == "New Name"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_delete_folder(self, async_client: AsyncClient, folder_factory, db_session):
- """Verify folder can be deleted."""
- folder = await folder_factory()
- response = await async_client.delete(f"/api/v1/library/folders/{folder.id}")
- assert response.status_code == 200
- result = response.json()
- assert result.get("message") or result.get("success", True)
- class TestLibraryFilesAPI:
- """Integration tests for library files endpoints."""
- @pytest.fixture
- async def folder_factory(self, db_session):
- """Factory to create test folders."""
- _counter = [0]
- async def _create_folder(**kwargs):
- from backend.app.models.library import LibraryFolder
- _counter[0] += 1
- counter = _counter[0]
- defaults = {"name": f"Test Folder {counter}"}
- defaults.update(kwargs)
- folder = LibraryFolder(**defaults)
- db_session.add(folder)
- await db_session.commit()
- await db_session.refresh(folder)
- return folder
- return _create_folder
- @pytest.fixture
- async def file_factory(self, db_session):
- """Factory to create test files."""
- _counter = [0]
- async def _create_file(**kwargs):
- from backend.app.models.library import LibraryFile
- _counter[0] += 1
- counter = _counter[0]
- defaults = {
- "filename": f"test_file_{counter}.3mf",
- "file_path": f"/test/path/test_file_{counter}.3mf",
- "file_size": 1024,
- "file_type": "3mf",
- }
- defaults.update(kwargs)
- lib_file = LibraryFile(**defaults)
- db_session.add(lib_file)
- await db_session.commit()
- await db_session.refresh(lib_file)
- return lib_file
- return _create_file
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_list_files_empty(self, async_client: AsyncClient, db_session):
- """Verify empty file list returns empty array."""
- response = await async_client.get("/api/v1/library/files")
- assert response.status_code == 200
- assert response.json() == []
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_list_files_in_folder(self, async_client: AsyncClient, folder_factory, file_factory, db_session):
- """Verify files can be filtered by folder."""
- folder = await folder_factory()
- file1 = await file_factory(folder_id=folder.id)
- await file_factory() # File in root (no folder)
- response = await async_client.get(f"/api/v1/library/files?folder_id={folder.id}")
- assert response.status_code == 200
- result = response.json()
- assert len(result) == 1
- assert result[0]["id"] == file1.id
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_list_files_by_project_id(self, async_client: AsyncClient, folder_factory, file_factory, db_session):
- """#932: project_id filter returns files across all folders linked to the project.
- Replaces the prior N+1 pattern where the frontend fired one request per
- linked folder. A single JOIN query must return every file in folders whose
- project_id matches, while excluding files from unlinked folders.
- """
- from backend.app.models.project import Project
- project = Project(name="Test Project for Files", color="#00ff00")
- db_session.add(project)
- await db_session.commit()
- await db_session.refresh(project)
- folder_a = await folder_factory(name="Folder A", project_id=project.id)
- folder_b = await folder_factory(name="Folder B", project_id=project.id)
- other_folder = await folder_factory(name="Unlinked")
- linked_a = await file_factory(folder_id=folder_a.id, filename="a.3mf")
- linked_b = await file_factory(folder_id=folder_b.id, filename="b.3mf")
- await file_factory(folder_id=other_folder.id, filename="unlinked.3mf")
- await file_factory(filename="root.3mf") # no folder → not part of any project
- response = await async_client.get(f"/api/v1/library/files?project_id={project.id}")
- assert response.status_code == 200
- result = response.json()
- ids = {f["id"] for f in result}
- assert ids == {linked_a.id, linked_b.id}
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_list_files_folder_id_takes_precedence_over_project_id(
- self, async_client: AsyncClient, folder_factory, file_factory, db_session
- ):
- """When both folder_id and project_id are passed, folder_id wins.
- Documented precedence in list_files(): folder_id > project_id > include_root.
- This guards the behavior so a future refactor can't silently flip it.
- """
- from backend.app.models.project import Project
- project = Project(name="Precedence Project")
- db_session.add(project)
- await db_session.commit()
- await db_session.refresh(project)
- folder_linked = await folder_factory(name="Linked", project_id=project.id)
- folder_other = await folder_factory(name="Other")
- await file_factory(folder_id=folder_linked.id, filename="linked.3mf")
- other_file = await file_factory(folder_id=folder_other.id, filename="other.3mf")
- # folder_id points at a folder that is NOT in the project — must return
- # that folder's contents and ignore project_id entirely.
- response = await async_client.get(f"/api/v1/library/files?folder_id={folder_other.id}&project_id={project.id}")
- assert response.status_code == 200
- result = response.json()
- assert len(result) == 1
- assert result[0]["id"] == other_file.id
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_get_file(self, async_client: AsyncClient, file_factory, db_session):
- """Verify single file can be retrieved."""
- lib_file = await file_factory(filename="test.3mf")
- response = await async_client.get(f"/api/v1/library/files/{lib_file.id}")
- assert response.status_code == 200
- result = response.json()
- assert result["id"] == lib_file.id
- assert result["filename"] == "test.3mf"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_get_file_not_found(self, async_client: AsyncClient, db_session):
- """Verify 404 for non-existent file."""
- response = await async_client.get("/api/v1/library/files/9999")
- assert response.status_code == 404
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_delete_file(self, async_client: AsyncClient, file_factory, db_session):
- """Verify file can be deleted."""
- lib_file = await file_factory()
- response = await async_client.delete(f"/api/v1/library/files/{lib_file.id}")
- assert response.status_code == 200
- result = response.json()
- assert result.get("message") or result.get("success", True)
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_rename_file(self, async_client: AsyncClient, file_factory, db_session):
- """Verify file can be renamed."""
- lib_file = await file_factory(filename="old_name.3mf")
- data = {"filename": "new_name.3mf"}
- response = await async_client.put(f"/api/v1/library/files/{lib_file.id}", json=data)
- assert response.status_code == 200
- result = response.json()
- assert result["filename"] == "new_name.3mf"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_rename_file_invalid_path_separator(self, async_client: AsyncClient, file_factory, db_session):
- """Verify file rename fails with a forward slash (FAT32-illegal, #1540)."""
- lib_file = await file_factory(filename="test.3mf")
- data = {"filename": "path/to/file.3mf"}
- response = await async_client.put(f"/api/v1/library/files/{lib_file.id}", json=data)
- assert response.status_code == 400
- assert "invalid character" in response.json()["detail"].lower()
- assert "/" in response.json()["detail"]
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_rename_file_invalid_backslash(self, async_client: AsyncClient, file_factory, db_session):
- """Verify file rename fails with a backslash (FAT32-illegal, #1540)."""
- lib_file = await file_factory(filename="test.3mf")
- data = {"filename": "path\\to\\file.3mf"}
- response = await async_client.put(f"/api/v1/library/files/{lib_file.id}", json=data)
- assert response.status_code == 400
- assert "invalid character" in response.json()["detail"].lower()
- assert "\\" in response.json()["detail"]
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_library_stats(self, async_client: AsyncClient, folder_factory, file_factory, db_session):
- """Verify library stats endpoint returns counts."""
- await folder_factory()
- await folder_factory()
- await file_factory()
- response = await async_client.get("/api/v1/library/stats")
- assert response.status_code == 200
- result = response.json()
- assert result["total_folders"] == 2
- assert result["total_files"] == 1
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_file_list_includes_user_tracking_fields(self, async_client: AsyncClient, file_factory, db_session):
- """Verify file list response includes user tracking fields (Issue #206)."""
- lib_file = await file_factory(filename="test.3mf")
- response = await async_client.get("/api/v1/library/files?include_root=false")
- assert response.status_code == 200
- result = response.json()
- assert len(result) >= 1
- # Find our test file
- test_file = next((f for f in result if f["id"] == lib_file.id), None)
- assert test_file is not None
- # User tracking fields should be present (even if null)
- assert "created_by_id" in test_file
- assert "created_by_username" in test_file
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_file_detail_includes_user_tracking_fields(self, async_client: AsyncClient, file_factory, db_session):
- """Verify file detail response includes user tracking fields (Issue #206)."""
- lib_file = await file_factory(filename="test_detail.3mf")
- response = await async_client.get(f"/api/v1/library/files/{lib_file.id}")
- assert response.status_code == 200
- result = response.json()
- # User tracking fields should be present (even if null)
- assert "created_by_id" in result
- assert "created_by_username" in result
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_file_with_user_tracking(self, async_client: AsyncClient, db_session):
- """Verify file created with user shows username in response (Issue #206)."""
- from backend.app.models.library import LibraryFile
- from backend.app.models.user import User
- # Create a test user
- user = User(username="testuploader", password_hash="fakehash", role="user")
- db_session.add(user)
- await db_session.flush()
- # Create a file with created_by_id set
- lib_file = LibraryFile(
- filename="user_uploaded.3mf",
- file_path="/test/user_uploaded.3mf",
- file_size=2048,
- file_type="3mf",
- created_by_id=user.id,
- )
- db_session.add(lib_file)
- await db_session.commit()
- await db_session.refresh(lib_file)
- # Verify file detail shows username
- response = await async_client.get(f"/api/v1/library/files/{lib_file.id}")
- assert response.status_code == 200
- result = response.json()
- assert result["created_by_id"] == user.id
- assert result["created_by_username"] == "testuploader"
- # Verify file list also shows username
- response = await async_client.get("/api/v1/library/files?include_root=false")
- assert response.status_code == 200
- files = response.json()
- test_file = next((f for f in files if f["id"] == lib_file.id), None)
- assert test_file is not None
- assert test_file["created_by_id"] == user.id
- assert test_file["created_by_username"] == "testuploader"
- class TestLibraryAddToQueueAPI:
- """Integration tests for /api/v1/library/files/add-to-queue endpoint."""
- @pytest.fixture
- async def printer_factory(self, db_session):
- """Factory to create test printers."""
- _counter = [0]
- async def _create_printer(**kwargs):
- from backend.app.models.printer import Printer
- _counter[0] += 1
- counter = _counter[0]
- defaults = {
- "name": f"Test Printer {counter}",
- "ip_address": f"192.168.1.{100 + counter}",
- "serial_number": f"TESTSERIAL{counter:04d}",
- "access_code": "12345678",
- "model": "X1C",
- }
- defaults.update(kwargs)
- printer = Printer(**defaults)
- db_session.add(printer)
- await db_session.commit()
- await db_session.refresh(printer)
- return printer
- return _create_printer
- @pytest.fixture
- async def library_file_factory(self, db_session):
- """Factory to create test library files."""
- _counter = [0]
- async def _create_library_file(**kwargs):
- from backend.app.models.library import LibraryFile
- _counter[0] += 1
- counter = _counter[0]
- defaults = {
- "filename": f"test_file_{counter}.gcode.3mf",
- "file_path": f"/test/path/test_file_{counter}.gcode.3mf",
- "file_size": 1024,
- "file_type": "3mf",
- }
- defaults.update(kwargs)
- lib_file = LibraryFile(**defaults)
- db_session.add(lib_file)
- await db_session.commit()
- await db_session.refresh(lib_file)
- return lib_file
- return _create_library_file
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_add_to_queue_file_not_found(self, async_client: AsyncClient, printer_factory, db_session):
- """Verify error for non-existent file."""
- await printer_factory()
- data = {"file_ids": [9999]}
- response = await async_client.post("/api/v1/library/files/add-to-queue", json=data)
- assert response.status_code == 200
- result = response.json()
- assert len(result["added"]) == 0
- assert len(result["errors"]) == 1
- assert result["errors"][0]["file_id"] == 9999
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_add_non_sliced_file_to_queue_fails(
- self, async_client: AsyncClient, printer_factory, library_file_factory, db_session
- ):
- """Verify non-sliced file cannot be added to queue."""
- await printer_factory()
- lib_file = await library_file_factory(
- filename="model.stl",
- file_path="/test/path/model.stl",
- file_type="stl",
- )
- data = {"file_ids": [lib_file.id]}
- response = await async_client.post("/api/v1/library/files/add-to-queue", json=data)
- assert response.status_code == 200
- result = response.json()
- assert len(result["added"]) == 0
- assert len(result["errors"]) == 1
- assert "sliced" in result["errors"][0]["error"].lower()
- class TestLibraryZipExtractAPI:
- """Integration tests for ZIP extraction endpoint."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_extract_zip_invalid_file_type(self, async_client: AsyncClient, db_session):
- """Verify non-ZIP files are rejected."""
- # Create a fake file that's not a ZIP
- files = {"file": ("test.txt", b"This is not a zip file", "text/plain")}
- response = await async_client.post("/api/v1/library/files/extract-zip", files=files)
- assert response.status_code == 400
- assert "ZIP" in response.json()["detail"]
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_extract_zip_basic(self, async_client: AsyncClient, db_session):
- """Verify basic ZIP extraction works."""
- import io
- # Create a simple ZIP file in memory
- zip_buffer = io.BytesIO()
- with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
- zf.writestr("test1.txt", "Content of file 1")
- zf.writestr("test2.txt", "Content of file 2")
- zip_buffer.seek(0)
- files = {"file": ("test.zip", zip_buffer.read(), "application/zip")}
- response = await async_client.post("/api/v1/library/files/extract-zip", files=files)
- assert response.status_code == 200
- result = response.json()
- assert result["extracted"] == 2
- assert len(result["files"]) == 2
- assert len(result["errors"]) == 0
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_extract_zip_with_folders(self, async_client: AsyncClient, db_session):
- """Verify ZIP extraction preserves folder structure."""
- import io
- # Create a ZIP file with folder structure
- zip_buffer = io.BytesIO()
- with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
- zf.writestr("folder1/file1.txt", "Content 1")
- zf.writestr("folder1/subfolder/file2.txt", "Content 2")
- zf.writestr("folder2/file3.txt", "Content 3")
- zip_buffer.seek(0)
- files = {"file": ("test.zip", zip_buffer.read(), "application/zip")}
- params = {"preserve_structure": "true"}
- response = await async_client.post("/api/v1/library/files/extract-zip", files=files, params=params)
- assert response.status_code == 200
- result = response.json()
- assert result["extracted"] == 3
- assert result["folders_created"] >= 3 # folder1, folder1/subfolder, folder2
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_extract_zip_flat(self, async_client: AsyncClient, db_session):
- """Verify ZIP extraction can extract flat (no folders)."""
- import io
- # Create a ZIP file with folder structure
- zip_buffer = io.BytesIO()
- with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
- zf.writestr("folder/file1.txt", "Content 1")
- zf.writestr("folder/file2.txt", "Content 2")
- zip_buffer.seek(0)
- files = {"file": ("test.zip", zip_buffer.read(), "application/zip")}
- params = {"preserve_structure": "false"}
- response = await async_client.post("/api/v1/library/files/extract-zip", files=files, params=params)
- assert response.status_code == 200
- result = response.json()
- assert result["extracted"] == 2
- assert result["folders_created"] == 0 # No folders created when flat
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_extract_zip_skips_macos_files(self, async_client: AsyncClient, db_session):
- """Verify ZIP extraction skips __MACOSX and hidden files."""
- import io
- # Create a ZIP file with macOS junk files
- zip_buffer = io.BytesIO()
- with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
- zf.writestr("real_file.txt", "Real content")
- zf.writestr("__MACOSX/._real_file.txt", "macOS metadata")
- zf.writestr(".hidden_file", "Hidden content")
- zip_buffer.seek(0)
- files = {"file": ("test.zip", zip_buffer.read(), "application/zip")}
- response = await async_client.post("/api/v1/library/files/extract-zip", files=files)
- assert response.status_code == 200
- result = response.json()
- assert result["extracted"] == 1 # Only real_file.txt
- assert result["files"][0]["filename"] == "real_file.txt"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_extract_zip_create_folder_from_zip(self, async_client: AsyncClient, db_session):
- """Verify ZIP extraction creates a folder from the ZIP filename."""
- import io
- # Create a ZIP file with some files
- zip_buffer = io.BytesIO()
- with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
- zf.writestr("file1.txt", "Content 1")
- zf.writestr("file2.txt", "Content 2")
- zip_buffer.seek(0)
- files = {"file": ("MyProject.zip", zip_buffer.read(), "application/zip")}
- params = {"create_folder_from_zip": "true", "preserve_structure": "false"}
- response = await async_client.post("/api/v1/library/files/extract-zip", files=files, params=params)
- assert response.status_code == 200
- result = response.json()
- assert result["extracted"] == 2
- assert result["folders_created"] == 1 # MyProject folder created
- # Verify the files are in a folder
- assert result["files"][0]["folder_id"] is not None
- assert result["files"][1]["folder_id"] is not None
- # Both files should be in the same folder
- assert result["files"][0]["folder_id"] == result["files"][1]["folder_id"]
- # Verify the folder was created with the right name
- folder_response = await async_client.get(f"/api/v1/library/folders/{result['files'][0]['folder_id']}")
- assert folder_response.status_code == 200
- folder = folder_response.json()
- assert folder["name"] == "MyProject"
- class TestLibraryStlThumbnailAPI:
- """Integration tests for STL thumbnail generation endpoints."""
- @pytest.fixture
- async def file_factory(self, db_session):
- """Factory to create test files."""
- _counter = [0]
- async def _create_file(**kwargs):
- from backend.app.models.library import LibraryFile
- _counter[0] += 1
- counter = _counter[0]
- defaults = {
- "filename": f"test_model_{counter}.stl",
- "file_path": f"/test/path/test_model_{counter}.stl",
- "file_size": 1024,
- "file_type": "stl",
- }
- defaults.update(kwargs)
- lib_file = LibraryFile(**defaults)
- db_session.add(lib_file)
- await db_session.commit()
- await db_session.refresh(lib_file)
- return lib_file
- return _create_file
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_batch_generate_thumbnails_empty(self, async_client: AsyncClient, db_session):
- """Verify batch thumbnail generation with no files."""
- data = {"all_missing": True}
- response = await async_client.post("/api/v1/library/generate-stl-thumbnails", json=data)
- assert response.status_code == 200
- result = response.json()
- assert result["processed"] == 0
- assert result["succeeded"] == 0
- assert result["failed"] == 0
- assert result["results"] == []
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_batch_generate_thumbnails_no_criteria(self, async_client: AsyncClient, db_session):
- """Verify batch thumbnail generation with no criteria returns empty."""
- data = {}
- response = await async_client.post("/api/v1/library/generate-stl-thumbnails", json=data)
- assert response.status_code == 200
- result = response.json()
- assert result["processed"] == 0
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_batch_generate_thumbnails_file_not_on_disk(
- self, async_client: AsyncClient, file_factory, db_session
- ):
- """Verify batch thumbnail generation handles missing files gracefully."""
- # Create a file in DB but not on disk
- stl_file = await file_factory(
- filename="missing.stl",
- file_path="/nonexistent/path/missing.stl",
- thumbnail_path=None,
- )
- data = {"file_ids": [stl_file.id]}
- response = await async_client.post("/api/v1/library/generate-stl-thumbnails", json=data)
- assert response.status_code == 200
- result = response.json()
- assert result["processed"] == 1
- assert result["succeeded"] == 0
- assert result["failed"] == 1
- assert result["results"][0]["success"] is False
- assert "not found" in result["results"][0]["error"].lower()
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_batch_generate_thumbnails_with_real_stl(self, async_client: AsyncClient, db_session):
- """Verify batch thumbnail generation with a real STL file."""
- from backend.app.models.library import LibraryFile
- # Create a simple ASCII STL cube
- stl_content = """solid cube
- facet normal 0 0 -1
- outer loop
- vertex 0 0 0
- vertex 1 0 0
- vertex 1 1 0
- endloop
- endfacet
- facet normal 0 0 1
- outer loop
- vertex 0 0 1
- vertex 1 1 1
- vertex 1 0 1
- endloop
- endfacet
- endsolid cube"""
- with tempfile.NamedTemporaryFile(suffix=".stl", delete=False, mode="w") as f:
- f.write(stl_content)
- stl_path = f.name
- try:
- # Create file in DB pointing to real STL
- lib_file = LibraryFile(
- filename="test_cube.stl",
- file_path=stl_path,
- file_size=len(stl_content),
- file_type="stl",
- thumbnail_path=None,
- )
- db_session.add(lib_file)
- await db_session.commit()
- await db_session.refresh(lib_file)
- data = {"file_ids": [lib_file.id]}
- response = await async_client.post("/api/v1/library/generate-stl-thumbnails", json=data)
- assert response.status_code == 200
- result = response.json()
- assert result["processed"] == 1
- # Result depends on whether trimesh/matplotlib are installed
- # Either succeeds or fails gracefully
- assert result["succeeded"] + result["failed"] == 1
- finally:
- import os
- if os.path.exists(stl_path):
- os.unlink(stl_path)
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_upload_file_with_stl_thumbnail_param(self, async_client: AsyncClient, db_session):
- """Verify file upload accepts generate_stl_thumbnails parameter."""
- # Create a simple STL file
- stl_content = b"solid test\nendsolid test"
- files = {"file": ("test.stl", stl_content, "application/octet-stream")}
- params = {"generate_stl_thumbnails": "false"}
- response = await async_client.post("/api/v1/library/files", files=files, params=params)
- assert response.status_code == 200
- result = response.json()
- assert result["filename"] == "test.stl"
- assert result["file_type"] == "stl"
- # No thumbnail should be generated when disabled
- assert result["thumbnail_path"] is None
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_extract_zip_with_stl_thumbnail_param(self, async_client: AsyncClient, db_session):
- """Verify ZIP extraction accepts generate_stl_thumbnails parameter."""
- # Create a ZIP file containing an STL
- stl_content = b"solid test\nendsolid test"
- zip_buffer = io.BytesIO()
- with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
- zf.writestr("model.stl", stl_content)
- zip_buffer.seek(0)
- files = {"file": ("test.zip", zip_buffer.read(), "application/zip")}
- params = {"generate_stl_thumbnails": "false"}
- response = await async_client.post("/api/v1/library/files/extract-zip", files=files, params=params)
- assert response.status_code == 200
- result = response.json()
- assert result["extracted"] == 1
- assert result["files"][0]["filename"] == "model.stl"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_batch_generate_thumbnails_by_folder(self, async_client: AsyncClient, file_factory, db_session):
- """Verify batch thumbnail generation can filter by folder."""
- from backend.app.models.library import LibraryFolder
- # Create a folder
- folder = LibraryFolder(name="STL Folder")
- db_session.add(folder)
- await db_session.commit()
- await db_session.refresh(folder)
- # Create STL file in folder (no thumbnail)
- stl_in_folder = await file_factory(
- filename="in_folder.stl",
- folder_id=folder.id,
- thumbnail_path=None,
- )
- # Create STL file at root (no thumbnail)
- _stl_at_root = await file_factory(
- filename="at_root.stl",
- folder_id=None,
- thumbnail_path=None,
- )
- # Request thumbnails only for files in folder
- data = {"folder_id": folder.id, "all_missing": True}
- response = await async_client.post("/api/v1/library/generate-stl-thumbnails", json=data)
- assert response.status_code == 200
- result = response.json()
- # Should only process the file in the folder
- assert result["processed"] == 1
- assert result["results"][0]["file_id"] == stl_in_folder.id
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_batch_generate_thumbnails_all_missing(self, async_client: AsyncClient, file_factory, db_session):
- """Verify batch thumbnail generation finds all STL files missing thumbnails."""
- # Create files with and without thumbnails
- _stl_with_thumb = await file_factory(
- filename="with_thumb.stl",
- thumbnail_path="/some/path/thumb.png",
- )
- stl_without_thumb1 = await file_factory(
- filename="without_thumb1.stl",
- thumbnail_path=None,
- )
- stl_without_thumb2 = await file_factory(
- filename="without_thumb2.stl",
- thumbnail_path=None,
- )
- data = {"all_missing": True}
- response = await async_client.post("/api/v1/library/generate-stl-thumbnails", json=data)
- assert response.status_code == 200
- result = response.json()
- # Should only process files without thumbnails
- assert result["processed"] == 2
- file_ids = {r["file_id"] for r in result["results"]}
- assert stl_without_thumb1.id in file_ids
- assert stl_without_thumb2.id in file_ids
- class TestLibraryPathHelpers:
- """Tests for path handling utilities used for backup portability."""
- def test_to_relative_path_converts_absolute(self):
- """Verify absolute paths are converted to relative paths."""
- from backend.app.api.routes.library import to_relative_path
- from backend.app.core.config import settings
- base_dir = str(settings.base_dir)
- abs_path = f"{base_dir}/archive/library/files/test.3mf"
- rel_path = to_relative_path(abs_path)
- assert not rel_path.startswith("/")
- assert rel_path == "archive/library/files/test.3mf"
- def test_to_relative_path_handles_path_object(self):
- """Verify Path objects are handled correctly."""
- from pathlib import Path
- from backend.app.api.routes.library import to_relative_path
- from backend.app.core.config import settings
- abs_path = Path(settings.base_dir) / "archive" / "test.3mf"
- rel_path = to_relative_path(abs_path)
- assert not rel_path.startswith("/")
- assert rel_path == "archive/test.3mf"
- def test_to_relative_path_returns_empty_for_empty_input(self):
- """Verify empty input returns empty string."""
- from backend.app.api.routes.library import to_relative_path
- assert to_relative_path("") == ""
- assert to_relative_path(None) == ""
- def test_to_absolute_path_converts_relative(self):
- """Verify relative paths are converted to absolute paths."""
- from backend.app.api.routes.library import to_absolute_path
- from backend.app.core.config import settings
- rel_path = "archive/library/files/test.3mf"
- abs_path = to_absolute_path(rel_path)
- assert abs_path is not None
- assert abs_path.is_absolute()
- assert str(abs_path) == f"{settings.base_dir}/archive/library/files/test.3mf"
- def test_to_absolute_path_handles_already_absolute(self):
- """Verify already absolute paths are returned as-is (for backwards compatibility)."""
- from backend.app.api.routes.library import to_absolute_path
- abs_path_str = "/data/archive/test.3mf"
- result = to_absolute_path(abs_path_str)
- assert result is not None
- assert str(result) == abs_path_str
- def test_to_absolute_path_returns_none_for_empty(self):
- """Verify None/empty input returns None."""
- from backend.app.api.routes.library import to_absolute_path
- assert to_absolute_path(None) is None
- assert to_absolute_path("") is None
- class TestLibraryPermissions:
- """Tests for library permission enforcement."""
- @pytest.fixture
- async def auth_setup(self, db_session):
- """Set up auth with users of different permission levels."""
- from backend.app.core.auth import create_access_token, get_password_hash
- from backend.app.models.group import Group
- from backend.app.models.settings import Settings
- from backend.app.models.user import User
- # Enable auth
- settings = Settings(key="auth_enabled", value="true")
- db_session.add(settings)
- await db_session.commit()
- # Groups are auto-seeded during db init, but we need to commit them
- await db_session.commit()
- # Get groups
- from sqlalchemy import select
- admin_group = (await db_session.execute(select(Group).where(Group.name == "Administrators"))).scalar_one()
- operator_group = (await db_session.execute(select(Group).where(Group.name == "Operators"))).scalar_one()
- viewer_group = (await db_session.execute(select(Group).where(Group.name == "Viewers"))).scalar_one()
- password_hash = get_password_hash("password")
- # Create users
- admin_user = User(username="admin_lib", password_hash=password_hash, role="admin", is_active=True)
- admin_user.groups.append(admin_group)
- operator_user = User(username="operator_lib", password_hash=password_hash, is_active=True)
- operator_user.groups.append(operator_group)
- viewer_user = User(username="viewer_lib", password_hash=password_hash, is_active=True)
- viewer_user.groups.append(viewer_group)
- db_session.add_all([admin_user, operator_user, viewer_user])
- await db_session.commit()
- # Create tokens
- admin_token = create_access_token(data={"sub": admin_user.username})
- operator_token = create_access_token(data={"sub": operator_user.username})
- viewer_token = create_access_token(data={"sub": viewer_user.username})
- return {
- "admin_user": admin_user,
- "operator_user": operator_user,
- "viewer_user": viewer_user,
- "admin_token": admin_token,
- "operator_token": operator_token,
- "viewer_token": viewer_token,
- }
- @pytest.fixture
- async def test_file(self, db_session, auth_setup):
- """Create a test file owned by the operator user."""
- from backend.app.models.library import LibraryFile
- operator_user = auth_setup["operator_user"]
- lib_file = LibraryFile(
- filename="test.txt",
- file_path="data/archive/library/files/test.txt",
- file_type="txt",
- file_size=100,
- created_by_id=operator_user.id,
- )
- db_session.add(lib_file)
- await db_session.commit()
- await db_session.refresh(lib_file)
- return lib_file
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_list_files_requires_library_read(self, async_client: AsyncClient, db_session, auth_setup):
- """Verify list_files requires library:read permission."""
- viewer_token = auth_setup["viewer_token"]
- # Viewers have library:read, should succeed
- response = await async_client.get("/api/v1/library/files", headers={"Authorization": f"Bearer {viewer_token}"})
- assert response.status_code == 200
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_list_files_denied_without_permission(self, async_client: AsyncClient, db_session):
- """Verify list_files denied without auth when auth is enabled."""
- from backend.app.models.settings import Settings
- # Enable auth
- settings = Settings(key="auth_enabled", value="true")
- db_session.add(settings)
- await db_session.commit()
- # Request without token should fail
- response = await async_client.get("/api/v1/library/files")
- assert response.status_code == 401
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_delete_file_own_by_owner(self, async_client: AsyncClient, db_session, auth_setup, test_file):
- """Verify operator can delete their own files."""
- from pathlib import Path
- # Create actual file on disk so delete doesn't fail
- from backend.app.core.config import settings as app_settings
- file_path = Path(app_settings.base_dir) / test_file.file_path
- file_path.parent.mkdir(parents=True, exist_ok=True)
- file_path.write_text("test content")
- operator_token = auth_setup["operator_token"]
- response = await async_client.delete(
- f"/api/v1/library/files/{test_file.id}", headers={"Authorization": f"Bearer {operator_token}"}
- )
- assert response.status_code == 200
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_delete_file_own_denied_for_others_file(self, async_client: AsyncClient, db_session, auth_setup):
- """Verify operator cannot delete files owned by others."""
- # Create another operator user with a file
- from sqlalchemy import select
- from backend.app.core.auth import create_access_token
- from backend.app.models.group import Group
- from backend.app.models.library import LibraryFile
- from backend.app.models.user import User
- operator_group = (await db_session.execute(select(Group).where(Group.name == "Operators"))).scalar_one()
- from backend.app.core.auth import get_password_hash as get_pw_hash
- other_user = User(username="other_op", password_hash=get_pw_hash("password"), is_active=True)
- other_user.groups.append(operator_group)
- db_session.add(other_user)
- await db_session.commit()
- await db_session.refresh(other_user)
- # Create file owned by other user
- other_file = LibraryFile(
- filename="other.txt",
- file_path="data/archive/library/files/other.txt",
- file_type="txt",
- file_size=100,
- created_by_id=other_user.id,
- )
- db_session.add(other_file)
- await db_session.commit()
- await db_session.refresh(other_file)
- # Original operator should not be able to delete it
- operator_token = auth_setup["operator_token"]
- response = await async_client.delete(
- f"/api/v1/library/files/{other_file.id}", headers={"Authorization": f"Bearer {operator_token}"}
- )
- assert response.status_code == 403
- assert "your own files" in response.json()["detail"].lower()
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_delete_file_admin_can_delete_any(self, async_client: AsyncClient, db_session, auth_setup):
- """Verify admin can delete any file."""
- from pathlib import Path
- from backend.app.core.config import settings as app_settings
- from backend.app.models.library import LibraryFile
- # Create file owned by operator
- operator_user = auth_setup["operator_user"]
- lib_file = LibraryFile(
- filename="admin_can_delete.txt",
- file_path="data/archive/library/files/admin_can_delete.txt",
- file_type="txt",
- file_size=100,
- created_by_id=operator_user.id,
- )
- db_session.add(lib_file)
- await db_session.commit()
- await db_session.refresh(lib_file)
- # Create actual file on disk
- file_path = Path(app_settings.base_dir) / lib_file.file_path
- file_path.parent.mkdir(parents=True, exist_ok=True)
- file_path.write_text("test content")
- # Admin should be able to delete it
- admin_token = auth_setup["admin_token"]
- response = await async_client.delete(
- f"/api/v1/library/files/{lib_file.id}", headers={"Authorization": f"Bearer {admin_token}"}
- )
- assert response.status_code == 200
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_viewer_cannot_delete_files(self, async_client: AsyncClient, db_session, auth_setup, test_file):
- """Verify viewer cannot delete any files."""
- viewer_token = auth_setup["viewer_token"]
- response = await async_client.delete(
- f"/api/v1/library/files/{test_file.id}", headers={"Authorization": f"Bearer {viewer_token}"}
- )
- # Viewers don't have delete_own or delete_all permissions
- assert response.status_code == 403
- class TestPrintFileUploadValidation:
- """#1401: pre-flight rejection of unprintable uploads at the library +
- archive routes. Smoke tests the shared ``validate_print_file_upload``
- helper through both surfaces a user can reach with a drag-drop."""
- def _valid_3mf_bytes(self, name: str = "Metadata/plate_1.gcode") -> bytes:
- """Build a minimal-but-real zip with the gcode-3mf magic in it so
- the validator's ``startswith(b"PK\\x03\\x04")`` check passes."""
- buf = io.BytesIO()
- with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
- zf.writestr(name, "; G-code\nG28\n")
- return buf.getvalue()
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_library_rejects_raw_gcode_upload(self, async_client: AsyncClient, db_session):
- """``Foo.gcode`` direct uploads are blocked at the library route —
- the dispatcher would otherwise append ``.3mf`` and ship raw gcode
- to the printer as a fake 3MF."""
- files = {"file": ("plate_1.gcode", b"; raw gcode\nG28\n", "application/octet-stream")}
- response = await async_client.post("/api/v1/library/files", files=files)
- assert response.status_code == 400
- # Error message must name the actual remedy, not just say "invalid".
- assert "gcode.3mf" in response.json()["detail"]
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_library_rejects_non_zip_3mf_upload(self, async_client: AsyncClient, db_session):
- """A ``.3mf`` upload whose body isn't a zip is rejected — covers
- raw gcode renamed to .3mf, corrupted downloads, etc."""
- files = {"file": ("model.3mf", b"; raw gcode\nG28\n", "application/octet-stream")}
- response = await async_client.post("/api/v1/library/files", files=files)
- assert response.status_code == 400
- assert "ZIP container" in response.json()["detail"]
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_library_rejects_non_zip_gcode_3mf_upload(self, async_client: AsyncClient, db_session):
- """The compound-extension ``.gcode.3mf`` case is gated by the same
- zip-magic check — splitext returns just ``.3mf``, but the suffix
- match covers both."""
- files = {"file": ("plate_1.gcode.3mf", b"; raw gcode\nG28\n", "application/octet-stream")}
- response = await async_client.post("/api/v1/library/files", files=files)
- assert response.status_code == 400
- assert "ZIP container" in response.json()["detail"]
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_library_accepts_valid_gcode_3mf_upload(self, async_client: AsyncClient, db_session):
- """A real ``.gcode.3mf`` zip uploads successfully — the existing
- happy path is not regressed by the new validation."""
- files = {
- "file": (
- "plate_1.gcode.3mf",
- self._valid_3mf_bytes(),
- "application/zip",
- )
- }
- response = await async_client.post("/api/v1/library/files", files=files)
- assert response.status_code == 200
- result = response.json()
- assert result["filename"] == "plate_1.gcode.3mf"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_library_still_accepts_non_print_extensions(self, async_client: AsyncClient, db_session):
- """STL / image / other non-print uploads bypass the validator
- entirely — Bambuddy is also a library, not just a print dispatcher."""
- files = {"file": ("model.stl", b"solid test\nendsolid test", "application/octet-stream")}
- response = await async_client.post(
- "/api/v1/library/files", files=files, params={"generate_stl_thumbnails": "false"}
- )
- assert response.status_code == 200
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_archive_upload_rejects_non_zip(self, async_client: AsyncClient, db_session):
- """``POST /archives/upload`` shares the same validator — covers the
- manual archive-upload entry point too."""
- files = {"file": ("model.3mf", b"; raw gcode\nG28\n", "application/octet-stream")}
- response = await async_client.post("/api/v1/archives/upload", files=files)
- assert response.status_code == 400
- assert "ZIP container" in response.json()["detail"]
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_archive_bulk_upload_collects_per_file_errors(self, async_client: AsyncClient, db_session):
- """The bulk-archive route reports validation failures per file and
- continues processing the remaining items — one bad upload in a
- 10-file drag-drop must not abort the whole batch."""
- good = self._valid_3mf_bytes()
- bad = b"; raw gcode\nG28\n"
- # httpx multipart with a list-of-tuples preserves order + same field name.
- files = [
- ("files", ("good.3mf", good, "application/zip")),
- ("files", ("bad.3mf", bad, "application/octet-stream")),
- ]
- response = await async_client.post("/api/v1/archives/upload-bulk", files=files)
- assert response.status_code == 200
- body = response.json()
- # The bulk route's archive_print may still reject the "good" file
- # downstream (no printer match, etc.) — we don't care about that
- # here; what matters is the bad file lands in `errors` with the
- # validator's message and the route didn't 500.
- assert body["failed"] >= 1
- bad_errors = [e for e in body["errors"] if e["filename"] == "bad.3mf"]
- assert bad_errors, body
- assert "ZIP container" in bad_errors[0]["error"]
|