# library_trash.py (15 KB)
  1. """Library trash sweeper + purge service (#1008).
  2. Two-stage file deletion for the library:
  3. 1. Users / admins soft-delete files — the row stays in ``library_files`` with
  4. ``deleted_at`` stamped; the bytes stay on disk. This is handled inline in
  5. ``backend.app.api.routes.library`` and exposed to admins as a bulk "purge
  6. old files" operation via :meth:`LibraryTrashService.purge_older_than`.
  7. 2. A background sweeper in this service hard-deletes rows (and their bytes)
  8. whose ``deleted_at`` is older than the configured retention window.
  9. External files (``is_external=True``) are never placed in the trash — their
  10. bytes live outside Bambuddy's control, so there's nothing to restore.
  11. """
  12. from __future__ import annotations
  13. import asyncio
  14. import logging
  15. from datetime import datetime, timedelta, timezone
  16. from pathlib import Path
  17. from sqlalchemy import and_, delete, func, or_, select
  18. from sqlalchemy.ext.asyncio import AsyncSession
  19. from backend.app.core.config import settings as app_settings
  20. from backend.app.core.database import async_session
  21. from backend.app.models.library import LibraryFile
  22. from backend.app.models.settings import Settings
  23. logger = logging.getLogger(__name__)
  24. # Settings key used to persist the trash retention window (days). The sweeper
  25. # reads this on every tick so the UI can change it without a restart.
  26. TRASH_RETENTION_KEY = "library_trash_retention_days"
  27. DEFAULT_RETENTION_DAYS = 30
  28. # Clamp retention to a sensible range. 1 day is a reasonable floor (anything
  29. # shorter just makes trash into hard-delete); 365 gives admins plenty of rope
  30. # without letting accidental typos (99999) grow the table unboundedly.
  31. MIN_RETENTION_DAYS = 1
  32. MAX_RETENTION_DAYS = 365
  33. # Auto-purge settings (#1008 follow-up). When enabled, the sweeper loop also
  34. # runs the admin bulk purge once per 24h using the saved age threshold.
  35. # Default-off so existing installs don't surprise users — opt-in via Settings.
  36. AUTO_PURGE_ENABLED_KEY = "library_auto_purge_enabled"
  37. AUTO_PURGE_DAYS_KEY = "library_auto_purge_days"
  38. AUTO_PURGE_INCLUDE_NEVER_PRINTED_KEY = "library_auto_purge_include_never_printed"
  39. AUTO_PURGE_LAST_RUN_KEY = "library_auto_purge_last_run"
  40. DEFAULT_AUTO_PURGE_DAYS = 90
  41. MIN_AUTO_PURGE_DAYS = 7 # anything shorter is begging for accidents
  42. MAX_AUTO_PURGE_DAYS = 3650
  43. def _to_absolute_path(relative_path: str | None) -> Path | None:
  44. """Mirror of the routes helper so this service has no route-module import.
  45. Accepts the legacy absolute paths that predate the relative-path migration
  46. verbatim; new rows always store paths relative to ``base_dir``.
  47. """
  48. if not relative_path:
  49. return None
  50. path = Path(relative_path)
  51. if path.is_absolute():
  52. return path
  53. return (
  54. Path(app_settings.base_dir) / path
  55. ) # SEC-PATH-OK: relative_path is LibraryFile.file_path / LibraryFile.thumbnail_path — DB-stored, internally generated by the upload pipeline
  56. def _age_cutoff(now: datetime, older_than_days: int) -> datetime:
  57. return now - timedelta(days=older_than_days)
  58. def _purge_filter(cutoff: datetime, include_never_printed: bool):
  59. """SQLAlchemy clause selecting files eligible for admin purge.
  60. A file is "old" if either (a) ``last_printed_at`` is set and predates the
  61. cutoff, or (b) ``last_printed_at`` is NULL *and* the file was uploaded
  62. before the cutoff — but only when ``include_never_printed`` is True.
  63. """
  64. last_printed_old = and_(
  65. LibraryFile.last_printed_at.isnot(None),
  66. LibraryFile.last_printed_at < cutoff,
  67. )
  68. if include_never_printed:
  69. never_printed_old = and_(
  70. LibraryFile.last_printed_at.is_(None),
  71. LibraryFile.created_at < cutoff,
  72. )
  73. age_clause = or_(last_printed_old, never_printed_old)
  74. else:
  75. age_clause = last_printed_old
  76. return and_(
  77. LibraryFile.deleted_at.is_(None),
  78. LibraryFile.is_external.is_(False),
  79. age_clause,
  80. )
  81. class LibraryTrashService:
  82. """Manages the trash retention sweeper and admin-triggered bulk purges."""
  83. def __init__(self):
  84. self._scheduler_task: asyncio.Task | None = None
  85. # Tick every 15 minutes — the window is a day, so this is plenty
  86. # responsive without burning CPU.
  87. self._check_interval = 900
  88. async def start_scheduler(self):
  89. """Start the background sweeper task (idempotent)."""
  90. if self._scheduler_task is not None:
  91. return
  92. logger.info("Starting library trash sweeper")
  93. self._scheduler_task = asyncio.create_task(self._scheduler_loop())
  94. def stop_scheduler(self):
  95. if self._scheduler_task:
  96. self._scheduler_task.cancel()
  97. self._scheduler_task = None
  98. logger.info("Stopped library trash sweeper")
  99. async def _scheduler_loop(self):
  100. while True:
  101. try:
  102. await asyncio.sleep(self._check_interval)
  103. async with async_session() as db:
  104. await self._sweep(db)
  105. await self._maybe_run_auto_purge(db)
  106. except asyncio.CancelledError:
  107. break
  108. except Exception as e: # pragma: no cover - defensive
  109. logger.error("Error in library trash sweeper: %s", e)
  110. await asyncio.sleep(60)
  111. # ---- Settings -----------------------------------------------------
  112. async def get_retention_days(self, db: AsyncSession | None = None) -> int:
  113. if db is None:
  114. async with async_session() as session:
  115. return await self._read_retention(session)
  116. return await self._read_retention(db)
  117. @staticmethod
  118. async def _read_retention(db: AsyncSession) -> int:
  119. result = await db.execute(select(Settings.value).where(Settings.key == TRASH_RETENTION_KEY))
  120. raw = result.scalar_one_or_none()
  121. if raw is None:
  122. return DEFAULT_RETENTION_DAYS
  123. try:
  124. days = int(raw)
  125. except (TypeError, ValueError):
  126. return DEFAULT_RETENTION_DAYS
  127. return max(MIN_RETENTION_DAYS, min(MAX_RETENTION_DAYS, days))
  128. async def set_retention_days(self, db: AsyncSession, days: int) -> int:
  129. """Persist the retention window. Clamped to [MIN, MAX]."""
  130. clamped = max(MIN_RETENTION_DAYS, min(MAX_RETENTION_DAYS, int(days)))
  131. result = await db.execute(select(Settings).where(Settings.key == TRASH_RETENTION_KEY))
  132. row = result.scalar_one_or_none()
  133. if row is None:
  134. db.add(Settings(key=TRASH_RETENTION_KEY, value=str(clamped)))
  135. else:
  136. row.value = str(clamped)
  137. await db.commit()
  138. return clamped
  139. @staticmethod
  140. async def _read_setting(db: AsyncSession, key: str) -> str | None:
  141. result = await db.execute(select(Settings.value).where(Settings.key == key))
  142. return result.scalar_one_or_none()
  143. @staticmethod
  144. async def _write_setting(db: AsyncSession, key: str, value: str) -> None:
  145. result = await db.execute(select(Settings).where(Settings.key == key))
  146. row = result.scalar_one_or_none()
  147. if row is None:
  148. db.add(Settings(key=key, value=value))
  149. else:
  150. row.value = value
  151. async def get_auto_purge_settings(self, db: AsyncSession) -> dict:
  152. """Return the current auto-purge config.
  153. Returns a dict with ``enabled`` (bool), ``days`` (int, clamped) and
  154. ``include_never_printed`` (bool). Missing keys default to disabled /
  155. 90 days / include-never-printed-on, matching the manual purge UX.
  156. """
  157. enabled_raw = await self._read_setting(db, AUTO_PURGE_ENABLED_KEY)
  158. days_raw = await self._read_setting(db, AUTO_PURGE_DAYS_KEY)
  159. incl_raw = await self._read_setting(db, AUTO_PURGE_INCLUDE_NEVER_PRINTED_KEY)
  160. enabled = (enabled_raw or "false").lower() == "true"
  161. try:
  162. days = int(days_raw) if days_raw is not None else DEFAULT_AUTO_PURGE_DAYS
  163. except (TypeError, ValueError):
  164. days = DEFAULT_AUTO_PURGE_DAYS
  165. days = max(MIN_AUTO_PURGE_DAYS, min(MAX_AUTO_PURGE_DAYS, days))
  166. include_never_printed = (incl_raw or "true").lower() == "true"
  167. return {
  168. "enabled": enabled,
  169. "days": days,
  170. "include_never_printed": include_never_printed,
  171. }
  172. async def set_auto_purge_settings(
  173. self,
  174. db: AsyncSession,
  175. *,
  176. enabled: bool,
  177. days: int,
  178. include_never_printed: bool,
  179. ) -> dict:
  180. """Persist auto-purge config; returns the saved (clamped) values."""
  181. clamped_days = max(MIN_AUTO_PURGE_DAYS, min(MAX_AUTO_PURGE_DAYS, int(days)))
  182. await self._write_setting(db, AUTO_PURGE_ENABLED_KEY, "true" if enabled else "false")
  183. await self._write_setting(db, AUTO_PURGE_DAYS_KEY, str(clamped_days))
  184. await self._write_setting(
  185. db,
  186. AUTO_PURGE_INCLUDE_NEVER_PRINTED_KEY,
  187. "true" if include_never_printed else "false",
  188. )
  189. await db.commit()
  190. return {
  191. "enabled": enabled,
  192. "days": clamped_days,
  193. "include_never_printed": include_never_printed,
  194. }
  195. async def _get_last_auto_purge_run(self, db: AsyncSession) -> datetime | None:
  196. raw = await self._read_setting(db, AUTO_PURGE_LAST_RUN_KEY)
  197. if not raw:
  198. return None
  199. try:
  200. # Stored as ISO 8601 UTC; tolerate both with and without 'Z' suffix.
  201. return datetime.fromisoformat(raw.replace("Z", "+00:00"))
  202. except ValueError:
  203. return None
  204. async def _stamp_last_auto_purge_run(self, db: AsyncSession, when: datetime) -> None:
  205. await self._write_setting(db, AUTO_PURGE_LAST_RUN_KEY, when.isoformat())
  206. await db.commit()
  207. async def _maybe_run_auto_purge(self, db: AsyncSession) -> int:
  208. """If auto-purge is enabled and >=24h has elapsed since the last run, run it.
  209. Returns the number of files moved to trash (0 if disabled or throttled).
  210. The 24h throttle means a 15-minute sweeper cadence still only triggers
  211. one actual purge per day, keeping the DB churn predictable.
  212. """
  213. cfg = await self.get_auto_purge_settings(db)
  214. if not cfg["enabled"]:
  215. return 0
  216. now = datetime.now(timezone.utc)
  217. last = await self._get_last_auto_purge_run(db)
  218. if last is not None and (now - last) < timedelta(hours=24):
  219. return 0
  220. moved = await self.purge_older_than(
  221. db,
  222. older_than_days=cfg["days"],
  223. include_never_printed=cfg["include_never_printed"],
  224. )
  225. await self._stamp_last_auto_purge_run(db, now)
  226. if moved:
  227. logger.info("Library auto-purge: moved %d file(s) to trash (threshold=%d days)", moved, cfg["days"])
  228. return moved
  229. # ---- Preview / purge ---------------------------------------------
  230. async def preview_purge(
  231. self,
  232. db: AsyncSession,
  233. older_than_days: int,
  234. include_never_printed: bool = True,
  235. sample_limit: int = 5,
  236. ) -> dict:
  237. """Count + size of files eligible for purge. Reads only; never mutates."""
  238. if older_than_days < 1:
  239. return {"count": 0, "total_bytes": 0, "sample_filenames": []}
  240. now = datetime.now(timezone.utc)
  241. cutoff = _age_cutoff(now, older_than_days)
  242. clause = _purge_filter(cutoff, include_never_printed)
  243. count_result = await db.execute(select(func.count(LibraryFile.id)).where(clause))
  244. count = int(count_result.scalar() or 0)
  245. size_result = await db.execute(select(func.coalesce(func.sum(LibraryFile.file_size), 0)).where(clause))
  246. total_bytes = int(size_result.scalar() or 0)
  247. sample_result = await db.execute(
  248. select(LibraryFile.filename).where(clause).order_by(LibraryFile.created_at).limit(sample_limit)
  249. )
  250. samples = [row[0] for row in sample_result.all()]
  251. return {
  252. "count": count,
  253. "total_bytes": total_bytes,
  254. "sample_filenames": samples,
  255. "older_than_days": older_than_days,
  256. "include_never_printed": include_never_printed,
  257. }
  258. async def purge_older_than(
  259. self,
  260. db: AsyncSession,
  261. older_than_days: int,
  262. include_never_printed: bool = True,
  263. ) -> int:
  264. """Move matching files to trash (stamps ``deleted_at``). Returns count."""
  265. if older_than_days < 1:
  266. return 0
  267. now = datetime.now(timezone.utc)
  268. cutoff = _age_cutoff(now, older_than_days)
  269. clause = _purge_filter(cutoff, include_never_printed)
  270. # We need the IDs so callers can audit or display them if they want.
  271. # Doing a single UPDATE ... WHERE is safe even under concurrent
  272. # uploads — the clause already excludes rows with deleted_at set.
  273. id_result = await db.execute(select(LibraryFile.id).where(clause))
  274. ids = [row[0] for row in id_result.all()]
  275. if not ids:
  276. return 0
  277. await db.execute(LibraryFile.__table__.update().where(LibraryFile.id.in_(ids)).values(deleted_at=now))
  278. await db.commit()
  279. logger.info("Library purge: moved %d file(s) to trash (older_than_days=%d)", len(ids), older_than_days)
  280. return len(ids)
  281. # ---- Sweeper ------------------------------------------------------
  282. async def _sweep(self, db: AsyncSession) -> int:
  283. """Hard-delete trashed rows whose retention window has elapsed."""
  284. retention = await self._read_retention(db)
  285. now = datetime.now(timezone.utc)
  286. cutoff = now - timedelta(days=retention)
  287. result = await db.execute(
  288. select(LibraryFile).where(
  289. LibraryFile.deleted_at.isnot(None),
  290. LibraryFile.deleted_at < cutoff,
  291. )
  292. )
  293. rows = result.scalars().all()
  294. if not rows:
  295. return 0
  296. deleted = 0
  297. for row in rows:
  298. self._unlink_on_disk(row)
  299. deleted += 1
  300. # Single DELETE is faster than N await db.delete() round-trips; we
  301. # still need the Python loop above to unlink bytes on disk.
  302. await db.execute(delete(LibraryFile).where(LibraryFile.id.in_([r.id for r in rows])))
  303. await db.commit()
  304. logger.info("Library trash sweeper: hard-deleted %d row(s) past %d-day retention", deleted, retention)
  305. return deleted
  306. @staticmethod
  307. def _unlink_on_disk(row: LibraryFile) -> None:
  308. """Best-effort cleanup of the file + thumbnail on disk."""
  309. for rel in (row.file_path, row.thumbnail_path):
  310. abs_path = _to_absolute_path(rel)
  311. if abs_path is None:
  312. continue
  313. try:
  314. if abs_path.exists():
  315. abs_path.unlink()
  316. except OSError as e:
  317. logger.warning("Trash sweep: failed to unlink %s: %s", abs_path, e)
  318. # ---- User-facing trash ops ----------------------------------------
  319. async def restore(self, db: AsyncSession, file: LibraryFile) -> LibraryFile:
  320. """Clear ``deleted_at`` so the file reappears in listings."""
  321. file.deleted_at = None
  322. await db.commit()
  323. await db.refresh(file)
  324. return file
  325. async def hard_delete_now(self, db: AsyncSession, file: LibraryFile) -> None:
  326. """Bypass retention and delete this trashed file + its bytes immediately."""
  327. self._unlink_on_disk(file)
  328. await db.delete(file)
  329. await db.commit()
  330. library_trash_service = LibraryTrashService()