| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300 |
- """Integration tests for Projects API endpoints."""
- import pytest
- from httpx import AsyncClient
class TestProjectsAPI:
    """CRUD integration tests for the /api/v1/projects endpoints."""

    @pytest.fixture
    async def project_factory(self, db_session):
        """Return an async callable that persists a ``Project`` row.

        Each call numbers the default name ("Test Project 1", "Test Project 2",
        ...) so several projects made through one fixture instance get distinct
        names; any keyword argument overrides the matching default.
        """
        created = 0

        async def _create_project(**kwargs):
            nonlocal created
            from backend.app.models.project import Project

            created += 1
            fields = {
                "name": f"Test Project {created}",
                "description": "Test project description",
                "color": "#FF0000",
                **kwargs,
            }
            row = Project(**fields)
            db_session.add(row)
            await db_session.commit()
            # Refresh so DB-populated attributes (e.g. the id) are loaded.
            await db_session.refresh(row)
            return row

        return _create_project

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_list_projects_empty(self, async_client: AsyncClient):
        """Listing without seeding any project answers 200 with a JSON list."""
        resp = await async_client.get("/api/v1/projects/")

        assert resp.status_code == 200
        assert isinstance(resp.json(), list)

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_list_projects_with_data(self, async_client: AsyncClient, project_factory, db_session):
        """A seeded project shows up in the list response."""
        await project_factory(name="My Project")

        resp = await async_client.get("/api/v1/projects/")

        assert resp.status_code == 200
        listing = resp.json()
        assert any(item["name"] == "My Project" for item in listing)

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_create_project(self, async_client: AsyncClient):
        """POSTing a project echoes back the submitted name and color."""
        payload = {
            "name": "New Project",
            "description": "A new project",
            "color": "#00FF00",
        }

        resp = await async_client.post("/api/v1/projects/", json=payload)

        assert resp.status_code == 200
        created = resp.json()
        assert created["name"] == "New Project"
        assert created["color"] == "#00FF00"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_get_project(self, async_client: AsyncClient, project_factory, db_session):
        """A single project is retrievable by id."""
        seeded = await project_factory(name="Get Test Project")

        resp = await async_client.get(f"/api/v1/projects/{seeded.id}")

        assert resp.status_code == 200
        assert resp.json()["name"] == "Get Test Project"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_get_project_not_found(self, async_client: AsyncClient):
        """Fetching an id that does not exist answers 404."""
        resp = await async_client.get("/api/v1/projects/9999")

        assert resp.status_code == 404

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_update_project(self, async_client: AsyncClient, project_factory, db_session):
        """PATCH applies the submitted name and description."""
        seeded = await project_factory(name="Original")
        changes = {"name": "Updated", "description": "Updated description"}

        resp = await async_client.patch(f"/api/v1/projects/{seeded.id}", json=changes)

        assert resp.status_code == 200
        updated = resp.json()
        assert updated["name"] == "Updated"
        assert updated["description"] == "Updated description"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_delete_project(self, async_client: AsyncClient, project_factory, db_session):
        """DELETE answers 200 with the confirmation message."""
        seeded = await project_factory()

        resp = await async_client.delete(f"/api/v1/projects/{seeded.id}")

        assert resp.status_code == 200
        body = resp.json()
        assert body["message"] == "Project deleted"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_delete_project_not_found(self, async_client: AsyncClient):
        """Deleting an id that does not exist answers 404."""
        resp = await async_client.delete("/api/v1/projects/9999")

        assert resp.status_code == 404
class TestProjectUrlAndCoverImage:
    """Tests for #1155 — the ``url`` field plus cover image upload/get/delete."""

    @pytest.fixture
    async def project_factory(self, db_session):
        """Return an async callable that persists a ``Project`` row.

        Keyword arguments override the default name and color.
        """

        async def _create(**kwargs):
            from backend.app.models.project import Project

            fields = {"name": "URL/Cover Project", "color": "#00ff00", **kwargs}
            row = Project(**fields)
            db_session.add(row)
            await db_session.commit()
            await db_session.refresh(row)
            return row

        return _create

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_create_project_accepts_https_url(self, async_client: AsyncClient):
        """An https URL is accepted and echoed back unchanged."""
        payload = {"name": "With URL", "url": "https://makerworld.com/models/12345"}

        resp = await async_client.post("/api/v1/projects/", json=payload)

        assert resp.status_code == 200
        created = resp.json()
        assert created["url"] == "https://makerworld.com/models/12345"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_create_project_rejects_javascript_url(self, async_client: AsyncClient):
        """A ``javascript:`` URL is rejected with 422."""
        # Rendering the value in an `<a href>` would execute javascript: URLs,
        # so the schema has to refuse them up front.
        payload = {"name": "Hostile", "url": "javascript:alert(1)"}

        resp = await async_client.post("/api/v1/projects/", json=payload)

        assert resp.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_create_project_rejects_data_url(self, async_client: AsyncClient):
        """A ``data:`` URL is rejected with 422."""
        payload = {"name": "Hostile", "url": "data:text/html,<script>alert(1)</script>"}

        resp = await async_client.post("/api/v1/projects/", json=payload)

        assert resp.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_patch_project_clears_url_when_explicitly_null(self, async_client: AsyncClient, project_factory):
        """PATCHing ``url: null`` clears a previously stored URL."""
        seeded = await project_factory(url="https://example.com")

        resp = await async_client.patch(f"/api/v1/projects/{seeded.id}", json={"url": None})

        assert resp.status_code == 200
        assert resp.json()["url"] is None

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_upload_cover_image_then_serve_then_delete(self, async_client: AsyncClient, project_factory):
        """Full cover-image lifecycle: upload, serve, reflect on project, delete, 404."""
        seeded = await project_factory()
        project_url = f"/api/v1/projects/{seeded.id}"
        cover_url = f"/api/v1/projects/{seeded.id}/cover-image"

        # 1x1 PNG (smallest valid PNG bytes)
        png_bytes = bytes.fromhex(
            "89504e470d0a1a0a0000000d49484452000000010000000108060000001f15c4"
            "890000000d49444154789c63f80f00000100010000000000000049454e44ae42"
            "6082"
        )

        upload = await async_client.post(
            cover_url,
            files={"file": ("cover.png", png_bytes, "image/png")},
        )
        assert upload.status_code == 200, upload.text
        upload_body = upload.json()
        assert upload_body["status"] == "success"
        assert upload_body["filename"].endswith(".png")
        stored_name = upload_body["filename"]

        # The GET endpoint must hand back exactly the uploaded bytes.
        served = await async_client.get(cover_url)
        assert served.status_code == 200
        assert served.headers["content-type"] == "image/png"
        assert served.content == png_bytes

        # The project payload exposes the stored filename.
        before_delete = await async_client.get(project_url)
        assert before_delete.json()["cover_image_filename"] == stored_name

        # Deleting the cover resets the field on the project ...
        deleted = await async_client.delete(cover_url)
        assert deleted.status_code == 200
        after_delete = await async_client.get(project_url)
        assert after_delete.json()["cover_image_filename"] is None

        # ... and the image itself is gone.
        served_again = await async_client.get(cover_url)
        assert served_again.status_code == 404

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_upload_cover_image_rejects_non_image(self, async_client: AsyncClient, project_factory):
        """Uploading a non-image payload answers 400."""
        seeded = await project_factory()

        resp = await async_client.post(
            f"/api/v1/projects/{seeded.id}/cover-image",
            files={"file": ("evil.exe", b"MZ\x00\x00", "application/octet-stream")},
        )

        assert resp.status_code == 400

    @pytest.mark.integration
    def test_cover_image_get_uses_stream_token_gate(self):
        """Regression guard for the auth gate on GET /projects/{id}/cover-image.

        The route MUST be gated by ``RequireCameraStreamTokenIfAuthEnabled``
        (which accepts a ``?token=…`` query string) rather than by the
        bearer-token gate, because browsers can't attach an ``Authorization``
        header to ``<img src>`` requests. Swapping back to the bearer gate
        would silently 401 every cover image when auth is enabled.
        """
        from fastapi.routing import APIRoute

        from backend.app.api.routes.projects import router

        # Pick the first APIRoute that serves GET on a path ending in /cover-image.
        cover_get = next(
            (
                candidate
                for candidate in router.routes
                if isinstance(candidate, APIRoute)
                and candidate.path.endswith("/cover-image")
                and "GET" in candidate.methods
            ),
            None,
        )
        assert cover_get is not None, "GET cover-image route missing"

        from backend.app.core.auth import (
            require_camera_stream_token_if_auth_enabled,
        )

        # The factory hands out a fresh closure per call, so object identity
        # can't be compared; the closure's qualified name is the stable
        # signature to look for among the route's direct dependencies.
        expected_qualname = require_camera_stream_token_if_auth_enabled().__qualname__
        gate_qualnames = [dep.call.__qualname__ for dep in cover_get.dependant.dependencies if dep.call]
        assert expected_qualname in gate_qualnames, (
            f"GET cover-image route is not gated by RequireCameraStreamTokenIfAuthEnabled. Found: {gate_qualnames}"
        )
class TestProjectPartsTracking:
    """Tests for the project parts tracking feature."""

    @pytest.fixture
    async def project_factory(self, db_session):
        """Return an async callable that persists a ``Project`` row.

        Keyword arguments override the default name, description and color.
        """

        async def _create_project(**kwargs):
            from backend.app.models.project import Project

            fields = {
                "name": "Parts Test Project",
                "description": "Test project",
                "color": "#FF0000",
                **kwargs,
            }
            row = Project(**fields)
            db_session.add(row)
            await db_session.commit()
            await db_session.refresh(row)
            return row

        return _create_project

    @pytest.fixture
    async def archive_factory(self, db_session):
        """Return an async callable that persists an archive and its log entry.

        Project stats aggregate from ``print_log_entries`` (#1593), so a test
        that only wrote archives would not exercise the production path, where
        every run writes one log entry. The factory mirrors that: any archive
        whose status is not ``"archived"`` (a file shelved without printing)
        also gets one ``PrintLogEntry`` carrying the same status.
        """

        async def _create_archive(**kwargs):
            from backend.app.models.archive import PrintArchive
            from backend.app.models.print_log import PrintLogEntry

            fields = {
                "filename": "test.3mf",
                "file_path": "test/test.3mf",
                "file_size": 1000,
                "print_name": "Test Print",
                "status": "completed",
                "quantity": 1,
                **kwargs,
            }
            archive = PrintArchive(**fields)
            db_session.add(archive)
            await db_session.commit()
            await db_session.refresh(archive)

            # Shelved files never ran, so they get no log entry.
            if archive.status == "archived":
                return archive

            log_entry = PrintLogEntry(
                archive_id=archive.id,
                print_name=archive.print_name,
                status=archive.status,
            )
            db_session.add(log_entry)
            await db_session.commit()
            return archive

        return _create_archive

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_create_project_with_target_parts_count(self, async_client: AsyncClient):
        """A project can be created with both plate and part targets."""
        payload = {
            "name": "Parts Project",
            "target_count": 10,  # plates
            "target_parts_count": 50,  # parts in total
        }

        resp = await async_client.post("/api/v1/projects/", json=payload)

        assert resp.status_code == 200
        created = resp.json()
        assert created["target_count"] == 10
        assert created["target_parts_count"] == 50

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_update_project_target_parts_count(self, async_client: AsyncClient, project_factory, db_session):
        """PATCH can change ``target_parts_count``."""
        seeded = await project_factory()

        resp = await async_client.patch(
            f"/api/v1/projects/{seeded.id}",
            json={"target_parts_count": 100},
        )

        assert resp.status_code == 200
        assert resp.json()["target_parts_count"] == 100

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_project_parts_progress_calculation(
        self, async_client: AsyncClient, project_factory, archive_factory, db_session
    ):
        """Parts progress is derived from the summed archive quantities."""
        # Target: 20 parts. Completed: 3 + 5 + 2 = 10 parts, i.e. 50%.
        seeded = await project_factory(target_parts_count=20)
        for parts in (3, 5, 2):
            await archive_factory(project_id=seeded.id, quantity=parts, status="completed")

        resp = await async_client.get(f"/api/v1/projects/{seeded.id}")

        assert resp.status_code == 200
        stats = resp.json()["stats"]
        assert stats["completed_prints"] == 10  # sum of quantities
        assert stats["parts_progress_percent"] == 50.0  # 10 / 20
        assert stats["remaining_parts"] == 10  # 20 - 10

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_project_list_shows_parts_count(
        self, async_client: AsyncClient, project_factory, archive_factory, db_session
    ):
        """The list endpoint reports ``completed_count`` as the parts sum."""
        seeded = await project_factory(name="List Parts Project", target_parts_count=100)
        # Two plates: 4 + 6 = 10 parts.
        for parts in (4, 6):
            await archive_factory(project_id=seeded.id, quantity=parts, status="completed")

        resp = await async_client.get("/api/v1/projects/")

        assert resp.status_code == 200
        listing = resp.json()
        our_project = next((item for item in listing if item["name"] == "List Parts Project"), None)
        assert our_project is not None
        assert our_project["archive_count"] == 2  # plates
        assert our_project["completed_count"] == 10  # parts (sum of quantities)
        assert our_project["target_parts_count"] == 100

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_plates_vs_parts_progress(
        self, async_client: AsyncClient, project_factory, archive_factory, db_session
    ):
        """Plate progress and parts progress are computed independently."""
        # Goal: 5 plates yielding 25 parts (5 per plate). Two plates are done,
        # so plates sit at 2/5 = 40% and parts at 10/25 = 40%.
        seeded = await project_factory(target_count=5, target_parts_count=25)
        for _ in range(2):
            await archive_factory(project_id=seeded.id, quantity=5, status="completed")

        resp = await async_client.get(f"/api/v1/projects/{seeded.id}")

        assert resp.status_code == 200
        stats = resp.json()["stats"]
        assert stats["total_archives"] == 2  # plates
        assert stats["completed_prints"] == 10  # parts
        assert stats["progress_percent"] == 40.0  # plates: 2/5
        assert stats["parts_progress_percent"] == 40.0  # parts: 10/25
class TestProjectArchivedStatusNotCounted:
    """Tests for bug #630: archived files added to a project must not count as printed."""

    @pytest.fixture
    async def project_factory(self, db_session):
        """Return an async callable that persists a ``Project`` row.

        Keyword arguments override the default name, description and color.
        """

        async def _create_project(**kwargs):
            from backend.app.models.project import Project

            fields = {
                "name": "Archived Status Test",
                "description": "Test project",
                "color": "#FF0000",
                **kwargs,
            }
            row = Project(**fields)
            db_session.add(row)
            await db_session.commit()
            await db_session.refresh(row)
            return row

        return _create_project

    @pytest.fixture
    async def archive_factory(self, db_session):
        """Return an async callable that persists an archive and its log entry.

        Every archive whose status is not ``"archived"`` also gets one
        ``PrintLogEntry`` with the same status (#1593); shelved files get none.
        """

        async def _create_archive(**kwargs):
            from backend.app.models.archive import PrintArchive
            from backend.app.models.print_log import PrintLogEntry

            fields = {
                "filename": "test.3mf",
                "file_path": "test/test.3mf",
                "file_size": 1000,
                "print_name": "Test Print",
                "status": "completed",
                "quantity": 1,
                **kwargs,
            }
            archive = PrintArchive(**fields)
            db_session.add(archive)
            await db_session.commit()
            await db_session.refresh(archive)

            # Shelved files never ran, so they get no log entry.
            if archive.status == "archived":
                return archive

            log_entry = PrintLogEntry(
                archive_id=archive.id,
                print_name=archive.print_name,
                status=archive.status,
            )
            db_session.add(log_entry)
            await db_session.commit()
            return archive

        return _create_archive

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_archived_files_not_counted_as_completed(
        self, async_client: AsyncClient, project_factory, archive_factory, db_session
    ):
        """Archived files attached to a project stay out of ``completed_prints``."""
        seeded = await project_factory(target_parts_count=20)
        # One real print (2 parts) plus two files that were only shelved.
        seed_rows = ((2, "completed"), (3, "archived"), (5, "archived"))
        for parts, state in seed_rows:
            await archive_factory(project_id=seeded.id, quantity=parts, status=state)

        resp = await async_client.get(f"/api/v1/projects/{seeded.id}")

        assert resp.status_code == 200
        stats = resp.json()["stats"]
        # Only the completed archive contributes.
        assert stats["completed_prints"] == 2
        assert stats["parts_progress_percent"] == 10.0  # 2/20 = 10%
        assert stats["remaining_parts"] == 18

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_archived_files_not_counted_in_project_list(
        self, async_client: AsyncClient, project_factory, archive_factory, db_session
    ):
        """The list endpoint must not count archived files as completed."""
        seeded = await project_factory(name="List Archived Test", target_parts_count=50)
        for parts, state in ((4, "completed"), (6, "archived")):
            await archive_factory(project_id=seeded.id, quantity=parts, status=state)

        resp = await async_client.get("/api/v1/projects/")

        assert resp.status_code == 200
        listing = resp.json()
        our_project = next((item for item in listing if item["name"] == "List Archived Test"), None)
        assert our_project is not None
        assert our_project["completed_count"] == 4  # completed only, archived excluded
        # Since #1593 archive_count means "print runs", not "files attached".
        # A file with ``archived`` status was shelved without printing, has no
        # PrintLogEntry, and so is not counted; only the real run is.
        assert our_project["archive_count"] == 1

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_only_completed_status_counts(
        self, async_client: AsyncClient, project_factory, archive_factory, db_session
    ):
        """Only ``completed`` feeds completed stats — not archived/failed/aborted."""
        seeded = await project_factory(target_parts_count=100)
        seed_rows = (
            (10, "completed"),
            (5, "archived"),
            (3, "failed"),
            (2, "aborted"),
        )
        for parts, state in seed_rows:
            await archive_factory(project_id=seeded.id, quantity=parts, status=state)

        resp = await async_client.get(f"/api/v1/projects/{seeded.id}")

        assert resp.status_code == 200
        stats = resp.json()["stats"]
        assert stats["completed_prints"] == 10  # "completed" only
        assert stats["failed_prints"] == 2  # failed + aborted, counted per run
        # Since #1593 total_archives counts runs from print_log_entries rather
        # than files. The ``archived`` row is a shelved file without a run and
        # adds nothing; completed, failed and aborted each produced one run.
        assert stats["total_archives"] == 3
        # total_items adds up quantity per run: 10 + 3 + 2 = 15
        assert stats["total_items"] == 15
class TestProjectStatsPerRun:
    """Project stats aggregate per-run from ``print_log_entries`` so
    reprints and multi-plate prints count every run (#1593). Pre-fix the
    stats counted ``print_archives`` (one row per file), so 3 reprints of
    one file showed as 1 job with plate-1-only filament/time/cost.
    """

    @pytest.fixture
    async def project_factory(self, db_session):
        """Return an async callable that persists a ``Project`` row.

        Keyword arguments override the default name and color.
        """

        async def _create_project(**kwargs):
            from backend.app.models.project import Project

            defaults = {"name": "Per-Run Stats Project", "color": "#FF0000"}
            defaults.update(kwargs)
            project = Project(**defaults)
            db_session.add(project)
            await db_session.commit()
            await db_session.refresh(project)
            return project

        return _create_project

    @pytest.fixture
    async def archive_with_runs(self, db_session):
        """Return an async callable that builds one archive + N PrintLogEntry rows.

        Models the reporter's case: one source file (archive) is reprinted
        N times, each run with its own duration / filament / cost.

        The callable takes keyword-only arguments:
            project_id: project the archive is attached to.
            runs: one dict per run; recognised keys are ``status`` (defaults
                to ``"completed"``), ``duration_seconds``,
                ``filament_used_grams``, ``cost``, ``energy_kwh`` and
                ``energy_cost`` (each ``None`` when absent).
            archive_status: status stored on the archive row itself.
            quantity: quantity stored on the archive row.

        It returns the refreshed archive.
        """

        async def _create(*, project_id: int, runs: list[dict], archive_status: str = "completed", quantity: int = 1):
            from backend.app.models.archive import PrintArchive
            from backend.app.models.print_log import PrintLogEntry

            archive = PrintArchive(
                filename="reprinted.3mf",
                file_path="test/reprinted.3mf",
                file_size=1000,
                print_name="Reprinted Print",
                status=archive_status,
                quantity=quantity,
                project_id=project_id,
            )
            db_session.add(archive)
            await db_session.commit()
            # Refresh so archive.id is available for the log entries below.
            await db_session.refresh(archive)

            for run in runs:
                db_session.add(
                    PrintLogEntry(
                        archive_id=archive.id,
                        print_name=archive.print_name,
                        status=run.get("status", "completed"),
                        duration_seconds=run.get("duration_seconds"),
                        filament_used_grams=run.get("filament_used_grams"),
                        cost=run.get("cost"),
                        energy_kwh=run.get("energy_kwh"),
                        energy_cost=run.get("energy_cost"),
                    )
                )
            await db_session.commit()
            return archive

        return _create

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_three_reprints_count_as_three_jobs_with_summed_totals(
        self, async_client: AsyncClient, project_factory, archive_with_runs
    ):
        """Reporter's case: 3 runs of one multi-plate file should report
        3 jobs and summed time / filament / cost — pre-fix it reported 1
        job with plate-1-only totals."""
        project = await project_factory()
        await archive_with_runs(
            project_id=project.id,
            runs=[
                {"duration_seconds": 7140, "filament_used_grams": 19.2, "cost": 0.40},
                {"duration_seconds": 6000, "filament_used_grams": 20.0, "cost": 0.40},
                {"duration_seconds": 6300, "filament_used_grams": 18.8, "cost": 0.40},
            ],
        )

        response = await async_client.get(f"/api/v1/projects/{project.id}")
        assert response.status_code == 200
        stats = response.json()["stats"]

        assert stats["total_archives"] == 3, "3 runs must show as 3 jobs"
        assert stats["completed_prints"] == 3, "Each run with quantity=1 contributes 1 part"
        assert stats["total_filament_grams"] == round(19.2 + 20.0 + 18.8, 2)
        assert stats["total_print_time_hours"] == round((7140 + 6000 + 6300) / 3600, 2)
        # Cost rounds at 2 decimals — 3 * 0.40 = 1.20
        assert stats["estimated_cost"] == 1.20

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_orphan_log_entries_do_not_bleed_into_projects(
        self, async_client: AsyncClient, project_factory, db_session
    ):
        """Log rows whose ``archive_id`` is NULL (archive deleted via
        ON DELETE SET NULL) must not leak into any project — the inner
        join filters them out by construction."""
        from backend.app.models.print_log import PrintLogEntry

        project = await project_factory()
        # Orphan log entries — no archive_id.
        for _ in range(5):
            db_session.add(
                PrintLogEntry(
                    archive_id=None,
                    print_name="Orphan Run",
                    status="completed",
                    duration_seconds=3600,
                    filament_used_grams=20.0,
                    cost=0.5,
                )
            )
        await db_session.commit()

        response = await async_client.get(f"/api/v1/projects/{project.id}")
        assert response.status_code == 200
        stats = response.json()["stats"]

        # None of the orphan rows are attributable to this project.
        assert stats["total_archives"] == 0
        assert stats["completed_prints"] == 0
        assert stats["total_filament_grams"] == 0
        assert stats["total_print_time_hours"] == 0
        assert stats["estimated_cost"] == 0

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_mixed_run_outcomes_split_completed_and_failed(
        self, async_client: AsyncClient, project_factory, archive_with_runs
    ):
        """A multi-run archive with mixed outcomes splits cleanly between
        completed_prints (per-quantity) and failed_prints (per-run)."""
        project = await project_factory()
        await archive_with_runs(
            project_id=project.id,
            quantity=2,
            runs=[
                {"status": "completed", "filament_used_grams": 30.0},
                {"status": "completed", "filament_used_grams": 30.0},
                {"status": "failed", "filament_used_grams": 5.0},
                {"status": "aborted", "filament_used_grams": 2.0},
            ],
        )

        response = await async_client.get(f"/api/v1/projects/{project.id}")
        # Fail on the status code first so an error response is reported as
        # such, not as a KeyError on "stats".
        assert response.status_code == 200
        stats = response.json()["stats"]

        assert stats["total_archives"] == 4
        # 2 completed runs × quantity=2 each = 4 parts
        assert stats["completed_prints"] == 4
        # 2 failure runs (failed + aborted) count as 2, not 2*quantity
        assert stats["failed_prints"] == 2
        # All 4 runs contribute filament: 30 + 30 + 5 + 2 = 67
        assert stats["total_filament_grams"] == 67.0

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_quick_stats_in_list_view_agree_with_per_project_stats(
        self, async_client: AsyncClient, project_factory, archive_with_runs
    ):
        """The /projects list view's quick stats must agree with
        /projects/{id}'s detailed stats — both come from the same per-run
        aggregation."""
        project = await project_factory(name="Quick-Stats Alignment")
        await archive_with_runs(
            project_id=project.id,
            quantity=1,
            runs=[
                {"status": "completed"},
                {"status": "completed"},
                {"status": "failed"},
            ],
        )

        list_resp = await async_client.get("/api/v1/projects/")
        assert list_resp.status_code == 200
        # Default to None so a missing project is an assertion failure rather
        # than a StopIteration raised from next().
        ours = next((p for p in list_resp.json() if p["name"] == "Quick-Stats Alignment"), None)
        assert ours is not None
        assert ours["archive_count"] == 3
        assert ours["completed_count"] == 2
        assert ours["failed_count"] == 1

        # Fetch the detail view too, so the agreement the test name promises
        # is actually checked.
        detail_resp = await async_client.get(f"/api/v1/projects/{project.id}")
        assert detail_resp.status_code == 200
        stats = detail_resp.json()["stats"]
        assert stats["total_archives"] == ours["archive_count"]
        assert stats["completed_prints"] == ours["completed_count"]
        assert stats["failed_prints"] == ours["failed_count"]
class TestProjectArchivesAPI:
    """Tests for project-archive relationships."""

    @pytest.fixture
    async def project_factory(self, db_session):
        """Return an async callable that persists a ``Project`` row.

        Keyword arguments override the default name, description and color.
        """

        async def _create_project(**kwargs):
            from backend.app.models.project import Project

            fields = {
                "name": "Archive Test Project",
                "description": "Test project",
                "color": "#0000FF",
                **kwargs,
            }
            row = Project(**fields)
            db_session.add(row)
            await db_session.commit()
            await db_session.refresh(row)
            return row

        return _create_project

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_get_project_with_archives(self, async_client: AsyncClient, project_factory, db_session):
        """A project without any archives can still be retrieved."""
        seeded = await project_factory()

        resp = await async_client.get(f"/api/v1/projects/{seeded.id}")

        assert resp.status_code == 200
        # Smoke check only: the payload must at least carry the project name.
        payload = resp.json()
        assert "name" in payload

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_list_archives_in_project_returns_archives_with_creator(
        self, async_client: AsyncClient, project_factory, db_session
    ):
        """``GET /projects/{id}/archives`` must eagerly load the project AND the creator User.

        Without selectinload(created_by) the response converter triggers a
        lazy attribute load on a closed async session and the request 500s
        with MissingGreenlet — which is what was reported as soon as a user
        with auth enabled (so archives carry created_by_id) opened a project
        view.
        """
        from backend.app.models.archive import PrintArchive
        from backend.app.models.user import User

        # Seed the eventual creator first so its id exists for the archive.
        creator = User(
            username="archive-creator",
            password_hash="x",
            role="user",
            is_active=True,
        )
        db_session.add(creator)
        await db_session.commit()
        await db_session.refresh(creator)

        seeded = await project_factory(name="Project Archives Smoke")

        # Two archives under the project: one attributed to the creator, one
        # with no creator at all.
        shared = {
            "file_size": 2048,
            "status": "completed",
            "quantity": 1,
            "project_id": seeded.id,
        }
        attributed = PrintArchive(
            filename="attributed.3mf",
            file_path="x/attributed.3mf",
            print_name="Attributed Print",
            created_by_id=creator.id,
            **shared,
        )
        anonymous = PrintArchive(
            filename="anon.3mf",
            file_path="x/anon.3mf",
            print_name="Anonymous Print",
            created_by_id=None,
            **shared,
        )
        db_session.add_all([attributed, anonymous])
        await db_session.commit()

        response = await async_client.get(f"/api/v1/projects/{seeded.id}/archives?limit=100&offset=0")

        assert response.status_code == 200, f"Expected 200, got {response.status_code} body={response.text}"
        rows = response.json()
        assert len(rows) == 2

        # Both shapes must serialise: the attributed archive exposes the
        # creator's username (evidence that the eager load happened), while
        # the anonymous one reports None for both creator fields.
        by_filename = {row["filename"]: row for row in rows}
        attributed_row = by_filename["attributed.3mf"]
        anonymous_row = by_filename["anon.3mf"]
        assert attributed_row["created_by_username"] == "archive-creator"
        assert attributed_row["created_by_id"] == creator.id
        assert anonymous_row["created_by_username"] is None
        assert anonymous_row["created_by_id"] is None
class TestProjectExportImport:
    """Tests for project export/import functionality."""

    @staticmethod
    def _zip_upload(filename, entries, *, deflate=True):
        """Build an in-memory ZIP and wrap it as a multipart ``files`` mapping.

        Args:
            filename: Upload filename reported for the ``file`` form field.
            entries: Iterable of ``(archive_name, payload)`` pairs written to
                the ZIP in order; payloads may be ``str`` or ``bytes``.
            deflate: Use ``ZIP_DEFLATED`` when true, ``ZIP_STORED`` otherwise.

        Returns:
            A ``{"file": (filename, buffer, "application/zip")}`` dict suitable
            for the ``files=`` argument of ``async_client.post``, with the
            buffer rewound to the start.
        """
        import io
        import zipfile

        method = zipfile.ZIP_DEFLATED if deflate else zipfile.ZIP_STORED
        buffer = io.BytesIO()
        with zipfile.ZipFile(buffer, "w", method) as zf:
            for arcname, payload in entries:
                zf.writestr(arcname, payload)
        buffer.seek(0)
        return {"file": (filename, buffer, "application/zip")}

    @pytest.fixture
    async def project_factory(self, db_session):
        """Return an async factory that persists a ``Project`` row.

        Each call gets a distinct default name (a per-fixture counter is
        appended); keyword arguments override any default field.
        """
        _counter = [0]

        async def _create_project(**kwargs):
            from backend.app.models.project import Project

            _counter[0] += 1
            fields = {
                "name": f"Export Test Project {_counter[0]}",
                "description": "Test project for export",
                "color": "#00FF00",
                **kwargs,
            }
            project = Project(**fields)
            db_session.add(project)
            await db_session.commit()
            await db_session.refresh(project)
            return project

        return _create_project

    @pytest.fixture
    async def bom_item_factory(self, db_session):
        """Return an async factory that persists a ``ProjectBOMItem`` row.

        The factory takes the owning ``project_id`` plus keyword overrides for
        the default item fields.
        """

        async def _create_bom_item(project_id: int, **kwargs):
            from backend.app.models.project_bom import ProjectBOMItem

            fields = {
                "project_id": project_id,
                "name": "Test Part",
                "quantity_needed": 1,
                "quantity_acquired": 0,
                "sort_order": 0,
                **kwargs,
            }
            item = ProjectBOMItem(**fields)
            db_session.add(item)
            await db_session.commit()
            await db_session.refresh(item)
            return item

        return _create_bom_item

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_export_project(self, async_client: AsyncClient, project_factory, bom_item_factory, db_session):
        """Verify project export includes BOM items."""
        project = await project_factory(
            name="Export Me",
            description="A test project",
            target_count=10,
            target_parts_count=50,
            budget=100.0,
        )

        # Add BOM items
        await bom_item_factory(project.id, name="M3x8 Screws", quantity_needed=20, unit_price=0.10)
        await bom_item_factory(project.id, name="Heat Inserts", quantity_needed=10, unit_price=0.25)

        # Test JSON format export
        response = await async_client.get(f"/api/v1/projects/{project.id}/export?format=json")
        assert response.status_code == 200
        data = response.json()
        assert data["name"] == "Export Me"
        assert data["description"] == "A test project"
        assert data["target_count"] == 10
        assert data["target_parts_count"] == 50
        assert data["budget"] == 100.0
        assert len(data["bom_items"]) == 2

        # Check BOM items
        bom_names = [item["name"] for item in data["bom_items"]]
        assert "M3x8 Screws" in bom_names
        assert "Heat Inserts" in bom_names

        # Test ZIP format export (default)
        zip_response = await async_client.get(f"/api/v1/projects/{project.id}/export")
        assert zip_response.status_code == 200
        assert zip_response.headers["content-type"] == "application/zip"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_import_project(self, async_client: AsyncClient):
        """Verify project can be imported with BOM items."""
        import_data = {
            "name": "Imported Project",
            "description": "Imported from JSON",
            "color": "#FF00FF",
            "target_count": 5,
            "target_parts_count": 25,
            "budget": 50.0,
            "bom_items": [
                {
                    "name": "PTFE Tubes",
                    "quantity_needed": 4,
                    "quantity_acquired": 0,
                    "unit_price": 2.50,
                    "sourcing_url": "https://example.com",
                    "stl_filename": None,
                    "remarks": "Need 4mm ID",
                },
            ],
        }

        response = await async_client.post("/api/v1/projects/import", json=import_data)

        assert response.status_code == 200
        data = response.json()
        assert data["name"] == "Imported Project"
        assert data["description"] == "Imported from JSON"
        assert data["target_count"] == 5
        assert data["target_parts_count"] == 25
        assert data["budget"] == 50.0
        assert data["id"] > 0  # Has a valid ID
        # BOM stats should show 1 item imported
        assert data["stats"]["bom_total_items"] == 1

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_export_project_with_linked_folder(self, async_client: AsyncClient, project_factory, db_session):
        """Verify project export includes linked folders."""
        from backend.app.models.library import LibraryFolder

        project = await project_factory(name="Project With Folder")

        # Create a linked folder
        folder = LibraryFolder(name="Project Files", project_id=project.id)
        db_session.add(folder)
        await db_session.commit()

        response = await async_client.get(f"/api/v1/projects/{project.id}/export?format=json")

        assert response.status_code == 200
        data = response.json()
        assert data["name"] == "Project With Folder"
        assert len(data["linked_folders"]) == 1
        assert data["linked_folders"][0]["name"] == "Project Files"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_import_project_with_linked_folder(self, async_client: AsyncClient):
        """Verify project import accepts linked folders data."""
        import_data = {
            "name": "Imported With Folders",
            "linked_folders": [
                {"name": "STL Files"},
                {"name": "Documentation"},
            ],
        }

        # Import should succeed with linked_folders
        response = await async_client.post("/api/v1/projects/import", json=import_data)

        assert response.status_code == 200
        data = response.json()
        assert data["name"] == "Imported With Folders"
        assert data["id"] > 0

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_import_project_from_json_file(self, async_client: AsyncClient):
        """Verify project can be imported from JSON file upload."""
        import io
        import json

        project_data = {
            "name": "File Uploaded Project",
            "description": "Imported from JSON file",
            "color": "#123456",
        }

        # Create a file-like object
        file_content = json.dumps(project_data).encode()
        files = {"file": ("project.json", io.BytesIO(file_content), "application/json")}

        response = await async_client.post("/api/v1/projects/import/file", files=files)

        assert response.status_code == 200
        data = response.json()
        assert data["name"] == "File Uploaded Project"
        assert data["description"] == "Imported from JSON file"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_import_project_from_zip_file(self, async_client: AsyncClient):
        """Verify project can be imported from ZIP file with files."""
        import json

        project_data = {
            "name": "ZIP Imported Project",
            "description": "Imported from ZIP",
            "linked_folders": [{"name": "TestFolder", "files": [{"filename": "test.txt"}]}],
        }

        # Create a ZIP file in memory
        files = self._zip_upload(
            "project.zip",
            [
                ("project.json", json.dumps(project_data)),
                ("files/TestFolder/test.txt", "Hello World"),
            ],
        )

        response = await async_client.post("/api/v1/projects/import/file", files=files)

        assert response.status_code == 200
        data = response.json()
        assert data["name"] == "ZIP Imported Project"
        assert data["description"] == "Imported from ZIP"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_export_zip_contains_files(self, async_client: AsyncClient, project_factory, db_session):
        """Verify ZIP export contains actual files from linked folders."""
        import io
        import json
        import zipfile

        from backend.app.api.routes.library import get_library_dir
        from backend.app.models.library import LibraryFile, LibraryFolder

        project = await project_factory(name="Project With Files")

        # Create a linked folder with is_external fields
        folder = LibraryFolder(
            name="TestExportFolder",
            project_id=project.id,
            is_external=False,
            external_readonly=False,
            external_show_hidden=False,
        )
        db_session.add(folder)
        await db_session.flush()

        # Create a test file on disk
        content = "Export test content"
        folder_path = get_library_dir() / "TestExportFolder"
        test_file_path = folder_path / "test_export.txt"
        folder_path.mkdir(parents=True, exist_ok=True)
        try:
            test_file_path.write_text(content)

            # Create library file record
            lib_file = LibraryFile(
                folder_id=folder.id,
                filename="test_export.txt",
                file_path="TestExportFolder/test_export.txt",
                file_type="other",
                file_size=len(content),
                is_external=False,
            )
            db_session.add(lib_file)
            await db_session.commit()

            # Export as ZIP
            response = await async_client.get(f"/api/v1/projects/{project.id}/export")
            assert response.status_code == 200
            assert response.headers["content-type"] == "application/zip"

            # Verify ZIP contents
            with zipfile.ZipFile(io.BytesIO(response.content), "r") as zf:
                names = zf.namelist()
                assert "project.json" in names
                assert "files/TestExportFolder/test_export.txt" in names

                # Verify file content
                exported = zf.read("files/TestExportFolder/test_export.txt").decode()
                assert exported == "Export test content"

                # Verify project.json
                project_data = json.loads(zf.read("project.json"))
                assert project_data["name"] == "Project With Files"
        finally:
            # Cleanup runs even when an assertion above fails, so a red test
            # does not leave the file and folder behind in the library dir.
            test_file_path.unlink(missing_ok=True)
            folder_path.rmdir()

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_import_invalid_file_type(self, async_client: AsyncClient):
        """Verify import rejects invalid file types."""
        import io

        files = {"file": ("project.txt", io.BytesIO(b"invalid"), "text/plain")}

        response = await async_client.post("/api/v1/projects/import/file", files=files)

        assert response.status_code == 400
        assert "must be .zip or .json" in response.json()["detail"]

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_import_zip_missing_project_json(self, async_client: AsyncClient):
        """Verify import rejects ZIP without project.json."""
        files = self._zip_upload(
            "project.zip",
            [("other.txt", "no project.json here")],
            deflate=False,
        )

        response = await async_client.post("/api/v1/projects/import/file", files=files)

        assert response.status_code == 400
        assert "project.json" in response.json()["detail"]

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_import_invalid_json(self, async_client: AsyncClient):
        """Verify import rejects invalid JSON content."""
        import io

        files = {"file": ("project.json", io.BytesIO(b"not valid json"), "application/json")}

        response = await async_client.post("/api/v1/projects/import/file", files=files)

        assert response.status_code == 400
        assert "Invalid JSON" in response.json()["detail"]

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_import_rejects_absolute_path_in_folder_name(self, async_client: AsyncClient, tmp_path):
        """Absolute paths in `linked_folders[*].name` must not escape library_dir.

        Verbatim shape from the upstream advisory: attacker sets folder name to
        an absolute path, expecting Python's ``Path("/lib") / "/anywhere"`` to
        collapse to ``Path("/anywhere")`` and let the next file write land
        outside the library directory.
        """
        import json

        target_outside = tmp_path / "outside" / "owned"

        # Build a ZIP whose folder name points outside library_dir entirely.
        manifest = {
            "name": "innocent",
            "linked_folders": [{"name": str(target_outside)}],
        }
        files = self._zip_upload(
            "evil.zip",
            [
                ("project.json", json.dumps(manifest)),
                (f"files/{target_outside}/evil.pth", b"import os; os.system('echo pwned > /tmp/owned')\n"),
            ],
        )

        response = await async_client.post("/api/v1/projects/import/file", files=files)

        assert response.status_code == 400, response.text
        assert not target_outside.exists(), "Attacker payload landed outside library_dir"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_import_rejects_dotdot_in_folder_name(self, async_client: AsyncClient):
        """`..` segments in folder name must be rejected."""
        import json

        manifest = {
            "name": "innocent",
            "linked_folders": [{"name": "../../../etc"}],
        }
        files = self._zip_upload(
            "evil.zip",
            [
                ("project.json", json.dumps(manifest)),
                ("files/../../../etc/x.txt", b"x"),
            ],
        )

        response = await async_client.post("/api/v1/projects/import/file", files=files)

        assert response.status_code == 400, response.text

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_import_rejects_dotdot_in_relative_path(self, async_client: AsyncClient):
        """`..` segments in the per-entry path (Vector B in the advisory) must
        be rejected even when the folder name itself is fine."""
        import json

        manifest = {
            "name": "innocent",
            "linked_folders": [{"name": "ok"}],
        }
        files = self._zip_upload(
            "evil.zip",
            [
                ("project.json", json.dumps(manifest)),
                # Folder name is benign, but the file path inside attempts to
                # escape via ``..``.
                ("files/ok/../../../etc/x.txt", b"x"),
            ],
        )

        response = await async_client.post("/api/v1/projects/import/file", files=files)

        assert response.status_code == 400, response.text

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_import_legit_nested_zip_still_works(self, async_client: AsyncClient):
        """A legitimate ZIP with a nested file path inside the folder must
        continue to import cleanly. Guards against the fix being over-strict."""
        import json

        manifest = {
            "name": "nested-ok",
            "linked_folders": [{"name": "OkFolder"}],
        }
        files = self._zip_upload(
            "nested.zip",
            [
                ("project.json", json.dumps(manifest)),
                ("files/OkFolder/sub/dir/inside.txt", b"hello"),
            ],
        )

        response = await async_client.post("/api/v1/projects/import/file", files=files)

        assert response.status_code == 200, response.text
        data = response.json()
        assert data["name"] == "nested-ok"
|