test_archive_purge_api.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. """Integration tests for archive auto-purge (#1008 follow-up)."""
  2. from datetime import datetime, timedelta, timezone
  3. import pytest
  4. from httpx import AsyncClient
  5. @pytest.mark.asyncio
  6. @pytest.mark.integration
  7. async def test_settings_defaults_when_unset(async_client: AsyncClient):
  8. """GET /archives/purge/settings returns sensible defaults on a fresh install."""
  9. resp = await async_client.get("/api/v1/archives/purge/settings")
  10. assert resp.status_code == 200
  11. body = resp.json()
  12. assert body["enabled"] is False
  13. assert body["days"] == 365
  14. @pytest.mark.asyncio
  15. @pytest.mark.integration
  16. async def test_settings_roundtrip(async_client: AsyncClient):
  17. """PUT persists, GET returns the saved values, days is clamped."""
  18. resp = await async_client.put(
  19. "/api/v1/archives/purge/settings",
  20. json={"enabled": True, "days": 180},
  21. )
  22. assert resp.status_code == 200
  23. assert resp.json() == {"enabled": True, "days": 180}
  24. resp = await async_client.get("/api/v1/archives/purge/settings")
  25. assert resp.json() == {"enabled": True, "days": 180}
  26. @pytest.mark.asyncio
  27. @pytest.mark.integration
  28. async def test_settings_rejects_out_of_range_days(async_client: AsyncClient):
  29. """days below MIN or above MAX is rejected."""
  30. resp = await async_client.put(
  31. "/api/v1/archives/purge/settings",
  32. json={"enabled": True, "days": 1},
  33. )
  34. # Pydantic validation returns 422; explicit bound check returns 400.
  35. assert resp.status_code in (400, 422)
  36. @pytest.mark.asyncio
  37. @pytest.mark.integration
  38. async def test_preview_counts_old_archives(async_client: AsyncClient, archive_factory, printer_factory, db_session):
  39. """Preview returns the count + total bytes of archives older than the threshold."""
  40. printer = await printer_factory()
  41. old = await archive_factory(printer.id, print_name="Old", file_size=1000)
  42. fresh = await archive_factory(printer.id, print_name="Fresh", file_size=2000)
  43. old.created_at = datetime.now(timezone.utc) - timedelta(days=400)
  44. fresh.created_at = datetime.now(timezone.utc) - timedelta(days=10)
  45. await db_session.commit()
  46. resp = await async_client.get("/api/v1/archives/purge/preview?older_than_days=365")
  47. assert resp.status_code == 200
  48. body = resp.json()
  49. assert body["count"] == 1
  50. assert body["total_bytes"] == 1000
  51. assert "Old" in body["sample_filenames"][0] or old.filename in body["sample_filenames"]
  52. @pytest.mark.asyncio
  53. @pytest.mark.integration
  54. async def test_manual_purge_deletes_old_archives(
  55. async_client: AsyncClient, archive_factory, printer_factory, db_session
  56. ):
  57. """POST /archives/purge hard-deletes archives older than the threshold."""
  58. from backend.app.models.archive import PrintArchive
  59. printer = await printer_factory()
  60. old = await archive_factory(printer.id, print_name="Old")
  61. fresh = await archive_factory(printer.id, print_name="Fresh")
  62. old_id = old.id
  63. fresh_id = fresh.id
  64. old.created_at = datetime.now(timezone.utc) - timedelta(days=400)
  65. fresh.created_at = datetime.now(timezone.utc) - timedelta(days=10)
  66. await db_session.commit()
  67. resp = await async_client.post(
  68. "/api/v1/archives/purge",
  69. json={"older_than_days": 365},
  70. )
  71. assert resp.status_code == 200
  72. assert resp.json()["deleted"] == 1
  73. # Old is gone, fresh remains.
  74. db_session.expire_all()
  75. assert await db_session.get(PrintArchive, old_id) is None
  76. assert await db_session.get(PrintArchive, fresh_id) is not None
  77. @pytest.mark.asyncio
  78. @pytest.mark.integration
  79. async def test_auto_purge_runs_when_enabled(async_client: AsyncClient, archive_factory, printer_factory, db_session):
  80. """With the toggle on, a stale archive is hard-deleted by the sweeper.
  81. ``async_client`` is included solely so its fixture activates the module-level
  82. ``async_session`` patches that let :meth:`purge_older_than`'s per-row
  83. delete sessions reach the in-memory test database.
  84. """
  85. from backend.app.models.archive import PrintArchive
  86. from backend.app.services.archive_purge import archive_purge_service
  87. printer = await printer_factory()
  88. stale = await archive_factory(printer.id, print_name="Stale")
  89. stale_id = stale.id
  90. stale.created_at = datetime.now(timezone.utc) - timedelta(days=400)
  91. await db_session.commit()
  92. await archive_purge_service.set_settings(db_session, enabled=True, days=365)
  93. deleted = await archive_purge_service._maybe_run_auto_purge(db_session)
  94. assert deleted >= 1
  95. db_session.expire_all()
  96. assert await db_session.get(PrintArchive, stale_id) is None
  97. @pytest.mark.asyncio
  98. @pytest.mark.integration
  99. async def test_auto_purge_throttles_within_24h(async_client: AsyncClient, archive_factory, printer_factory, db_session):
  100. """A recent last-run timestamp blocks the sweeper for 24h."""
  101. from backend.app.services.archive_purge import archive_purge_service
  102. printer = await printer_factory()
  103. stale = await archive_factory(printer.id, print_name="Stale")
  104. stale.created_at = datetime.now(timezone.utc) - timedelta(days=400)
  105. await db_session.commit()
  106. await archive_purge_service.set_settings(db_session, enabled=True, days=365)
  107. # Stamp a last-run time 1h ago — should block the sweeper for another 23h.
  108. await archive_purge_service._stamp_last_run(db_session, datetime.now(timezone.utc) - timedelta(hours=1))
  109. deleted = await archive_purge_service._maybe_run_auto_purge(db_session)
  110. assert deleted == 0
  111. @pytest.mark.asyncio
  112. @pytest.mark.integration
  113. async def test_auto_purge_skipped_when_disabled(
  114. async_client: AsyncClient, archive_factory, printer_factory, db_session
  115. ):
  116. """When the toggle is off, old archives stay put."""
  117. from backend.app.models.archive import PrintArchive
  118. from backend.app.services.archive_purge import archive_purge_service
  119. printer = await printer_factory()
  120. stale = await archive_factory(printer.id, print_name="Stale")
  121. stale_id = stale.id
  122. stale.created_at = datetime.now(timezone.utc) - timedelta(days=400)
  123. await db_session.commit()
  124. await archive_purge_service.set_settings(db_session, enabled=False, days=365)
  125. deleted = await archive_purge_service._maybe_run_auto_purge(db_session)
  126. assert deleted == 0
  127. db_session.expire_all()
  128. assert await db_session.get(PrintArchive, stale_id) is not None