
Fix ghost jobs from SQLite lock on print completion (#897)

  Queue status update (printing → completed) failed silently when SQLite
  was locked by another writer, leaving ghost jobs permanently stuck in
  printing status. Add run_with_retry() for SQLite lock retries and split
  runtime tracker into per-printer commits to reduce lock hold time.
maziggy 1 month ago
parent
commit
2d9a56b3d0
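
The failure mode named in the commit message can be reproduced with the standard library alone. The following is a minimal sketch, not Bambuddy code: the `jobs` table and the function name are hypothetical, and `timeout=0` is used so the second writer fails at once instead of waiting for a busy timeout.

```python
import os
import sqlite3
import tempfile


def second_writer_error() -> str:
    """Return the error a second writer sees while the first holds the write lock."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.db")
        # isolation_level=None keeps sqlite3 from opening transactions implicitly.
        first = sqlite3.connect(path, isolation_level=None)
        # timeout=0: fail immediately rather than wait for the lock to clear.
        second = sqlite3.connect(path, timeout=0, isolation_level=None)
        try:
            first.execute("PRAGMA journal_mode = WAL")
            first.execute("CREATE TABLE jobs (id INTEGER PRIMARY KEY, status TEXT)")
            first.execute("BEGIN IMMEDIATE")  # first writer takes the write lock
            first.execute("INSERT INTO jobs (status) VALUES ('printing')")
            try:
                second.execute("BEGIN IMMEDIATE")  # second writer is refused
            except sqlite3.OperationalError as exc:
                return str(exc)
            return "no error"
        finally:
            first.close()
            second.close()
```

Even in WAL mode only one connection can write at a time, which is why shortening the runtime tracker's transactions and retrying the critical commit both help.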

+ 1 - 0
CHANGELOG.md

@@ -31,6 +31,7 @@ All notable changes to Bambuddy will be documented in this file.
 - **Thumbnails Broken After Backend Restart** — Archive and library thumbnails returned 401 Unauthorized after a backend restart because stream tokens are stored in memory and lost on restart. The frontend now detects failed token-protected image loads and automatically refreshes the stream token, so thumbnails recover without a page reload.
 - **SpoolBuddy Kiosk Screen Blanks on Boot** — The touchscreen display would blank immediately after the RPi booted, requiring a touch to wake. Added `consoleblank=0` to the kernel cmdline to disable Linux console blanking during the Plymouth-to-labwc transition, and changed the `wlr-randr` anti-blank loop to fire immediately instead of sleeping 60 seconds first.
 - **Queue Widget Ignores Plate-Clear Setting** ([#752](https://github.com/maziggy/bambuddy/issues/752)) — The "Clear Plate & Start Next" button on printer cards appeared even when "Require plate-clear confirmation" was disabled in Settings → Queue. The backend correctly auto-dispatched without waiting, but the frontend widget always showed the prompt. The widget now respects the setting and shows a passive queue link instead when plate-clear confirmation is disabled.
+- **Ghost Jobs From SQLite Lock on Print Completion** ([#897](https://github.com/maziggy/bambuddy/issues/897)) — When a print finished, the queue status update (`printing` → `completed`) could fail silently if the SQLite database was locked by another writer (e.g. the runtime tracker). The failed commit left the job permanently stuck in `printing` status — a "ghost job" that caused the UI to show false double-assignments when the next job started. The critical queue status commit now retries up to 3 times with backoff on SQLite lock errors (PostgreSQL is unaffected — it uses row-level locking). Additionally, the runtime tracker was holding a single long transaction across all printers; it now commits per-printer to minimize lock hold time.
 - **AMS Slot Changes Fail Until Reconnect** ([#887](https://github.com/maziggy/bambuddy/issues/887)) — After a keep-alive timeout, paho-mqtt auto-reconnects but the new session can be half-broken: the printer continues sending status updates but silently ignores commands. The developer mode probe detected this (no response, leaving `developer_mode` as `null`), but had no timeout or recovery — one unanswered probe permanently blocked retries. Added a 10-second probe timeout with one retry; after two consecutive unanswered probes, Bambuddy force-closes the socket to trigger a clean reconnect with a fresh session. Additionally, the developer mode probe was firing on every auto-reconnect, which destabilized some firmware MQTT brokers (A1/P1 series) — causing a reconnect → probe → disconnect feedback loop. The probe result is now cached across reconnects and only runs once on the first connection, with a 5-second delay after connect to let the session stabilize.
 - **WebSocket Crash on Printers Without `fun` Field** ([#873](https://github.com/maziggy/bambuddy/issues/873)) — Connecting to printers that don't send the MQTT `fun` field (A1, P1 series, X1Plus firmware) caused a repeating `'str' object has no attribute 'get'` crash in the WebSocket handler, showing the printer as offline with missing AMS and SD card info. The developer mode probe introduced in 0.2.3b1 published an MQTT message inside `_update_state()` between overwriting `raw_data` with the full MQTT dict (where `vt_tray` is a raw dict) and restoring the previously normalized list — the `publish()` call released the GIL, letting the event loop read the un-normalized dict and iterate over string keys instead of spool dicts. Fixed by normalizing `vt_tray` dict→list in the MQTT data before assignment, and moving preserved field restoration before the probe. Added defensive normalization in `printer_state_to_dict` as a belt-and-suspenders guard.
 
 

+ 42 - 1
backend/app/core/database.py

@@ -1,3 +1,6 @@
+import asyncio
+import logging
+
 from sqlalchemy import event
 from sqlalchemy.exc import IntegrityError, OperationalError, ProgrammingError
 from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
@@ -6,13 +9,15 @@ from sqlalchemy.orm import DeclarativeBase
 from backend.app.core.config import settings
 from backend.app.core.db_dialect import is_sqlite
 
+logger = logging.getLogger(__name__)
+
 
 def _set_sqlite_pragmas(dbapi_conn, connection_record):
     """Set SQLite pragmas on each new connection for concurrency and performance."""
     cursor = dbapi_conn.cursor()
     # WAL mode allows concurrent readers + one writer (vs default DELETE mode which locks entirely)
     cursor.execute("PRAGMA journal_mode = WAL")
-    # Wait up to 5 seconds when the database is locked instead of failing immediately
+    # Wait up to 15 seconds when the database is locked instead of failing immediately
     cursor.execute("PRAGMA busy_timeout = 15000")
     cursor.execute("PRAGMA synchronous = NORMAL")
     cursor.close()
@@ -74,6 +79,42 @@ async_session = async_sessionmaker(
 )
 
 
+async def run_with_retry(fn, *, max_attempts: int = 3, label: str = ""):
+    """Run an async DB operation with retry for SQLite 'database is locked' errors.
+
+    ``fn`` is an async callable that receives an ``AsyncSession`` and performs
+    the full query-mutate-commit cycle.  On each retry a fresh session is used
+    so there are no stale-object / expired-attribute issues after rollback.
+
+    On PostgreSQL this calls ``fn`` once with no retry (Postgres uses row-level
+    locking and doesn't suffer from single-writer contention).
+    """
+    if not is_sqlite():
+        async with async_session() as db:
+            return await fn(db)
+
+    last_exc: OperationalError | None = None
+    for attempt in range(1, max_attempts + 1):
+        try:
+            async with async_session() as db:
+                return await fn(db)
+        except OperationalError as exc:
+            last_exc = exc
+            if "database is locked" not in str(exc) or attempt == max_attempts:
+                raise
+            delay = 0.5 * attempt  # 0.5s, 1.0s
+            logger.warning(
+                "SQLite locked%s (attempt %d/%d), retrying in %.1fs: %s",
+                f" ({label})" if label else "",
+                attempt,
+                max_attempts,
+                delay,
+                exc,
+            )
+            await asyncio.sleep(delay)
+    raise last_exc  # unreachable when max_attempts >= 1; keeps type checkers happy
+
+
 async def close_all_connections():
     """Close all database connections for backup/restore operations."""
     global engine
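
The retry loop added above can be studied in isolation. This is a simplified sketch of the same linear-backoff idea without SQLAlchemy; `LockedError`, `retry_locked`, and the injectable `sleep` parameter are illustrative assumptions, not part of the commit. Unlike `run_with_retry`, it rejects `max_attempts < 1` explicitly.

```python
import asyncio


class LockedError(Exception):
    """Hypothetical stand-in for an OperationalError saying 'database is locked'."""


async def retry_locked(fn, *, max_attempts=3, base_delay=0.5, sleep=asyncio.sleep):
    """Call fn() until it succeeds, backing off linearly between attempts."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return await fn()
        except LockedError:
            if attempt == max_attempts:
                raise  # out of attempts: surface the lock error to the caller
            await sleep(base_delay * attempt)  # 0.5s, 1.0s, ...


def demo():
    """Fail twice, succeed on the third call, and record the requested delays."""
    delays = []
    calls = 0

    async def fake_sleep(seconds):
        delays.append(seconds)  # record instead of actually sleeping

    async def flaky():
        nonlocal calls
        calls += 1
        if calls < 3:
            raise LockedError("database is locked")
        return "ok"

    result = asyncio.run(retry_locked(flaky, sleep=fake_sleep))
    return result, calls, delays
```

With three attempts there are at most two sleeps, so the total added wait is 1.5 seconds on top of whatever the busy timeout already spent.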

+ 114 - 90
backend/app/main.py

@@ -2466,10 +2466,16 @@ async def on_print_complete(printer_id: int, data: dict):
 
 
     # Update queue item status early — must run before the archive_id early-return
     # so queue items don't get stuck in "printing" when archive lookup fails.
+    # Uses run_with_retry to handle SQLite "database is locked" errors (#897).
+    queue_item_id = None
+    queue_status = None
+    queue_auto_off = False
     try:
-        async with async_session() as db:
-            from backend.app.models.print_queue import PrintQueueItem
+        from backend.app.core.database import run_with_retry
+        from backend.app.models.print_queue import PrintQueueItem
 
 
+        async def _update_queue_status(db):
+            nonlocal queue_item_id, queue_status, queue_auto_off
             result = await db.execute(
                 select(PrintQueueItem)
                 .where(PrintQueueItem.printer_id == printer_id)
@@ -2482,35 +2488,43 @@ async def on_print_complete(printer_id: int, data: dict):
                     printer_id,
                     [(i.id, i.archive_id, i.library_file_id) for i in printing_items],
                 )
-            queue_item = printing_items[0] if printing_items else None
-            if queue_item:
+            item = printing_items[0] if printing_items else None
+            if item:
                 queue_status = data.get("status", "completed")
                 # MQTT sends "aborted" for cancelled prints; normalise to
                 # "cancelled" so it matches the queue schema Literal.
                 if queue_status == "aborted":
                     queue_status = "cancelled"
-                queue_item.status = queue_status
-                queue_item.completed_at = datetime.now(timezone.utc)
+                item.status = queue_status
+                item.completed_at = datetime.now(timezone.utc)
                 await db.commit()
-                logger.info("Updated queue item %s status to %s", queue_item.id, queue_status)
+                queue_item_id = item.id
+                queue_auto_off = item.auto_off_after
+                logger.info("Updated queue item %s status to %s", item.id, queue_status)
 
 
-                # MQTT relay - publish queue job completed
-                try:
-                    printer_info = printer_manager.get_printer(printer_id)
-                    await mqtt_relay.on_queue_job_completed(
-                        job_id=queue_item.id,
-                        filename=filename or subtask_name,
-                        printer_id=printer_id,
-                        printer_name=printer_info.name if printer_info else "Unknown",
-                        status=queue_status,
-                    )
-                except Exception:
-                    pass  # Don't fail if MQTT fails
+        await run_with_retry(_update_queue_status, label="queue status update")
 
 
-                # Check if queue is now empty and send notification
-                try:
-                    from sqlalchemy import func as sa_func
+        # Post-commit side effects (notifications, MQTT relay, auto-off) use
+        # their own sessions and have their own error handling — no retry needed.
+        if queue_item_id is not None:
+            # MQTT relay - publish queue job completed
+            try:
+                printer_info = printer_manager.get_printer(printer_id)
+                await mqtt_relay.on_queue_job_completed(
+                    job_id=queue_item_id,
+                    filename=filename or subtask_name,
+                    printer_id=printer_id,
+                    printer_name=printer_info.name if printer_info else "Unknown",
+                    status=queue_status,
+                )
+            except Exception:
+                pass  # Don't fail if MQTT fails
 
 
+            # Check if queue is now empty and send notification
+            try:
+                from sqlalchemy import func as sa_func
+
+                async with async_session() as db:
                     count_result = await db.execute(
                         select(sa_func.count(PrintQueueItem.id)).where(PrintQueueItem.status == "pending")
                     )
@@ -2530,31 +2544,32 @@ async def on_print_complete(printer_id: int, data: dict):
                             completed_count=completed_count,
                             db=db,
                         )
-                except Exception:
-                    pass  # Don't fail if notification fails
+            except Exception:
+                pass  # Don't fail if notification fails
 
 
-                # Handle auto_off_after - power off printer if requested (after cooldown)
-                if queue_item.auto_off_after:
+            # Handle auto_off_after - power off printer if requested (after cooldown)
+            if queue_auto_off:
+                async with async_session() as db:
                     result = await db.execute(select(SmartPlug).where(SmartPlug.printer_id == printer_id))
                     plug = result.scalar_one_or_none()
-                    if plug and plug.enabled:
-                        logger.info("Auto-off requested for printer %s, waiting for cooldown...", printer_id)
-
-                        async def cooldown_and_poweroff(pid: int, plug_id: int):
-                            # Wait for nozzle to cool down
-                            await printer_manager.wait_for_cooldown(pid, target_temp=50.0, timeout=600)
-                            # Re-fetch plug in new session
-                            async with async_session() as new_db:
-                                result = await new_db.execute(select(SmartPlug).where(SmartPlug.id == plug_id))
-                                p = result.scalar_one_or_none()
-                                if p and p.enabled:
-                                    success = await tasmota_service.turn_off(p)
-                                    if success:
-                                        logger.info("Powered off printer %s via smart plug '%s'", pid, p.name)
-                                    else:
-                                        logger.warning("Failed to power off printer %s via smart plug", pid)
+                if plug and plug.enabled:
+                    logger.info("Auto-off requested for printer %s, waiting for cooldown...", printer_id)
+
+                    async def cooldown_and_poweroff(pid: int, plug_id: int):
+                        # Wait for nozzle to cool down
+                        await printer_manager.wait_for_cooldown(pid, target_temp=50.0, timeout=600)
+                        # Re-fetch plug in new session
+                        async with async_session() as new_db:
+                            result = await new_db.execute(select(SmartPlug).where(SmartPlug.id == plug_id))
+                            p = result.scalar_one_or_none()
+                            if p and p.enabled:
+                                success = await tasmota_service.turn_off(p)
+                                if success:
+                                    logger.info("Powered off printer %s via smart plug '%s'", pid, p.name)
+                                else:
+                                    logger.warning("Failed to power off printer %s via smart plug", pid)
 
 
-                        asyncio.create_task(cooldown_and_poweroff(printer_id, plug.id))
+                    asyncio.create_task(cooldown_and_poweroff(printer_id, plug.id))
     except Exception as e:
         logging.getLogger(__name__).warning(f"Queue item update failed: {e}")
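
The hunk above moves the MQTT relay, notification, and auto-off work out of the retried callback and passes plain values (`queue_item_id`, `queue_status`, `queue_auto_off`) across the boundary via `nonlocal`. A reduced sketch of that shape follows; `FakeLocked`, `complete_job`, and the `commit` / `publish` callables are hypothetical names used only for illustration.

```python
import asyncio


class FakeLocked(Exception):
    """Hypothetical stand-in for a 'database is locked' OperationalError."""


async def complete_job(commit, publish, max_attempts: int = 3):
    """Retry only the commit step, then run the side effect at most once."""
    item_id = None  # plain value, usable after the session is gone

    async def update_status():
        nonlocal item_id
        item_id = await commit()  # may raise FakeLocked

    for attempt in range(1, max_attempts + 1):
        try:
            await update_status()
            break
        except FakeLocked:
            if attempt == max_attempts:
                raise

    if item_id is not None:
        try:
            await publish(item_id)
        except Exception:
            pass  # a failed notification must not mask the committed status
    return item_id


def demo():
    """Commit fails once with a lock error, then succeeds; publish runs once."""
    attempts = 0
    published = []

    async def commit():
        nonlocal attempts
        attempts += 1
        if attempts == 1:
            raise FakeLocked("database is locked")
        return 42  # hypothetical queue item id

    async def publish(item_id):
        published.append(item_id)

    result = asyncio.run(complete_job(commit, publish))
    return result, attempts, published
```

Keeping the retried callback limited to the query, mutation, and commit means a retry repeats only database work, never an outbound message.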
 
 
@@ -3451,61 +3466,70 @@ async def track_printer_runtime():
         try:
             from backend.app.models.printer import Printer
 
 
+            # Fetch the printer fields needed for runtime tracking in a short-lived read-only session
             async with async_session() as db:
-                # Get all active printers
-                result = await db.execute(select(Printer).where(Printer.is_active.is_(True)))
-                printers = result.scalars().all()
-
-                now = datetime.now(timezone.utc)
-                updated_count = 0
+                result = await db.execute(
+                    select(Printer.id, Printer.name, Printer.runtime_seconds, Printer.last_runtime_update).where(
+                        Printer.is_active.is_(True)
+                    )
+                )
+                printer_rows = result.all()
+
+            now = datetime.now(timezone.utc)
+            updated_count = 0
+
+            # Update each printer in its own short session to minimise write-lock
+            # hold time and avoid blocking critical commits like queue status
+            # updates (#897).
+            for pid, pname, runtime_secs, last_update in printer_rows:
+                state = printer_manager.get_status(pid)
+                if not state:
+                    logger.debug("[%s] Runtime tracking: no state available", pname)
+                    continue
+                if not state.connected:
+                    logger.debug("[%s] Runtime tracking: not connected", pname)
+                    continue
 
 
                 needs_commit = False
-
-                for printer in printers:
-                    # Get current state from printer manager
-                    state = printer_manager.get_status(printer.id)
-                    if not state:
-                        logger.debug("[%s] Runtime tracking: no state available", printer.name)
-                        continue
-                    if not state.connected:
-                        logger.debug("[%s] Runtime tracking: not connected", printer.name)
-                        continue
-
-                    # Check if printer is in an active state (RUNNING or PAUSE)
-                    if state.state in ("RUNNING", "PAUSE"):
-                        # Calculate time since last update
-                        if printer.last_runtime_update:
-                            last_update = printer.last_runtime_update
-                            if last_update.tzinfo is None:
-                                last_update = last_update.replace(tzinfo=timezone.utc)
-                            elapsed = (now - last_update).total_seconds()
-                            if elapsed > 0:
-                                printer.runtime_seconds += int(elapsed)
-                                updated_count += 1
-                                needs_commit = True
-                                logger.debug(
-                                    f"[{printer.name}] Runtime tracking: added {int(elapsed)}s, "
-                                    f"total={printer.runtime_seconds}s ({printer.runtime_seconds / 3600:.2f}h)"
-                                )
-                        else:
-                            # First time seeing printer active - need to commit to save timestamp
+                new_runtime = runtime_secs
+                new_last_update = last_update
+
+                if state.state in ("RUNNING", "PAUSE"):
+                    if last_update:
+                        lu = last_update if last_update.tzinfo else last_update.replace(tzinfo=timezone.utc)
+                        elapsed = (now - lu).total_seconds()
+                        if elapsed > 0:
+                            new_runtime = runtime_secs + int(elapsed)
+                            updated_count += 1
                             needs_commit = True
-                            logger.debug("[%s] Runtime tracking: first active detection", printer.name)
-
-                        printer.last_runtime_update = now
-                    else:
-                        # Printer is idle/offline - clear last_runtime_update
-                        if printer.last_runtime_update is not None:
                             logger.debug(
-                                f"[{printer.name}] Runtime tracking: state={state.state}, clearing last_runtime_update"
+                                f"[{pname}] Runtime tracking: added {int(elapsed)}s, "
+                                f"total={new_runtime}s ({new_runtime / 3600:.2f}h)"
                             )
-                            printer.last_runtime_update = None
-                            needs_commit = True
+                    else:
+                        needs_commit = True
+                        logger.debug("[%s] Runtime tracking: first active detection", pname)
+                    new_last_update = now
+                else:
+                    if last_update is not None:
+                        logger.debug(f"[{pname}] Runtime tracking: state={state.state}, clearing last_runtime_update")
+                        new_last_update = None
+                        needs_commit = True
 
 
                 if needs_commit:
-                    await db.commit()
-                    if updated_count > 0:
-                        logger.debug("Updated runtime for %s printer(s)", updated_count)
+                    try:
+                        async with async_session() as db:
+                            result = await db.execute(select(Printer).where(Printer.id == pid))
+                            printer = result.scalar_one_or_none()
+                            if printer:
+                                printer.runtime_seconds = new_runtime
+                                printer.last_runtime_update = new_last_update
+                                await db.commit()
+                    except Exception as e:
+                        logger.warning("[%s] Runtime tracking commit failed: %s", pname, e)
+
+            if updated_count > 0:
+                logger.debug("Updated runtime for %s printer(s)", updated_count)
 
 
         except asyncio.CancelledError:
             logger.info("Runtime tracking cancelled")
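
The per-printer decision in the rewritten tracker loop no longer touches ORM objects, so it can be expressed as a pure function. The sketch below mirrors the branches in the hunk above; the function name `advance_runtime` and the tuple return shape are assumptions made for illustration.

```python
from datetime import datetime, timezone

ACTIVE_STATES = ("RUNNING", "PAUSE")


def advance_runtime(state, runtime_secs, last_update, now):
    """Return (new_runtime, new_last_update, needs_commit) for one printer."""
    if state in ACTIVE_STATES:
        if last_update is None:
            # First active sighting: only the timestamp needs saving.
            return runtime_secs, now, True
        # Treat naive timestamps as UTC, as the diff does.
        lu = last_update if last_update.tzinfo else last_update.replace(tzinfo=timezone.utc)
        elapsed = (now - lu).total_seconds()
        if elapsed > 0:
            return runtime_secs + int(elapsed), now, True
        return runtime_secs, now, False
    # Idle or offline: clear the timestamp if one is set.
    if last_update is not None:
        return runtime_secs, None, True
    return runtime_secs, None, False
```

Only when the third element is true does the loop open a write session, so idle printers cost no write lock at all.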

+ 2 - 0
backend/tests/integration/test_print_queue_api.py

@@ -1293,6 +1293,7 @@ class TestAbortedStatusNormalisation:
 
 
         with (
             patch("backend.app.main.async_session", return_value=mock_session),
+            patch("backend.app.core.database.async_session", return_value=mock_session),
             patch("backend.app.main.ws_manager") as mock_ws,
             patch("backend.app.main.mqtt_relay") as mock_relay,
             patch("backend.app.main.notification_service") as mock_notif,
@@ -1388,6 +1389,7 @@ class TestAbortedStatusNormalisation:
 
 
         with (
             patch("backend.app.main.async_session", return_value=mock_session),
+            patch("backend.app.core.database.async_session", return_value=mock_session),
             patch("backend.app.main.ws_manager") as mock_ws,
             patch("backend.app.main.mqtt_relay") as mock_relay,
             patch("backend.app.main.notification_service") as mock_notif,
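
The extra `patch` line is needed because `run_with_retry` looks up `async_session` in `backend.app.core.database`, while the rest of `on_print_complete` uses the name bound in `backend.app.main`. A patch replaces one binding only. The sketch below imitates the two modules with `types.SimpleNamespace` objects; all names in it are stand-ins, not the real modules.

```python
import types
from unittest.mock import patch

# Hypothetical stand-ins for two modules that share one session factory.
database = types.SimpleNamespace(async_session=lambda: "real session")
# Equivalent of `from database import async_session` inside main.
main = types.SimpleNamespace(async_session=database.async_session)


def helper_in_database():
    # Resolves the name through the defining module, as run_with_retry does.
    return database.async_session()


def handler_in_main():
    # Resolves the name through the importing module's own binding.
    return main.async_session()


def sessions_seen(patch_database: bool):
    """Return what each call site sees while main (and optionally database) is patched."""
    with patch.object(main, "async_session", return_value="mock"):
        if patch_database:
            with patch.object(database, "async_session", return_value="mock"):
                return handler_in_main(), helper_in_database()
        return handler_in_main(), helper_in_database()
```

Patching only `main` leaves the helper on the real factory, which is the situation the existing integration tests would have hit without the added line.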

+ 198 - 0
backend/tests/unit/test_run_with_retry.py

@@ -0,0 +1,198 @@
+"""Tests for database.run_with_retry — SQLite lock retry logic (#897)."""
+
+from __future__ import annotations
+
+from unittest.mock import AsyncMock, patch
+
+import pytest
+from sqlalchemy.exc import OperationalError
+
+
+@pytest.fixture(autouse=True)
+def _force_sqlite():
+    """Make is_sqlite() return True for all tests in this module."""
+    with patch("backend.app.core.database.is_sqlite", return_value=True):
+        yield
+
+
+def _make_locked_error() -> OperationalError:
+    """Create a realistic 'database is locked' OperationalError."""
+    return OperationalError(
+        statement="UPDATE print_queue SET status=?",
+        params=("completed",),
+        orig=Exception("database is locked"),
+    )
+
+
+def _make_other_error() -> OperationalError:
+    """Create a non-lock OperationalError."""
+    return OperationalError(
+        statement="SELECT 1",
+        params=(),
+        orig=Exception("no such table: foo"),
+    )
+
+
+@pytest.mark.asyncio
+async def test_succeeds_on_first_attempt():
+    """Happy path — fn succeeds immediately."""
+    from backend.app.core.database import run_with_retry
+
+    mock_fn = AsyncMock(return_value="ok")
+
+    with patch("backend.app.core.database.async_session") as mock_session_factory:
+        mock_db = AsyncMock()
+        mock_session_factory.return_value.__aenter__ = AsyncMock(return_value=mock_db)
+        mock_session_factory.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        result = await run_with_retry(mock_fn, label="test")
+
+    assert result == "ok"
+    mock_fn.assert_awaited_once_with(mock_db)
+
+
+@pytest.mark.asyncio
+async def test_retries_on_sqlite_locked():
+    """fn fails with 'database is locked' then succeeds on retry."""
+    from backend.app.core.database import run_with_retry
+
+    call_count = 0
+
+    async def flaky_fn(db):
+        nonlocal call_count
+        call_count += 1
+        if call_count == 1:
+            raise _make_locked_error()
+        return "recovered"
+
+    with (
+        patch("backend.app.core.database.async_session") as mock_session_factory,
+        patch("backend.app.core.database.asyncio.sleep", new_callable=AsyncMock) as mock_sleep,
+    ):
+        mock_db = AsyncMock()
+        mock_session_factory.return_value.__aenter__ = AsyncMock(return_value=mock_db)
+        mock_session_factory.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        result = await run_with_retry(flaky_fn, label="test")
+
+    assert result == "recovered"
+    assert call_count == 2
+    mock_sleep.assert_awaited_once_with(0.5)  # first retry: 0.5s delay
+
+
+@pytest.mark.asyncio
+async def test_raises_after_max_attempts():
+    """fn fails with 'database is locked' on all attempts — raises."""
+    from backend.app.core.database import run_with_retry
+
+    async def always_locked(db):
+        raise _make_locked_error()
+
+    with (
+        patch("backend.app.core.database.async_session") as mock_session_factory,
+        patch("backend.app.core.database.asyncio.sleep", new_callable=AsyncMock),
+    ):
+        mock_db = AsyncMock()
+        mock_session_factory.return_value.__aenter__ = AsyncMock(return_value=mock_db)
+        mock_session_factory.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        with pytest.raises(OperationalError, match="database is locked"):
+            await run_with_retry(always_locked, max_attempts=3, label="test")
+
+
+@pytest.mark.asyncio
+async def test_non_lock_error_not_retried():
+    """Non-lock OperationalErrors are raised immediately, not retried."""
+    from backend.app.core.database import run_with_retry
+
+    call_count = 0
+
+    async def bad_fn(db):
+        nonlocal call_count
+        call_count += 1
+        raise _make_other_error()
+
+    with (
+        patch("backend.app.core.database.async_session") as mock_session_factory,
+        patch("backend.app.core.database.asyncio.sleep", new_callable=AsyncMock) as mock_sleep,
+    ):
+        mock_db = AsyncMock()
+        mock_session_factory.return_value.__aenter__ = AsyncMock(return_value=mock_db)
+        mock_session_factory.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        with pytest.raises(OperationalError, match="no such table"):
+            await run_with_retry(bad_fn, label="test")
+
+    assert call_count == 1
+    mock_sleep.assert_not_awaited()
+
+
+@pytest.mark.asyncio
+async def test_backoff_increases():
+    """Retry delays increase linearly: 0.5s, then 1.0s (max_attempts=3 allows two retries)."""
+    from backend.app.core.database import run_with_retry
+
+    call_count = 0
+
+    async def recovers_on_third(db):
+        nonlocal call_count
+        call_count += 1
+        if call_count < 3:
+            raise _make_locked_error()
+        return "ok"
+
+    with (
+        patch("backend.app.core.database.async_session") as mock_session_factory,
+        patch("backend.app.core.database.asyncio.sleep", new_callable=AsyncMock) as mock_sleep,
+    ):
+        mock_db = AsyncMock()
+        mock_session_factory.return_value.__aenter__ = AsyncMock(return_value=mock_db)
+        mock_session_factory.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        result = await run_with_retry(recovers_on_third, max_attempts=3, label="test")
+
+    assert result == "ok"
+    assert call_count == 3
+    assert mock_sleep.await_args_list[0].args == (0.5,)
+    assert mock_sleep.await_args_list[1].args == (1.0,)
+
+
+@pytest.mark.asyncio
+async def test_postgres_no_retry():
+    """On PostgreSQL, fn is called once with no retry logic."""
+    from backend.app.core.database import run_with_retry
+
+    mock_fn = AsyncMock(return_value="pg_ok")
+
+    with (
+        patch("backend.app.core.database.is_sqlite", return_value=False),
+        patch("backend.app.core.database.async_session") as mock_session_factory,
+    ):
+        mock_db = AsyncMock()
+        mock_session_factory.return_value.__aenter__ = AsyncMock(return_value=mock_db)
+        mock_session_factory.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        result = await run_with_retry(mock_fn, label="test")
+
+    assert result == "pg_ok"
+    mock_fn.assert_awaited_once_with(mock_db)
+
+
+@pytest.mark.asyncio
+async def test_postgres_error_not_retried():
+    """On PostgreSQL, OperationalErrors are raised immediately."""
+    from backend.app.core.database import run_with_retry
+
+    async def bad_fn(db):
+        raise _make_locked_error()
+
+    with (
+        patch("backend.app.core.database.is_sqlite", return_value=False),
+        patch("backend.app.core.database.async_session") as mock_session_factory,
+    ):
+        mock_db = AsyncMock()
+        mock_session_factory.return_value.__aenter__ = AsyncMock(return_value=mock_db)
+        mock_session_factory.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        with pytest.raises(OperationalError):
+            await run_with_retry(bad_fn, label="test")
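
Every test above repeats the same three lines to make the patched factory usable in `async with`. As a sketch of what those lines do, the helper below builds such a factory from `MagicMock` and `AsyncMock`; `make_session_factory` and `use_session` are hypothetical names, and the test file itself does not define them.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


def make_session_factory(db):
    """Build a fake factory whose `async with factory() as s` yields `db`."""
    factory = MagicMock()
    factory.return_value.__aenter__ = AsyncMock(return_value=db)
    # Returning False from __aexit__ lets exceptions propagate to the caller.
    factory.return_value.__aexit__ = AsyncMock(return_value=False)
    return factory


async def use_session(factory):
    """Enter the fake session the way run_with_retry enters a real one."""
    async with factory() as session:
        return session


def demo():
    db = AsyncMock()
    factory = make_session_factory(db)
    same = asyncio.run(use_session(factory)) is db
    return same, factory.call_count
```

Because `__aexit__` returns `False`, an `OperationalError` raised inside the block still reaches the retry loop, which is what the lock tests rely on.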