library_trash.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388
  1. """Library trash sweeper + purge service (#1008).
  2. Two-stage file deletion for the library:
  3. 1. Users / admins soft-delete files — the row stays in ``library_files`` with
  4. ``deleted_at`` stamped; the bytes stay on disk. This is handled inline in
  5. ``backend.app.api.routes.library`` and exposed to admins as a bulk "purge
  6. old files" operation via :meth:`LibraryTrashService.purge_older_than`.
  7. 2. A background sweeper in this service hard-deletes rows (and their bytes)
  8. whose ``deleted_at`` is older than the configured retention window.
  9. External files (``is_external=True``) are never placed in the trash — their
  10. bytes live outside Bambuddy's control, so there's nothing to restore.
  11. """
  12. from __future__ import annotations
  13. import asyncio
  14. import logging
  15. from datetime import datetime, timedelta, timezone
  16. from pathlib import Path
  17. from sqlalchemy import and_, delete, func, or_, select
  18. from sqlalchemy.ext.asyncio import AsyncSession
  19. from backend.app.core.config import settings as app_settings
  20. from backend.app.core.database import async_session
  21. from backend.app.models.library import LibraryFile
  22. from backend.app.models.settings import Settings
logger = logging.getLogger(__name__)

# Settings key used to persist the trash retention window (days). The sweeper
# reads this on every tick so the UI can change it without a restart.
TRASH_RETENTION_KEY = "library_trash_retention_days"
# Retention applied when the setting row is missing or is not a valid integer.
DEFAULT_RETENTION_DAYS = 30
# Clamp retention to a sensible range. 1 day is a reasonable floor (anything
# shorter just makes trash into hard-delete); 365 gives admins plenty of rope
# without letting accidental typos (99999) grow the table unboundedly.
MIN_RETENTION_DAYS = 1
MAX_RETENTION_DAYS = 365

# Auto-purge settings (#1008 follow-up). When enabled, the sweeper loop also
# runs the admin bulk purge once per 24h using the saved age threshold.
# Default-off so existing installs don't surprise users — opt-in via Settings.
AUTO_PURGE_ENABLED_KEY = "library_auto_purge_enabled"
AUTO_PURGE_DAYS_KEY = "library_auto_purge_days"
AUTO_PURGE_INCLUDE_NEVER_PRINTED_KEY = "library_auto_purge_include_never_printed"
# Holds the ISO 8601 timestamp of the most recent auto-purge run; used for the
# 24h throttle.
AUTO_PURGE_LAST_RUN_KEY = "library_auto_purge_last_run"
# Age threshold applied when the saved value is missing or is not a valid integer.
DEFAULT_AUTO_PURGE_DAYS = 90
MIN_AUTO_PURGE_DAYS = 7  # anything shorter is begging for accidents
MAX_AUTO_PURGE_DAYS = 3650  # upper clamp, roughly ten years
  43. def _to_absolute_path(relative_path: str | None) -> Path | None:
  44. """Mirror of the routes helper so this service has no route-module import.
  45. Accepts the legacy absolute paths that predate the relative-path migration
  46. verbatim; new rows always store paths relative to ``base_dir``.
  47. """
  48. if not relative_path:
  49. return None
  50. path = Path(relative_path)
  51. if path.is_absolute():
  52. return path
  53. return Path(app_settings.base_dir) / path
  54. def _age_cutoff(now: datetime, older_than_days: int) -> datetime:
  55. return now - timedelta(days=older_than_days)
  56. def _purge_filter(cutoff: datetime, include_never_printed: bool):
  57. """SQLAlchemy clause selecting files eligible for admin purge.
  58. A file is "old" if either (a) ``last_printed_at`` is set and predates the
  59. cutoff, or (b) ``last_printed_at`` is NULL *and* the file was uploaded
  60. before the cutoff — but only when ``include_never_printed`` is True.
  61. """
  62. last_printed_old = and_(
  63. LibraryFile.last_printed_at.isnot(None),
  64. LibraryFile.last_printed_at < cutoff,
  65. )
  66. if include_never_printed:
  67. never_printed_old = and_(
  68. LibraryFile.last_printed_at.is_(None),
  69. LibraryFile.created_at < cutoff,
  70. )
  71. age_clause = or_(last_printed_old, never_printed_old)
  72. else:
  73. age_clause = last_printed_old
  74. return and_(
  75. LibraryFile.deleted_at.is_(None),
  76. LibraryFile.is_external.is_(False),
  77. age_clause,
  78. )
  79. class LibraryTrashService:
  80. """Manages the trash retention sweeper and admin-triggered bulk purges."""
  81. def __init__(self):
  82. self._scheduler_task: asyncio.Task | None = None
  83. # Tick every 15 minutes — the window is a day, so this is plenty
  84. # responsive without burning CPU.
  85. self._check_interval = 900
  86. async def start_scheduler(self):
  87. """Start the background sweeper task (idempotent)."""
  88. if self._scheduler_task is not None:
  89. return
  90. logger.info("Starting library trash sweeper")
  91. self._scheduler_task = asyncio.create_task(self._scheduler_loop())
  92. def stop_scheduler(self):
  93. if self._scheduler_task:
  94. self._scheduler_task.cancel()
  95. self._scheduler_task = None
  96. logger.info("Stopped library trash sweeper")
  97. async def _scheduler_loop(self):
  98. while True:
  99. try:
  100. await asyncio.sleep(self._check_interval)
  101. async with async_session() as db:
  102. await self._sweep(db)
  103. await self._maybe_run_auto_purge(db)
  104. except asyncio.CancelledError:
  105. break
  106. except Exception as e: # pragma: no cover - defensive
  107. logger.error("Error in library trash sweeper: %s", e)
  108. await asyncio.sleep(60)
  109. # ---- Settings -----------------------------------------------------
  110. async def get_retention_days(self, db: AsyncSession | None = None) -> int:
  111. if db is None:
  112. async with async_session() as session:
  113. return await self._read_retention(session)
  114. return await self._read_retention(db)
  115. @staticmethod
  116. async def _read_retention(db: AsyncSession) -> int:
  117. result = await db.execute(select(Settings.value).where(Settings.key == TRASH_RETENTION_KEY))
  118. raw = result.scalar_one_or_none()
  119. if raw is None:
  120. return DEFAULT_RETENTION_DAYS
  121. try:
  122. days = int(raw)
  123. except (TypeError, ValueError):
  124. return DEFAULT_RETENTION_DAYS
  125. return max(MIN_RETENTION_DAYS, min(MAX_RETENTION_DAYS, days))
  126. async def set_retention_days(self, db: AsyncSession, days: int) -> int:
  127. """Persist the retention window. Clamped to [MIN, MAX]."""
  128. clamped = max(MIN_RETENTION_DAYS, min(MAX_RETENTION_DAYS, int(days)))
  129. result = await db.execute(select(Settings).where(Settings.key == TRASH_RETENTION_KEY))
  130. row = result.scalar_one_or_none()
  131. if row is None:
  132. db.add(Settings(key=TRASH_RETENTION_KEY, value=str(clamped)))
  133. else:
  134. row.value = str(clamped)
  135. await db.commit()
  136. return clamped
  137. @staticmethod
  138. async def _read_setting(db: AsyncSession, key: str) -> str | None:
  139. result = await db.execute(select(Settings.value).where(Settings.key == key))
  140. return result.scalar_one_or_none()
  141. @staticmethod
  142. async def _write_setting(db: AsyncSession, key: str, value: str) -> None:
  143. result = await db.execute(select(Settings).where(Settings.key == key))
  144. row = result.scalar_one_or_none()
  145. if row is None:
  146. db.add(Settings(key=key, value=value))
  147. else:
  148. row.value = value
  149. async def get_auto_purge_settings(self, db: AsyncSession) -> dict:
  150. """Return the current auto-purge config.
  151. Returns a dict with ``enabled`` (bool), ``days`` (int, clamped) and
  152. ``include_never_printed`` (bool). Missing keys default to disabled /
  153. 90 days / include-never-printed-on, matching the manual purge UX.
  154. """
  155. enabled_raw = await self._read_setting(db, AUTO_PURGE_ENABLED_KEY)
  156. days_raw = await self._read_setting(db, AUTO_PURGE_DAYS_KEY)
  157. incl_raw = await self._read_setting(db, AUTO_PURGE_INCLUDE_NEVER_PRINTED_KEY)
  158. enabled = (enabled_raw or "false").lower() == "true"
  159. try:
  160. days = int(days_raw) if days_raw is not None else DEFAULT_AUTO_PURGE_DAYS
  161. except (TypeError, ValueError):
  162. days = DEFAULT_AUTO_PURGE_DAYS
  163. days = max(MIN_AUTO_PURGE_DAYS, min(MAX_AUTO_PURGE_DAYS, days))
  164. include_never_printed = (incl_raw or "true").lower() == "true"
  165. return {
  166. "enabled": enabled,
  167. "days": days,
  168. "include_never_printed": include_never_printed,
  169. }
  170. async def set_auto_purge_settings(
  171. self,
  172. db: AsyncSession,
  173. *,
  174. enabled: bool,
  175. days: int,
  176. include_never_printed: bool,
  177. ) -> dict:
  178. """Persist auto-purge config; returns the saved (clamped) values."""
  179. clamped_days = max(MIN_AUTO_PURGE_DAYS, min(MAX_AUTO_PURGE_DAYS, int(days)))
  180. await self._write_setting(db, AUTO_PURGE_ENABLED_KEY, "true" if enabled else "false")
  181. await self._write_setting(db, AUTO_PURGE_DAYS_KEY, str(clamped_days))
  182. await self._write_setting(
  183. db,
  184. AUTO_PURGE_INCLUDE_NEVER_PRINTED_KEY,
  185. "true" if include_never_printed else "false",
  186. )
  187. await db.commit()
  188. return {
  189. "enabled": enabled,
  190. "days": clamped_days,
  191. "include_never_printed": include_never_printed,
  192. }
  193. async def _get_last_auto_purge_run(self, db: AsyncSession) -> datetime | None:
  194. raw = await self._read_setting(db, AUTO_PURGE_LAST_RUN_KEY)
  195. if not raw:
  196. return None
  197. try:
  198. # Stored as ISO 8601 UTC; tolerate both with and without 'Z' suffix.
  199. return datetime.fromisoformat(raw.replace("Z", "+00:00"))
  200. except ValueError:
  201. return None
  202. async def _stamp_last_auto_purge_run(self, db: AsyncSession, when: datetime) -> None:
  203. await self._write_setting(db, AUTO_PURGE_LAST_RUN_KEY, when.isoformat())
  204. await db.commit()
  205. async def _maybe_run_auto_purge(self, db: AsyncSession) -> int:
  206. """If auto-purge is enabled and >=24h has elapsed since the last run, run it.
  207. Returns the number of files moved to trash (0 if disabled or throttled).
  208. The 24h throttle means a 15-minute sweeper cadence still only triggers
  209. one actual purge per day, keeping the DB churn predictable.
  210. """
  211. cfg = await self.get_auto_purge_settings(db)
  212. if not cfg["enabled"]:
  213. return 0
  214. now = datetime.now(timezone.utc)
  215. last = await self._get_last_auto_purge_run(db)
  216. if last is not None and (now - last) < timedelta(hours=24):
  217. return 0
  218. moved = await self.purge_older_than(
  219. db,
  220. older_than_days=cfg["days"],
  221. include_never_printed=cfg["include_never_printed"],
  222. )
  223. await self._stamp_last_auto_purge_run(db, now)
  224. if moved:
  225. logger.info("Library auto-purge: moved %d file(s) to trash (threshold=%d days)", moved, cfg["days"])
  226. return moved
  227. # ---- Preview / purge ---------------------------------------------
  228. async def preview_purge(
  229. self,
  230. db: AsyncSession,
  231. older_than_days: int,
  232. include_never_printed: bool = True,
  233. sample_limit: int = 5,
  234. ) -> dict:
  235. """Count + size of files eligible for purge. Reads only; never mutates."""
  236. if older_than_days < 1:
  237. return {"count": 0, "total_bytes": 0, "sample_filenames": []}
  238. now = datetime.now(timezone.utc)
  239. cutoff = _age_cutoff(now, older_than_days)
  240. clause = _purge_filter(cutoff, include_never_printed)
  241. count_result = await db.execute(select(func.count(LibraryFile.id)).where(clause))
  242. count = int(count_result.scalar() or 0)
  243. size_result = await db.execute(select(func.coalesce(func.sum(LibraryFile.file_size), 0)).where(clause))
  244. total_bytes = int(size_result.scalar() or 0)
  245. sample_result = await db.execute(
  246. select(LibraryFile.filename).where(clause).order_by(LibraryFile.created_at).limit(sample_limit)
  247. )
  248. samples = [row[0] for row in sample_result.all()]
  249. return {
  250. "count": count,
  251. "total_bytes": total_bytes,
  252. "sample_filenames": samples,
  253. "older_than_days": older_than_days,
  254. "include_never_printed": include_never_printed,
  255. }
  256. async def purge_older_than(
  257. self,
  258. db: AsyncSession,
  259. older_than_days: int,
  260. include_never_printed: bool = True,
  261. ) -> int:
  262. """Move matching files to trash (stamps ``deleted_at``). Returns count."""
  263. if older_than_days < 1:
  264. return 0
  265. now = datetime.now(timezone.utc)
  266. cutoff = _age_cutoff(now, older_than_days)
  267. clause = _purge_filter(cutoff, include_never_printed)
  268. # We need the IDs so callers can audit or display them if they want.
  269. # Doing a single UPDATE ... WHERE is safe even under concurrent
  270. # uploads — the clause already excludes rows with deleted_at set.
  271. id_result = await db.execute(select(LibraryFile.id).where(clause))
  272. ids = [row[0] for row in id_result.all()]
  273. if not ids:
  274. return 0
  275. await db.execute(LibraryFile.__table__.update().where(LibraryFile.id.in_(ids)).values(deleted_at=now))
  276. await db.commit()
  277. logger.info("Library purge: moved %d file(s) to trash (older_than_days=%d)", len(ids), older_than_days)
  278. return len(ids)
  279. # ---- Sweeper ------------------------------------------------------
  280. async def _sweep(self, db: AsyncSession) -> int:
  281. """Hard-delete trashed rows whose retention window has elapsed."""
  282. retention = await self._read_retention(db)
  283. now = datetime.now(timezone.utc)
  284. cutoff = now - timedelta(days=retention)
  285. result = await db.execute(
  286. select(LibraryFile).where(
  287. LibraryFile.deleted_at.isnot(None),
  288. LibraryFile.deleted_at < cutoff,
  289. )
  290. )
  291. rows = result.scalars().all()
  292. if not rows:
  293. return 0
  294. deleted = 0
  295. for row in rows:
  296. self._unlink_on_disk(row)
  297. deleted += 1
  298. # Single DELETE is faster than N await db.delete() round-trips; we
  299. # still need the Python loop above to unlink bytes on disk.
  300. await db.execute(delete(LibraryFile).where(LibraryFile.id.in_([r.id for r in rows])))
  301. await db.commit()
  302. logger.info("Library trash sweeper: hard-deleted %d row(s) past %d-day retention", deleted, retention)
  303. return deleted
  304. @staticmethod
  305. def _unlink_on_disk(row: LibraryFile) -> None:
  306. """Best-effort cleanup of the file + thumbnail on disk."""
  307. for rel in (row.file_path, row.thumbnail_path):
  308. abs_path = _to_absolute_path(rel)
  309. if abs_path is None:
  310. continue
  311. try:
  312. if abs_path.exists():
  313. abs_path.unlink()
  314. except OSError as e:
  315. logger.warning("Trash sweep: failed to unlink %s: %s", abs_path, e)
  316. # ---- User-facing trash ops ----------------------------------------
  317. async def restore(self, db: AsyncSession, file: LibraryFile) -> LibraryFile:
  318. """Clear ``deleted_at`` so the file reappears in listings."""
  319. file.deleted_at = None
  320. await db.commit()
  321. await db.refresh(file)
  322. return file
  323. async def hard_delete_now(self, db: AsyncSession, file: LibraryFile) -> None:
  324. """Bypass retention and delete this trashed file + its bytes immediately."""
  325. self._unlink_on_disk(file)
  326. await db.delete(file)
  327. await db.commit()
# Module-level instance of the service, created at import time.
library_trash_service = LibraryTrashService()