| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301 |
- """Integration tests for Archives API endpoints.
- Tests the full request/response cycle for /api/v1/archives/ endpoints.
- """
- from pathlib import Path
- import pytest
- from httpx import AsyncClient
- class TestArchivesAPI:
- """Integration tests for /api/v1/archives/ endpoints."""
- # ========================================================================
- # List endpoints
- # ========================================================================
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_list_archives_empty(self, async_client: AsyncClient):
- """Verify empty list is returned when no archives exist."""
- response = await async_client.get("/api/v1/archives/")
- assert response.status_code == 200
- data = response.json()
- assert isinstance(data, list)
- assert len(data) == 0
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_list_archives_with_data(
- self, async_client: AsyncClient, archive_factory, printer_factory, db_session
- ):
- """Verify list returns existing archives."""
- printer = await printer_factory()
- await archive_factory(printer.id, print_name="Test Archive")
- response = await async_client.get("/api/v1/archives/")
- assert response.status_code == 200
- data = response.json()
- assert isinstance(data, list)
- assert len(data) >= 1
- assert any(a["print_name"] == "Test Archive" for a in data)
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_list_archives_pagination(
- self, async_client: AsyncClient, archive_factory, printer_factory, db_session
- ):
- """Verify pagination works correctly."""
- printer = await printer_factory()
- # Create 5 archives
- for i in range(5):
- await archive_factory(printer.id, print_name=f"Archive {i}")
- # Get first page with limit 2
- response = await async_client.get("/api/v1/archives/?limit=2&offset=0")
- assert response.status_code == 200
- data = response.json()
- assert isinstance(data, list)
- assert len(data) == 2
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_list_archives_filter_by_printer(
- self, async_client: AsyncClient, archive_factory, printer_factory, db_session
- ):
- """Verify filtering by printer_id works."""
- printer1 = await printer_factory(name="Printer 1", serial_number="00M09A000000001")
- printer2 = await printer_factory(name="Printer 2", serial_number="00M09A000000002")
- await archive_factory(printer1.id, print_name="Printer 1 Archive")
- await archive_factory(printer2.id, print_name="Printer 2 Archive")
- response = await async_client.get(f"/api/v1/archives/?printer_id={printer1.id}")
- assert response.status_code == 200
- data = response.json()
- assert all(a["printer_id"] == printer1.id for a in data)
- # ========================================================================
- # Get single endpoint
- # ========================================================================
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_get_archive(self, async_client: AsyncClient, archive_factory, printer_factory, db_session):
- """Verify single archive can be retrieved."""
- printer = await printer_factory()
- archive = await archive_factory(printer.id, print_name="Get Test Archive")
- response = await async_client.get(f"/api/v1/archives/{archive.id}")
- assert response.status_code == 200
- result = response.json()
- assert result["id"] == archive.id
- assert result["print_name"] == "Get Test Archive"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_get_archive_not_found(self, async_client: AsyncClient):
- """Verify 404 for non-existent archive."""
- response = await async_client.get("/api/v1/archives/9999")
- assert response.status_code == 404
- # ========================================================================
- # Update endpoints
- # ========================================================================
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_update_archive_name(self, async_client: AsyncClient, archive_factory, printer_factory, db_session):
- """Verify archive name can be updated."""
- printer = await printer_factory()
- archive = await archive_factory(printer.id, print_name="Original Name")
- response = await async_client.patch(f"/api/v1/archives/{archive.id}", json={"print_name": "Updated Name"})
- assert response.status_code == 200
- assert response.json()["print_name"] == "Updated Name"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_update_archive_notes(self, async_client: AsyncClient, archive_factory, printer_factory, db_session):
- """Verify archive notes can be updated."""
- printer = await printer_factory()
- archive = await archive_factory(printer.id)
- response = await async_client.patch(f"/api/v1/archives/{archive.id}", json={"notes": "Great print!"})
- assert response.status_code == 200
- assert response.json()["notes"] == "Great print!"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_update_archive_favorite(
- self, async_client: AsyncClient, archive_factory, printer_factory, db_session
- ):
- """Verify archive favorite status can be updated."""
- printer = await printer_factory()
- archive = await archive_factory(printer.id)
- response = await async_client.patch(f"/api/v1/archives/{archive.id}", json={"is_favorite": True})
- assert response.status_code == 200
- assert response.json()["is_favorite"] is True
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_update_archive_external_url(
- self, async_client: AsyncClient, archive_factory, printer_factory, db_session
- ):
- """Verify archive external_url can be updated."""
- printer = await printer_factory()
- archive = await archive_factory(printer.id)
- response = await async_client.patch(
- f"/api/v1/archives/{archive.id}", json={"external_url": "https://printables.com/model/12345"}
- )
- assert response.status_code == 200
- assert response.json()["external_url"] == "https://printables.com/model/12345"
- # Verify it can be cleared
- response = await async_client.patch(f"/api/v1/archives/{archive.id}", json={"external_url": None})
- assert response.status_code == 200
- assert response.json()["external_url"] is None
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_update_archive_failure_reason_mirrors_to_print_log_entry(
- self, async_client: AsyncClient, archive_factory, printer_factory, db_session
- ):
- """#1444: PATCH /archives/{id} with failure_reason must mirror to the
- latest PrintLogEntry so the Stats page Failure Analysis widget
- (which reads PrintLogEntry.failure_reason) reflects the user's
- reclassification instead of showing "Unknown" forever.
- """
- from sqlalchemy import select
- from backend.app.models.print_log import PrintLogEntry
- printer = await printer_factory()
- # archive_factory auto-creates a matching PrintLogEntry (failure_reason
- # carried from the archive, which is NULL here — same shape as the bug
- # repro: print completed → log entry written with NULL → user goes to
- # classify the failure afterwards).
- archive = await archive_factory(printer.id, print_name="Failed Print", status="failed", run_status="failed")
- response = await async_client.patch(
- f"/api/v1/archives/{archive.id}",
- json={"failure_reason": "Adhesion failure"},
- )
- assert response.status_code == 200
- result = await db_session.execute(select(PrintLogEntry).where(PrintLogEntry.archive_id == archive.id))
- mirrored = result.scalar_one()
- assert mirrored.failure_reason == "Adhesion failure"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_update_archive_status_mirrors_to_print_log_entry(
- self, async_client: AsyncClient, archive_factory, printer_factory, db_session
- ):
- """#1444: PATCH /archives/{id} with status must mirror to the latest
- PrintLogEntry so stats that filter on PrintLogEntry.status see the
- user's reclassification.
- """
- from sqlalchemy import select
- from backend.app.models.print_log import PrintLogEntry
- printer = await printer_factory()
- archive = await archive_factory(printer.id, run_status="completed")
- response = await async_client.patch(
- f"/api/v1/archives/{archive.id}",
- json={"status": "failed"},
- )
- assert response.status_code == 200
- result = await db_session.execute(select(PrintLogEntry).where(PrintLogEntry.archive_id == archive.id))
- mirrored = result.scalar_one()
- assert mirrored.status == "failed"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_update_archive_failure_reason_only_touches_latest_entry(
- self, async_client: AsyncClient, archive_factory, printer_factory, db_session
- ):
- """#1444: For an archive with multiple runs (reprints), only the
- latest PrintLogEntry should receive the reclassification. Earlier
- runs were classified at their own time and must not be retroactively
- overwritten.
- """
- from backend.app.models.print_log import PrintLogEntry
- printer = await printer_factory()
- # First run — created by the factory's auto-run with its own reason.
- archive = await archive_factory(printer.id, status="failed", run_status="failed")
- from sqlalchemy import select
- first_run = (
- await db_session.execute(select(PrintLogEntry).where(PrintLogEntry.archive_id == archive.id))
- ).scalar_one()
- first_run.failure_reason = "Filament tangle"
- await db_session.commit()
- # Second run — the reprint that just finished with NULL classification.
- latest_run = PrintLogEntry(archive_id=archive.id, status="failed", failure_reason=None)
- db_session.add(latest_run)
- await db_session.commit()
- response = await async_client.patch(
- f"/api/v1/archives/{archive.id}",
- json={"failure_reason": "Adhesion failure"},
- )
- assert response.status_code == 200
- await db_session.refresh(first_run)
- await db_session.refresh(latest_run)
- assert first_run.failure_reason == "Filament tangle"
- assert latest_run.failure_reason == "Adhesion failure"
- # ========================================================================
- # Delete endpoints
- # ========================================================================
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_delete_archive(self, async_client: AsyncClient, archive_factory, printer_factory, db_session):
- """Verify archive can be deleted."""
- printer = await printer_factory()
- archive = await archive_factory(printer.id)
- archive_id = archive.id
- response = await async_client.delete(f"/api/v1/archives/{archive_id}")
- assert response.status_code == 200
- # Verify deleted
- response = await async_client.get(f"/api/v1/archives/{archive_id}")
- assert response.status_code == 404
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_delete_nonexistent_archive(self, async_client: AsyncClient):
- """Verify deleting non-existent archive returns 404."""
- response = await async_client.delete("/api/v1/archives/9999")
- assert response.status_code == 404
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_soft_delete_preserves_stats_contribution(
- self, async_client: AsyncClient, archive_factory, printer_factory, db_session
- ):
- """#1343: deleting an archive without ``purge_stats`` keeps its
- contribution in Quick Stats. The row vanishes from listings but the
- filament / time / cost totals stay intact.
- """
- printer = await printer_factory()
- await archive_factory(
- printer.id,
- status="completed",
- print_time_seconds=3600,
- filament_used_grams=50.0,
- cost=1.50,
- )
- archive_to_delete = await archive_factory(
- printer.id,
- status="completed",
- print_time_seconds=7200,
- filament_used_grams=100.0,
- cost=3.00,
- )
- # Pre-delete: stats include both archives.
- pre = (await async_client.get("/api/v1/archives/stats")).json()
- assert pre["total_prints"] == 2
- assert pre["total_filament_grams"] == 150.0
- assert pre["total_cost"] == 4.50
- # Soft delete (default — no purge_stats param).
- resp = await async_client.delete(f"/api/v1/archives/{archive_to_delete.id}")
- assert resp.status_code == 200
- body = resp.json()
- assert body["purged_from_stats"] is False
- # Listing hides the deleted archive…
- listing = (await async_client.get("/api/v1/archives/")).json()
- assert all(a["id"] != archive_to_delete.id for a in listing)
- # …but stats still reflect both prints (the whole point of #1343).
- post = (await async_client.get("/api/v1/archives/stats")).json()
- assert post["total_prints"] == 2
- assert post["total_filament_grams"] == 150.0
- assert post["total_cost"] == 4.50
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_soft_delete_clears_thumbnail_path_on_linked_log_entries(
- self, async_client: AsyncClient, archive_factory, printer_factory, db_session
- ):
- """#1348 follow-up: soft-deleting an archive removes its files from disk;
- the cached thumbnail_path on linked PrintLogEntry rows must be NULLed
- in the same transaction so the print-log view doesn't 404-storm on the
- now-deleted thumbnail file."""
- from sqlalchemy import select
- from backend.app.models.print_log import PrintLogEntry
- printer = await printer_factory()
- archive = await archive_factory(
- printer.id,
- status="completed",
- thumbnail_path="archives/test/test_print/thumbnail.png",
- )
- # The factory's auto-PrintLogEntry doesn't copy thumbnail_path; set it
- # manually to mirror what the production write_log_entry path stores.
- run_query = await db_session.execute(select(PrintLogEntry).where(PrintLogEntry.archive_id == archive.id))
- run = run_query.scalar_one()
- run.thumbnail_path = "archives/test/test_print/thumbnail.png"
- await db_session.commit()
- assert run.thumbnail_path is not None
- resp = await async_client.delete(f"/api/v1/archives/{archive.id}")
- assert resp.status_code == 200
- assert resp.json()["purged_from_stats"] is False
- await db_session.refresh(run)
- assert run.thumbnail_path is None, "soft-delete must NULL thumbnail_path on linked log entry"
- # The log entry itself survives the soft delete (its filament/cost
- # contribution still needs to flow into stats per #1343).
- assert run.id is not None
- assert run.archive_id == archive.id
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_hard_delete_clears_thumbnail_path_before_fk_cascade(
- self, async_client: AsyncClient, archive_factory, printer_factory, db_session
- ):
- """#1348 follow-up: the auto-purge sweeper (and any caller of
- ArchiveService.delete_archive) hard-deletes the archive row but leaves
- PrintLogEntry rows alive via ON DELETE SET NULL. The eager
- thumbnail_path clear must run inside delete_archive so even orphaned
- log entries don't surface stale paths."""
- from sqlalchemy import select
- from backend.app.models.print_log import PrintLogEntry
- from backend.app.services.archive import ArchiveService
- printer = await printer_factory()
- archive = await archive_factory(
- printer.id,
- status="completed",
- thumbnail_path="archives/test/test_print/thumbnail.png",
- )
- run_query = await db_session.execute(select(PrintLogEntry).where(PrintLogEntry.archive_id == archive.id))
- run = run_query.scalar_one()
- run.thumbnail_path = "archives/test/test_print/thumbnail.png"
- await db_session.commit()
- run_id = run.id
- service = ArchiveService(db_session)
- assert await service.delete_archive(archive.id) is True
- # Log entry survives the hard-delete (the FK is ON DELETE SET NULL
- # in production; SQLite test config doesn't enable foreign_keys=ON
- # by default so archive_id may still be set, but the row itself
- # remains for audit). The thumbnail_path was cleared eagerly by
- # _null_print_log_thumbnail_paths before db.delete(archive).
- refetch = await db_session.execute(select(PrintLogEntry).where(PrintLogEntry.id == run_id))
- survivor = refetch.scalar_one()
- assert survivor.thumbnail_path is None, (
- "delete_archive must NULL thumbnail_path before removing the archive row"
- )
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_print_log_thumbnail_route_lazy_nulls_missing_file(
- self, async_client: AsyncClient, archive_factory, printer_factory, db_session
- ):
- """#1348 follow-up: GET /print-log/{id}/thumbnail self-heals when the
- thumbnail_path on a log entry points at a missing file (failed print
- whose thumbnail was never written, or a stale path that escaped the
- delete-time cleanup)."""
- from sqlalchemy import select
- from backend.app.models.print_log import PrintLogEntry
- printer = await printer_factory()
- archive = await archive_factory(printer.id, status="failed")
- run_query = await db_session.execute(select(PrintLogEntry).where(PrintLogEntry.archive_id == archive.id))
- run = run_query.scalar_one()
- # Path points at a file that never existed (failed-print case where
- # archive.thumbnail_path was set but the extractor never produced one).
- run.thumbnail_path = "archives/missing/never_written/thumbnail.png"
- await db_session.commit()
- # Auth is disabled in the integration test config, so the stream-token
- # guard is bypassed — the route runs the lazy-NULL branch directly.
- resp = await async_client.get(f"/api/v1/print-log/{run.id}/thumbnail")
- assert resp.status_code == 404
- await db_session.refresh(run)
- assert run.thumbnail_path is None, "missing file must self-heal to NULL"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_purge_stats_drops_archive_from_quick_stats(
- self, async_client: AsyncClient, archive_factory, printer_factory, db_session
- ):
- """#1343: deleting with ``?purge_stats=true`` hard-deletes the row,
- dropping its contribution from Quick Stats (the original behaviour,
- now opt-in)."""
- printer = await printer_factory()
- keep = await archive_factory(printer.id, status="completed", filament_used_grams=50.0)
- purge = await archive_factory(printer.id, status="completed", filament_used_grams=100.0)
- resp = await async_client.delete(f"/api/v1/archives/{purge.id}?purge_stats=true")
- assert resp.status_code == 200
- assert resp.json()["purged_from_stats"] is True
- stats = (await async_client.get("/api/v1/archives/stats")).json()
- assert stats["total_prints"] == 1
- assert stats["total_filament_grams"] == 50.0
- # The kept archive is still listed.
- listing = (await async_client.get("/api/v1/archives/")).json()
- assert [a["id"] for a in listing] == [keep.id]
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_soft_deleted_archive_404_on_detail(
- self, async_client: AsyncClient, archive_factory, printer_factory, db_session
- ):
- """A soft-deleted archive must 404 on GET — a stale bookmark or
- direct URL should not expose a row the user has already removed."""
- printer = await printer_factory()
- archive = await archive_factory(printer.id)
- await async_client.delete(f"/api/v1/archives/{archive.id}")
- resp = await async_client.get(f"/api/v1/archives/{archive.id}")
- assert resp.status_code == 404
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_soft_deleted_archive_hidden_from_search(
- self, async_client: AsyncClient, archive_factory, printer_factory, db_session
- ):
- """Search must skip soft-deleted archives. Uses the LIKE fallback by
- querying a single-character pattern that the SQLite FTS5 rejects, so
- the test covers the fallback path that the production FTS path also
- respects."""
- printer = await printer_factory()
- archive = await archive_factory(printer.id, print_name="UniqueSoftDeleteCandidate")
- await async_client.delete(f"/api/v1/archives/{archive.id}")
- resp = await async_client.get("/api/v1/archives/search?q=UniqueSoftDeleteCandidate")
- assert resp.status_code == 200
- assert resp.json() == []
- # ========================================================================
- # Statistics endpoints
- # ========================================================================
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_get_archive_stats(self, async_client: AsyncClient, archive_factory, printer_factory, db_session):
- """Verify archive statistics can be retrieved."""
- printer = await printer_factory()
- await archive_factory(
- printer.id,
- status="completed",
- print_time_seconds=3600,
- filament_used_grams=50.0,
- )
- await archive_factory(
- printer.id,
- status="completed",
- print_time_seconds=7200,
- filament_used_grams=100.0,
- )
- response = await async_client.get("/api/v1/archives/stats")
- assert response.status_code == 200
- result = response.json()
- # Check for actual stats fields
- assert "total_prints" in result
- assert "successful_prints" in result
- class TestArchivesSlimAPI:
- """Integration tests for /api/v1/archives/slim endpoint."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_slim_empty(self, async_client: AsyncClient):
- """Verify empty list when no archives exist."""
- response = await async_client.get("/api/v1/archives/slim")
- assert response.status_code == 200
- assert response.json() == []
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_slim_returns_only_expected_fields(
- self, async_client: AsyncClient, archive_factory, printer_factory, db_session
- ):
- """Verify response contains only slim fields, not full archive data."""
- printer = await printer_factory()
- await archive_factory(
- printer.id,
- print_name="Slim Test",
- status="completed",
- filament_type="PLA",
- filament_color="#FF0000",
- filament_used_grams=50.0,
- print_time_seconds=3600,
- cost=1.50,
- quantity=2,
- )
- response = await async_client.get("/api/v1/archives/slim")
- assert response.status_code == 200
- data = response.json()
- assert len(data) == 1
- item = data[0]
- # Expected fields present
- assert item["printer_id"] == printer.id
- assert item["print_name"] == "Slim Test"
- assert item["status"] == "completed"
- assert item["filament_type"] == "PLA"
- assert item["filament_color"] == "#FF0000"
- assert item["filament_used_grams"] == 50.0
- assert item["print_time_seconds"] == 3600
- assert item["cost"] == 1.50
- # quantity is per-event semantics now (each PrintLogEntry = one run);
- # the archive's quantity field is no longer surfaced through this
- # endpoint after the #1390 per-event migration.
- assert item["quantity"] == 1
- assert "created_at" in item
- # Full archive fields must NOT be present
- assert "id" not in item
- assert "filename" not in item
- assert "file_path" not in item
- assert "file_size" not in item
- assert "extra_data" not in item
- assert "notes" not in item
- assert "tags" not in item
- assert "photos" not in item
- assert "thumbnail_path" not in item
- assert "content_hash" not in item
- assert "duplicates" not in item
- assert "duplicate_count" not in item
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_slim_computes_actual_time(
- self, async_client: AsyncClient, archive_factory, printer_factory, db_session
- ):
- """Verify actual_time_seconds is computed from started_at/completed_at."""
- from datetime import datetime, timezone
- printer = await printer_factory()
- started = datetime(2024, 1, 1, 10, 0, 0, tzinfo=timezone.utc)
- completed = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc) # 2 hours = 7200s
- await archive_factory(
- printer.id,
- status="completed",
- started_at=started,
- completed_at=completed,
- )
- response = await async_client.get("/api/v1/archives/slim")
- assert response.status_code == 200
- item = response.json()[0]
- assert item["actual_time_seconds"] == 7200
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_slim_actual_time_for_failed_includes_elapsed(
- self, async_client: AsyncClient, archive_factory, printer_factory, db_session
- ):
- """Failed prints report measured elapsed time so Printer Stats By Time
- matches Quick Stats Print Time (#1390). Previously this returned null
- and the frontend fell back to the slicer estimate, double-counting the
- unfinished portion of the print."""
- from datetime import datetime, timezone
- printer = await printer_factory()
- await archive_factory(
- printer.id,
- status="failed",
- started_at=datetime(2024, 1, 1, 10, 0, 0, tzinfo=timezone.utc),
- completed_at=datetime(2024, 1, 1, 11, 0, 0, tzinfo=timezone.utc),
- )
- response = await async_client.get("/api/v1/archives/slim")
- assert response.status_code == 200
- item = response.json()[0]
- assert item["actual_time_seconds"] == 3600
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_slim_date_filtering(self, async_client: AsyncClient, archive_factory, printer_factory, db_session):
- """Verify date_from and date_to filters work."""
- from datetime import datetime, timezone
- printer = await printer_factory()
- await archive_factory(
- printer.id,
- print_name="Old Print",
- created_at=datetime(2024, 1, 1, tzinfo=timezone.utc),
- )
- await archive_factory(
- printer.id,
- print_name="New Print",
- created_at=datetime(2024, 6, 15, tzinfo=timezone.utc),
- )
- # Filter to only June 2024
- response = await async_client.get("/api/v1/archives/slim?date_from=2024-06-01&date_to=2024-06-30")
- assert response.status_code == 200
- data = response.json()
- assert len(data) == 1
- assert data[0]["print_name"] == "New Print"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_slim_pagination(self, async_client: AsyncClient, archive_factory, printer_factory, db_session):
- """Verify limit and offset work."""
- printer = await printer_factory()
- for i in range(5):
- await archive_factory(printer.id, print_name=f"Print {i}")
- response = await async_client.get("/api/v1/archives/slim?limit=2&offset=0")
- assert response.status_code == 200
- assert len(response.json()) == 2
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_slim_counts_reprints_as_separate_rows(
- self, async_client: AsyncClient, archive_factory, printer_factory, db_session
- ):
- """Reprints add events even though the archive row is overwritten (#1390).
- Before the per-event migration, /archives/slim returned one row per
- archive — so an archive that had been reprinted three times appeared
- once and undercounted Filament Used / Cost / Time. The endpoint must
- now return one row per logged event.
- """
- from backend.app.models.print_log import PrintLogEntry
- printer = await printer_factory()
- archive = await archive_factory(
- printer.id,
- print_name="Reprinted Model",
- filament_used_grams=50.0,
- cost=1.50,
- )
- # archive_factory synthesizes one event; add two more to simulate
- # the same archive being reprinted twice more.
- for _ in range(2):
- db_session.add(
- PrintLogEntry(
- archive_id=archive.id,
- printer_id=archive.printer_id,
- status="completed",
- filament_type=archive.filament_type,
- filament_used_grams=archive.filament_used_grams,
- cost=archive.cost,
- print_name=archive.print_name,
- )
- )
- await db_session.commit()
- response = await async_client.get("/api/v1/archives/slim")
- assert response.status_code == 200
- data = response.json()
- assert len(data) == 3, "Each reprint must contribute one row"
- total_filament = sum(item["filament_used_grams"] or 0 for item in data)
- assert total_filament == 150.0, "Sum across events must reflect all three runs"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_slim_includes_orphan_events(self, async_client: AsyncClient, printer_factory, db_session):
- """Events whose archive was hard-deleted still appear (#1390).
- After ON DELETE SET NULL the event row survives with archive_id=NULL.
- The slim endpoint must keep counting it so Quick Stats and the
- archive-iterating widgets stay aligned.
- """
- from backend.app.models.print_log import PrintLogEntry
- printer = await printer_factory()
- db_session.add(
- PrintLogEntry(
- archive_id=None,
- printer_id=printer.id,
- status="completed",
- filament_type="PETG",
- filament_used_grams=25.0,
- cost=0.75,
- print_name="Orphaned Print",
- )
- )
- await db_session.commit()
- response = await async_client.get("/api/v1/archives/slim")
- assert response.status_code == 200
- data = response.json()
- assert len(data) == 1
- assert data[0]["print_name"] == "Orphaned Print"
- assert data[0]["filament_used_grams"] == 25.0
- # print_time_seconds (sliced estimate) comes from the archive table,
- # which orphans no longer have — must surface as null gracefully.
- assert data[0]["print_time_seconds"] is None
- class TestFailureAnalysisAPI:
- """Per-event failure analysis (#1390)."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_failure_analysis_counts_reprints_and_orphans(
- self, async_client: AsyncClient, archive_factory, printer_factory, db_session
- ):
- """Failure analysis aggregates per event, not per archive.
- Verifies the dual fix for #1390: a reprint that adds a second failed
- event must count twice, and an orphan failed event (archive deleted)
- must still appear in the totals.
- """
- from backend.app.models.print_log import PrintLogEntry
- printer = await printer_factory()
- archive = await archive_factory(
- printer.id,
- print_name="Failing Model",
- status="failed",
- failure_reason="filament_runout",
- )
- # Add a second failed event for the same archive (a reprint that also
- # failed) and one orphan failed event (archive was deleted).
- db_session.add(
- PrintLogEntry(
- archive_id=archive.id,
- printer_id=printer.id,
- status="failed",
- failure_reason="filament_runout",
- filament_type=archive.filament_type,
- print_name=archive.print_name,
- )
- )
- db_session.add(
- PrintLogEntry(
- archive_id=None,
- printer_id=printer.id,
- status="failed",
- failure_reason="bed_adhesion",
- filament_type="PETG",
- print_name="Orphaned Failed Print",
- )
- )
- await db_session.commit()
- response = await async_client.get("/api/v1/archives/analysis/failures")
- assert response.status_code == 200
- result = response.json()
- assert result["total_prints"] == 3
- assert result["failed_prints"] == 3
- assert result["failures_by_reason"]["filament_runout"] == 2
- assert result["failures_by_reason"]["bed_adhesion"] == 1
- class TestArchiveDataIntegrity:
- """Tests for archive data integrity."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_archive_linked_to_printer(
- self, async_client: AsyncClient, archive_factory, printer_factory, db_session
- ):
- """Verify archive is properly linked to printer."""
- printer = await printer_factory(name="My Printer")
- archive = await archive_factory(printer.id)
- response = await async_client.get(f"/api/v1/archives/{archive.id}")
- assert response.status_code == 200
- result = response.json()
- assert result["printer_id"] == printer.id
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_archive_stores_print_data(
- self, async_client: AsyncClient, archive_factory, printer_factory, db_session
- ):
- """Verify archive stores all print data correctly."""
- printer = await printer_factory()
- archive = await archive_factory(
- printer.id,
- print_name="Test Print",
- filename="test.3mf",
- status="completed",
- filament_type="PLA",
- filament_used_grams=75.5,
- print_time_seconds=5400,
- )
- response = await async_client.get(f"/api/v1/archives/{archive.id}")
- assert response.status_code == 200
- result = response.json()
- assert result["print_name"] == "Test Print"
- assert result["filename"] == "test.3mf"
- assert result["status"] == "completed"
- assert result["filament_type"] == "PLA"
- assert result["filament_used_grams"] == 75.5
- assert result["print_time_seconds"] == 5400
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_archive_update_persists(
- self, async_client: AsyncClient, archive_factory, printer_factory, db_session
- ):
- """CRITICAL: Verify archive updates persist."""
- printer = await printer_factory()
- archive = await archive_factory(printer.id, notes="Original notes")
- # Update
- await async_client.patch(f"/api/v1/archives/{archive.id}", json={"notes": "Updated notes", "is_favorite": True})
- # Verify persistence
- response = await async_client.get(f"/api/v1/archives/{archive.id}")
- result = response.json()
- assert result["notes"] == "Updated notes"
- assert result["is_favorite"] is True
- class TestArchiveF3DEndpoints:
- """Tests for F3D (Fusion 360 design file) attachment endpoints."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_archive_response_includes_f3d_path(
- self, async_client: AsyncClient, archive_factory, printer_factory, db_session
- ):
- """Verify f3d_path is included in archive response."""
- printer = await printer_factory()
- archive = await archive_factory(printer.id, f3d_path="archives/test/design.f3d")
- response = await async_client.get(f"/api/v1/archives/{archive.id}")
- assert response.status_code == 200
- result = response.json()
- assert "f3d_path" in result
- assert result["f3d_path"] == "archives/test/design.f3d"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_archive_response_f3d_path_null_when_not_set(
- self, async_client: AsyncClient, archive_factory, printer_factory, db_session
- ):
- """Verify f3d_path is null when no F3D file attached."""
- printer = await printer_factory()
- archive = await archive_factory(printer.id)
- response = await async_client.get(f"/api/v1/archives/{archive.id}")
- assert response.status_code == 200
- result = response.json()
- assert "f3d_path" in result
- assert result["f3d_path"] is None
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_upload_f3d_to_nonexistent_archive(self, async_client: AsyncClient):
- """Verify 404 when uploading F3D to non-existent archive."""
- # Create a minimal file-like upload
- files = {"file": ("design.f3d", b"fake f3d content", "application/octet-stream")}
- response = await async_client.post("/api/v1/archives/9999/f3d", files=files)
- assert response.status_code == 404
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_download_f3d_not_found_when_no_file(
- self, async_client: AsyncClient, archive_factory, printer_factory, db_session
- ):
- """Verify 404 when downloading F3D from archive without F3D file."""
- printer = await printer_factory()
- archive = await archive_factory(printer.id)
- response = await async_client.get(f"/api/v1/archives/{archive.id}/f3d")
- assert response.status_code == 404
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_download_f3d_nonexistent_archive(self, async_client: AsyncClient):
- """Verify 404 when downloading F3D from non-existent archive."""
- response = await async_client.get("/api/v1/archives/9999/f3d")
- assert response.status_code == 404
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_delete_f3d_nonexistent_archive(self, async_client: AsyncClient):
- """Verify 404 when deleting F3D from non-existent archive."""
- response = await async_client.delete("/api/v1/archives/9999/f3d")
- assert response.status_code == 404
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_delete_f3d_when_no_file(
- self, async_client: AsyncClient, archive_factory, printer_factory, db_session
- ):
- """Verify 404 when deleting F3D from archive without F3D file."""
- printer = await printer_factory()
- archive = await archive_factory(printer.id)
- response = await async_client.delete(f"/api/v1/archives/{archive.id}/f3d")
- assert response.status_code == 404
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_list_archives_includes_f3d_path(
- self, async_client: AsyncClient, archive_factory, printer_factory, db_session
- ):
- """Verify f3d_path is included in archive list responses."""
- printer = await printer_factory()
- await archive_factory(printer.id, print_name="With F3D", f3d_path="archives/test/design.f3d")
- await archive_factory(printer.id, print_name="Without F3D")
- response = await async_client.get("/api/v1/archives/")
- assert response.status_code == 200
- data = response.json()
- assert len(data) >= 2
- with_f3d = next((a for a in data if a["print_name"] == "With F3D"), None)
- without_f3d = next((a for a in data if a["print_name"] == "Without F3D"), None)
- assert with_f3d is not None
- assert with_f3d["f3d_path"] == "archives/test/design.f3d"
- assert without_f3d is not None
- assert without_f3d["f3d_path"] is None
- # ========================================================================
- # Multi-Plate 3MF endpoints (Issue #93)
- # ========================================================================
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_get_archive_plates_not_found(self, async_client: AsyncClient):
- """Verify 404 when fetching plates for non-existent archive."""
- response = await async_client.get("/api/v1/archives/999999/plates")
- assert response.status_code == 404
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_get_plate_thumbnail_not_found(self, async_client: AsyncClient):
- """Verify 404 when fetching plate thumbnail for non-existent archive."""
- response = await async_client.get("/api/v1/archives/999999/plate-thumbnail/1")
- assert response.status_code == 404
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_filament_requirements_not_found(self, async_client: AsyncClient):
- """Verify filament-requirements returns 404 for non-existent archive."""
- response = await async_client.get("/api/v1/archives/999999/filament-requirements")
- assert response.status_code == 404
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_filament_requirements_with_plate_id_not_found(self, async_client: AsyncClient):
- """Verify filament-requirements with plate_id returns 404 for non-existent archive."""
- response = await async_client.get("/api/v1/archives/999999/filament-requirements?plate_id=1")
- assert response.status_code == 404
- # ========================================================================
- # Tag Management endpoints (Issue #183)
- # ========================================================================
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_get_tags_empty(self, async_client: AsyncClient):
- """Verify empty list when no tags exist."""
- response = await async_client.get("/api/v1/archives/tags")
- assert response.status_code == 200
- data = response.json()
- assert isinstance(data, list)
- assert len(data) == 0
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_get_tags_with_data(self, async_client: AsyncClient, archive_factory, printer_factory, db_session):
- """Verify tags are returned with counts."""
- printer = await printer_factory()
- await archive_factory(printer.id, print_name="Archive 1", tags="functional, test")
- await archive_factory(printer.id, print_name="Archive 2", tags="functional, calibration")
- await archive_factory(printer.id, print_name="Archive 3", tags="test")
- response = await async_client.get("/api/v1/archives/tags")
- assert response.status_code == 200
- data = response.json()
- assert isinstance(data, list)
- # Convert to dict for easier lookup
- tags_dict = {t["name"]: t["count"] for t in data}
- assert tags_dict.get("functional") == 2
- assert tags_dict.get("test") == 2
- assert tags_dict.get("calibration") == 1
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_get_tags_sorted_by_count(
- self, async_client: AsyncClient, archive_factory, printer_factory, db_session
- ):
- """Verify tags are sorted by count descending, then by name."""
- printer = await printer_factory()
- await archive_factory(printer.id, tags="alpha")
- await archive_factory(printer.id, tags="beta, alpha")
- await archive_factory(printer.id, tags="gamma, beta, alpha")
- response = await async_client.get("/api/v1/archives/tags")
- assert response.status_code == 200
- data = response.json()
- # alpha=3, beta=2, gamma=1
- assert data[0]["name"] == "alpha"
- assert data[0]["count"] == 3
- assert data[1]["name"] == "beta"
- assert data[1]["count"] == 2
- assert data[2]["name"] == "gamma"
- assert data[2]["count"] == 1
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_rename_tag(self, async_client: AsyncClient, archive_factory, printer_factory, db_session):
- """Verify renaming a tag updates all archives."""
- printer = await printer_factory()
- a1 = await archive_factory(printer.id, print_name="Archive 1", tags="old-tag, other")
- a2 = await archive_factory(printer.id, print_name="Archive 2", tags="old-tag")
- await archive_factory(printer.id, print_name="Archive 3", tags="different")
- response = await async_client.put("/api/v1/archives/tags/old-tag", json={"new_name": "new-tag"})
- assert response.status_code == 200
- data = response.json()
- assert data["affected"] == 2
- # Verify the archives were updated
- response = await async_client.get(f"/api/v1/archives/{a1.id}")
- assert "new-tag" in response.json()["tags"]
- assert "old-tag" not in response.json()["tags"]
- response = await async_client.get(f"/api/v1/archives/{a2.id}")
- assert response.json()["tags"] == "new-tag"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_rename_tag_no_change(self, async_client: AsyncClient):
- """Verify renaming to same name returns 0 affected."""
- response = await async_client.put("/api/v1/archives/tags/some-tag", json={"new_name": "some-tag"})
- assert response.status_code == 200
- assert response.json()["affected"] == 0
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_rename_tag_empty_name_error(self, async_client: AsyncClient):
- """Verify renaming to empty name returns error."""
- response = await async_client.put("/api/v1/archives/tags/some-tag", json={"new_name": ""})
- assert response.status_code == 400
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_delete_tag(self, async_client: AsyncClient, archive_factory, printer_factory, db_session):
- """Verify deleting a tag removes it from all archives."""
- printer = await printer_factory()
- a1 = await archive_factory(printer.id, print_name="Archive 1", tags="delete-me, keep")
- a2 = await archive_factory(printer.id, print_name="Archive 2", tags="delete-me")
- await archive_factory(printer.id, print_name="Archive 3", tags="different")
- response = await async_client.delete("/api/v1/archives/tags/delete-me")
- assert response.status_code == 200
- data = response.json()
- assert data["affected"] == 2
- # Verify the archives were updated
- response = await async_client.get(f"/api/v1/archives/{a1.id}")
- assert response.json()["tags"] == "keep"
- response = await async_client.get(f"/api/v1/archives/{a2.id}")
- # Should be None or empty when last tag is removed
- assert response.json()["tags"] is None or response.json()["tags"] == ""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_delete_tag_not_found(self, async_client: AsyncClient):
- """Verify deleting non-existent tag returns 0 affected."""
- response = await async_client.delete("/api/v1/archives/tags/nonexistent-tag")
- assert response.status_code == 200
- assert response.json()["affected"] == 0
- class TestUploadSourceThreeMF:
- """Regression for #1531: source-3MF upload on fallback archives."""
- @staticmethod
- def _minimal_3mf_bytes() -> bytes:
- """Smallest valid .3mf — the upload path enforces a zip header check."""
- import io
- import zipfile
- buf = io.BytesIO()
- with zipfile.ZipFile(buf, "w") as zf:
- zf.writestr("[Content_Types].xml", "<types/>")
- return buf.getvalue()
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_fallback_archive_source_upload_lands_under_base_dir(
- self, async_client: AsyncClient, archive_factory, printer_factory, monkeypatch, tmp_path
- ):
- """Fallback archive (file_path='') must accept a source upload and store it inside base_dir.
- Pre-fix, ``Path(base_dir) / ''`` collapsed to ``base_dir`` and the
- ``.parent`` walked out of the data volume, sending the file to
- ``/app/source/...`` and crashing on ``relative_to``.
- """
- from backend.app.core.config import settings as app_settings
- monkeypatch.setattr(app_settings, "base_dir", tmp_path)
- printer = await printer_factory()
- archive = await archive_factory(
- printer.id,
- print_name="Cloud Print",
- file_path="", # fallback archive — no source 3MF was archived
- filename="Cloud Print.3mf",
- )
- files = {"file": ("cloud_print.3mf", self._minimal_3mf_bytes(), "application/octet-stream")}
- response = await async_client.post(f"/api/v1/archives/{archive.id}/source", files=files)
- assert response.status_code == 200, response.text
- payload = response.json()
- rel = payload["source_3mf_path"]
- # Stored as a relative path inside base_dir.
- assert not rel.startswith("/"), f"source_3mf_path should be relative, got {rel!r}"
- # File physically landed under base_dir (NOT escaped to /app/source/).
- assert (tmp_path / rel).is_file()
- # Deterministic fallback location keyed off archive id.
- assert rel == f"archive/no_source/{archive.id}/cloud_print.3mf"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_normal_archive_source_upload_unchanged(
- self, async_client: AsyncClient, archive_factory, printer_factory, monkeypatch, tmp_path
- ):
- """Normal archive (file_path set) still nests the source under <archive>/source/."""
- from backend.app.core.config import settings as app_settings
- monkeypatch.setattr(app_settings, "base_dir", tmp_path)
- printer = await printer_factory()
- # archive_factory's default file_path is "archives/test/test_print.gcode.3mf".
- archive = await archive_factory(printer.id, print_name="Real Print")
- files = {"file": ("real_print.3mf", self._minimal_3mf_bytes(), "application/octet-stream")}
- response = await async_client.post(f"/api/v1/archives/{archive.id}/source", files=files)
- assert response.status_code == 200, response.text
- rel = response.json()["source_3mf_path"]
- assert rel == "archives/test/source/real_print.3mf"
- assert (tmp_path / rel).is_file()
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_symlinked_data_dir_upload_succeeds(
- self, async_client: AsyncClient, archive_factory, printer_factory, monkeypatch, tmp_path
- ):
- """Regression: DATA_DIR that's a symlink to the real storage must not break the upload.
- Common on TrueNAS / Synology / QNAP storage pools, and any
- ``-v /symlinked/host/path:/app/data`` mount. The helper resolves
- only for the containment check and returns literal paths so the
- caller's ``relative_to(settings.base_dir)`` doesn't trip over a
- canonical-vs-symlink mismatch.
- """
- from backend.app.core.config import settings as app_settings
- real_dir = tmp_path / "real_storage"
- real_dir.mkdir()
- symlink_dir = tmp_path / "data_via_symlink"
- symlink_dir.symlink_to(real_dir)
- monkeypatch.setattr(app_settings, "base_dir", symlink_dir)
- printer = await printer_factory()
- archive = await archive_factory(
- printer.id,
- print_name="Symlinked Print",
- file_path="archives/X1C/print.gcode.3mf",
- filename="print.gcode.3mf",
- )
- files = {"file": ("print.3mf", self._minimal_3mf_bytes(), "application/octet-stream")}
- response = await async_client.post(f"/api/v1/archives/{archive.id}/source", files=files)
- assert response.status_code == 200, response.text
- rel = response.json()["source_3mf_path"]
- assert rel == "archives/X1C/source/print.3mf"
- # Reachable via both the symlink and the canonical path.
- assert (symlink_dir / rel).is_file()
- assert (real_dir / rel).is_file()
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_absolute_file_path_rejected_with_clear_500(
- self, async_client: AsyncClient, archive_factory, printer_factory, monkeypatch, tmp_path
- ):
- """A row whose file_path is absolute (corrupted by old import / manual edit)
- must fail with the explicit "outside the data directory" message, not silently
- write outside base_dir."""
- from backend.app.core.config import settings as app_settings
- monkeypatch.setattr(app_settings, "base_dir", tmp_path)
- printer = await printer_factory()
- archive = await archive_factory(
- printer.id,
- print_name="Corrupt Path",
- file_path="/tmp/totally_outside.gcode.3mf",
- filename="totally_outside.gcode.3mf",
- )
- files = {"file": ("totally_outside.3mf", self._minimal_3mf_bytes(), "application/octet-stream")}
- response = await async_client.post(f"/api/v1/archives/{archive.id}/source", files=files)
- assert response.status_code == 500
- assert "outside the data directory" in response.json()["detail"]
- # Did not write anything under the bogus /tmp/source/ either.
- assert not (Path("/tmp") / "source").exists() or not (Path("/tmp") / "source" / "totally_outside.3mf").exists()
|