| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417 |
- """Integration tests for the library trash bin + admin purge (#1008)."""
- from datetime import datetime, timedelta, timezone
- import pytest
- from httpx import AsyncClient
- @pytest.fixture
- async def file_factory(db_session):
- """Factory for LibraryFile rows with sensible defaults."""
- _counter = [0]
- async def _create_file(**kwargs):
- from backend.app.models.library import LibraryFile
- _counter[0] += 1
- counter = _counter[0]
- defaults = {
- "filename": f"trash_test_{counter}.3mf",
- "file_path": f"/tmp/trash_test_{counter}.3mf",
- "file_size": 1024 * counter,
- "file_type": "3mf",
- }
- defaults.update(kwargs)
- lib_file = LibraryFile(**defaults)
- db_session.add(lib_file)
- await db_session.commit()
- await db_session.refresh(lib_file)
- return lib_file
- return _create_file
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_delete_file_moves_to_trash(async_client: AsyncClient, file_factory, db_session):
- """DELETE /library/files/{id} soft-deletes (managed) files into trash."""
- from backend.app.models.library import LibraryFile
- f = await file_factory()
- response = await async_client.delete(f"/api/v1/library/files/{f.id}")
- assert response.status_code == 200
- body = response.json()
- assert body["trashed"] is True
- # Row still exists with deleted_at stamped
- await db_session.refresh(f)
- assert f.deleted_at is not None
- # Normal listing hides it
- list_resp = await async_client.get("/api/v1/library/files")
- assert list_resp.status_code == 200
- ids = [row["id"] for row in list_resp.json()]
- assert f.id not in ids
- # Trash listing surfaces it
- trash_resp = await async_client.get("/api/v1/library/trash")
- assert trash_resp.status_code == 200
- payload = trash_resp.json()
- trashed_ids = [item["id"] for item in payload["items"]]
- assert f.id in trashed_ids
- assert payload["total"] >= 1
- assert payload["retention_days"] >= 1
- # Row's file_type is preserved in the original table (sanity check on the filter)
- row = await db_session.get(LibraryFile, f.id)
- assert row is not None
- assert row.file_type == "3mf"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_delete_external_file_hard_deletes(async_client: AsyncClient, file_factory, db_session):
- """External files skip the trash — DB row is dropped directly."""
- from backend.app.models.library import LibraryFile
- f = await file_factory(is_external=True)
- file_id = f.id
- response = await async_client.delete(f"/api/v1/library/files/{file_id}")
- assert response.status_code == 200
- assert response.json()["trashed"] is False
- # The route commits in its own session; expire ours so get() re-reads.
- db_session.expire_all()
- missing = await db_session.get(LibraryFile, file_id)
- assert missing is None
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_restore_from_trash(async_client: AsyncClient, file_factory, db_session):
- """Restoring a trashed file clears deleted_at and makes it visible again."""
- f = await file_factory()
- await async_client.delete(f"/api/v1/library/files/{f.id}")
- resp = await async_client.post(f"/api/v1/library/trash/{f.id}/restore")
- assert resp.status_code == 200
- await db_session.refresh(f)
- assert f.deleted_at is None
- list_resp = await async_client.get("/api/v1/library/files")
- ids = [row["id"] for row in list_resp.json()]
- assert f.id in ids
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_hard_delete_from_trash(async_client: AsyncClient, file_factory, db_session):
- """Hard-delete from trash removes the DB row immediately."""
- from backend.app.models.library import LibraryFile
- f = await file_factory()
- file_id = f.id
- await async_client.delete(f"/api/v1/library/files/{file_id}")
- resp = await async_client.delete(f"/api/v1/library/trash/{file_id}")
- assert resp.status_code == 200
- db_session.expire_all()
- row = await db_session.get(LibraryFile, file_id)
- assert row is None
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_empty_trash(async_client: AsyncClient, file_factory, db_session):
- """Empty-trash hard-deletes every trashed row in the caller's scope."""
- from sqlalchemy import func, select
- from backend.app.models.library import LibraryFile
- for _ in range(3):
- f = await file_factory()
- await async_client.delete(f"/api/v1/library/files/{f.id}")
- resp = await async_client.delete("/api/v1/library/trash")
- assert resp.status_code == 200
- assert resp.json()["deleted"] >= 3
- count = await db_session.execute(select(func.count(LibraryFile.id)).where(LibraryFile.deleted_at.isnot(None)))
- assert (count.scalar() or 0) == 0
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_hard_delete_rejects_active_file(async_client: AsyncClient, file_factory):
- """Trash endpoints 404 for files that aren't actually trashed."""
- f = await file_factory()
- resp = await async_client.delete(f"/api/v1/library/trash/{f.id}")
- assert resp.status_code == 404
- resp = await async_client.post(f"/api/v1/library/trash/{f.id}/restore")
- assert resp.status_code == 404
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_purge_preview_counts_old_files(async_client: AsyncClient, file_factory, db_session):
- """Preview counts only files past the threshold and returns total size + samples."""
- old_cutoff = datetime.now(timezone.utc) - timedelta(days=120)
- old1 = await file_factory(file_size=5000)
- old2 = await file_factory(file_size=7000)
- # A young file whose created_at stays near "now" — must not be counted.
- await file_factory(file_size=3000)
- # Stamp created_at into the past so the never-printed branch matches.
- for row, ts in ((old1, old_cutoff), (old2, old_cutoff)):
- row.created_at = ts
- await db_session.commit()
- resp = await async_client.get(
- "/api/v1/library/purge/preview",
- params={"older_than_days": 90, "include_never_printed": True},
- )
- assert resp.status_code == 200
- body = resp.json()
- assert body["count"] == 2
- assert body["total_bytes"] == 12000
- assert body["older_than_days"] == 90
- assert len(body["sample_filenames"]) == 2
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_purge_excludes_never_printed_when_requested(async_client: AsyncClient, file_factory, db_session):
- """With include_never_printed=False, only files with last_printed_at are eligible."""
- long_ago = datetime.now(timezone.utc) - timedelta(days=200)
- recently_printed = await file_factory()
- recently_printed.last_printed_at = long_ago
- never_printed = await file_factory()
- never_printed.created_at = long_ago
- await db_session.commit()
- # Exclude never-printed → only 1 match
- resp = await async_client.get(
- "/api/v1/library/purge/preview",
- params={"older_than_days": 90, "include_never_printed": False},
- )
- assert resp.json()["count"] == 1
- # Include → 2 matches
- resp = await async_client.get(
- "/api/v1/library/purge/preview",
- params={"older_than_days": 90, "include_never_printed": True},
- )
- assert resp.json()["count"] == 2
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_purge_execute_moves_to_trash(async_client: AsyncClient, file_factory, db_session):
- """POST /library/purge moves matching files into trash (deleted_at stamped)."""
- long_ago = datetime.now(timezone.utc) - timedelta(days=200)
- f = await file_factory()
- f.created_at = long_ago
- await db_session.commit()
- resp = await async_client.post(
- "/api/v1/library/purge",
- json={"older_than_days": 90, "include_never_printed": True},
- )
- assert resp.status_code == 200
- assert resp.json()["moved_to_trash"] >= 1
- await db_session.refresh(f)
- assert f.deleted_at is not None
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_purge_skips_external_files(async_client: AsyncClient, file_factory, db_session):
- """External files are never eligible for purge, regardless of age."""
- long_ago = datetime.now(timezone.utc) - timedelta(days=300)
- ext = await file_factory(is_external=True)
- ext.created_at = long_ago
- await db_session.commit()
- resp = await async_client.get(
- "/api/v1/library/purge/preview",
- params={"older_than_days": 90, "include_never_printed": True},
- )
- assert resp.json()["count"] == 0
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_trash_settings_roundtrip(async_client: AsyncClient):
- """Retention setting persists and is clamped to [MIN, MAX]."""
- resp = await async_client.get("/api/v1/library/trash/settings")
- assert resp.status_code == 200
- default = resp.json()["retention_days"]
- assert 1 <= default <= 365
- resp = await async_client.put("/api/v1/library/trash/settings", json={"retention_days": 60})
- assert resp.status_code == 200
- assert resp.json()["retention_days"] == 60
- resp = await async_client.get("/api/v1/library/trash/settings")
- assert resp.json()["retention_days"] == 60
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_trash_settings_rejects_out_of_range(async_client: AsyncClient):
- """retention_days must fall within the clamped range."""
- resp = await async_client.put("/api/v1/library/trash/settings", json={"retention_days": 0})
- assert resp.status_code == 422 # Pydantic ge=1 trip
- resp = await async_client.put("/api/v1/library/trash/settings", json={"retention_days": 9999})
- assert resp.status_code == 422
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_sweeper_hard_deletes_past_retention(db_session):
- """The background sweeper clears rows whose deleted_at is older than retention."""
- from backend.app.models.library import LibraryFile
- from backend.app.services.library_trash import library_trash_service
- # Retention = 30 days; stamp one row 40 days ago, one 5 days ago.
- await library_trash_service.set_retention_days(db_session, 30)
- fresh = LibraryFile(
- filename="fresh.3mf",
- file_path="/tmp/fresh.3mf",
- file_size=1024,
- file_type="3mf",
- deleted_at=datetime.now(timezone.utc) - timedelta(days=5),
- )
- stale = LibraryFile(
- filename="stale.3mf",
- file_path="/tmp/stale.3mf",
- file_size=2048,
- file_type="3mf",
- deleted_at=datetime.now(timezone.utc) - timedelta(days=40),
- )
- db_session.add_all([fresh, stale])
- await db_session.commit()
- stale_id = stale.id
- fresh_id = fresh.id
- deleted = await library_trash_service._sweep(db_session)
- assert deleted >= 1
- # The sweeper commits in its own session; expire ours so get() re-reads.
- db_session.expire_all()
- remaining = await db_session.get(LibraryFile, stale_id)
- assert remaining is None
- still_there = await db_session.get(LibraryFile, fresh_id)
- assert still_there is not None
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_auto_purge_settings_roundtrip(async_client: AsyncClient):
- """Auto-purge fields on /library/trash/settings round-trip correctly."""
- resp = await async_client.put(
- "/api/v1/library/trash/settings",
- json={
- "retention_days": 30,
- "auto_purge_enabled": True,
- "auto_purge_days": 120,
- "auto_purge_include_never_printed": False,
- },
- )
- assert resp.status_code == 200
- body = resp.json()
- assert body["auto_purge_enabled"] is True
- assert body["auto_purge_days"] == 120
- assert body["auto_purge_include_never_printed"] is False
- # GET surfaces the same saved values
- resp = await async_client.get("/api/v1/library/trash/settings")
- got = resp.json()
- assert got["auto_purge_enabled"] is True
- assert got["auto_purge_days"] == 120
- assert got["auto_purge_include_never_printed"] is False
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_auto_purge_runs_when_enabled_and_throttles_by_24h(file_factory, db_session):
- """The scheduler loop's auto-purge branch runs once, then the 24h throttle blocks."""
- from backend.app.services.library_trash import library_trash_service
- long_ago = datetime.now(timezone.utc) - timedelta(days=200)
- f = await file_factory()
- f.created_at = long_ago
- await db_session.commit()
- # Enable auto-purge with a 90-day threshold
- await library_trash_service.set_auto_purge_settings(db_session, enabled=True, days=90, include_never_printed=True)
- moved = await library_trash_service._maybe_run_auto_purge(db_session)
- assert moved >= 1
- db_session.expire_all()
- await db_session.refresh(f)
- assert f.deleted_at is not None
- # Second invocation within 24h should be throttled — no additional rows moved.
- long_ago2 = datetime.now(timezone.utc) - timedelta(days=200)
- f2 = await file_factory()
- f2.created_at = long_ago2
- await db_session.commit()
- moved_again = await library_trash_service._maybe_run_auto_purge(db_session)
- assert moved_again == 0
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_auto_purge_skipped_when_disabled(file_factory, db_session):
- """If the toggle is off, old files stay put even when everything else matches."""
- from backend.app.services.library_trash import library_trash_service
- long_ago = datetime.now(timezone.utc) - timedelta(days=200)
- f = await file_factory()
- f.created_at = long_ago
- await db_session.commit()
- await library_trash_service.set_auto_purge_settings(db_session, enabled=False, days=90, include_never_printed=True)
- moved = await library_trash_service._maybe_run_auto_purge(db_session)
- assert moved == 0
- db_session.expire_all()
- await db_session.refresh(f)
- assert f.deleted_at is None
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_trashed_file_hidden_from_makerworld_dedupe(async_client: AsyncClient, file_factory, db_session):
- """MakerWorld 'already imported' dedupe must not match trashed rows."""
- from sqlalchemy import select
- from backend.app.models.library import LibraryFile
- f = await file_factory(source_type="makerworld", source_url="https://makerworld.com/en/models/99#profileId-1")
- # Trash it.
- await async_client.delete(f"/api/v1/library/files/{f.id}")
- # The dedupe query used by the makerworld helper is `source_url == X AND deleted_at IS NULL`.
- result = await db_session.execute(
- LibraryFile.active().where(LibraryFile.source_url == "https://makerworld.com/en/models/99#profileId-1")
- )
- assert result.scalar_one_or_none() is None
- # Direct lookup WITHOUT the active filter still sees the row.
- direct = await db_session.execute(
- select(LibraryFile).where(LibraryFile.source_url == "https://makerworld.com/en/models/99#profileId-1")
- )
- assert direct.scalar_one_or_none() is not None
|