
fix: cancel-safe get_db + drop sqlalchemy.pool cancellation noise

  @Carter3DP's support package showed bambuddy.log filling with two
  distinct cascades on long uploads:

    ERROR sqlalchemy.pool   Exception terminating connection ...
                            CancelledError: Cancelled via cancel scope
                            ... by starlette.middleware.base
                            .BaseHTTPMiddleware.__call__.call_next
    ERROR sqlalchemy.pool   The garbage collector is trying to clean up
                            non-checked-in connection ... will be
                            terminated.
    WARN  backend.app.main  Runtime tracking commit failed:
                            (sqlite3.OperationalError) database is locked

  Single root cause. Starlette's BaseHTTPMiddleware (used under the hood
  by every @app.middleware("http") decorator) cancels the inner task
  scope when a client disconnects mid-request — common on long
  multipart uploads where the client times out before the server's
  response. Pre-fix get_db only caught Exception, but CancelledError
  is BaseException, so cancellation skipped the rollback path entirely.
  The SQLite write lock stayed held until the garbage collector
  eventually reclaimed the connection, blocking every other writer in
  the meantime. On Postgres
  the leak shape is identical; the symptom would be "QueuePool limit
  ... overflow" instead of "database is locked".
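
  The exception-hierarchy point above can be checked in isolation. This
  is a minimal sketch, assuming Python 3.8 or newer (where
  asyncio.CancelledError derives from BaseException); the helper name
  caught_by_except_exception is hypothetical and not part of the
  codebase.

```python
import asyncio


def caught_by_except_exception(exc: BaseException) -> bool:
    """Return True if an `except Exception` clause would catch `exc`.

    Hypothetical helper for illustration only.
    """
    try:
        raise exc
    except Exception:
        # Ordinary errors (ValueError, OSError, ...) land here.
        return True
    except BaseException:
        # CancelledError, KeyboardInterrupt and SystemExit land here.
        return False


if __name__ == "__main__":
    print(caught_by_except_exception(ValueError("route handler bug")))
    print(caught_by_except_exception(asyncio.CancelledError()))
```

  A handler that only has `except Exception` therefore never runs its
  rollback branch when the request task is cancelled.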

  (1) get_db now catches BaseException so CancelledError triggers
      rollback. Both rollback() and close() are wrapped in
      asyncio.shield so the cleanup completes even when the await
      itself is being cancelled by the same cancel scope. SQLite write
      lock is released promptly; connection returns to the pool instead
      of leaking until GC.

  (2) CancelledPoolNoiseFilter (new filter on sqlalchemy.pool) drops
      the residual records that pre-existing pools still emit during
      their own cleanup. Two patterns suppressed:
        - "Exception terminating connection ..." with a CancelledError
          anywhere in the exc_info chain (walks __cause__/__context__
          with a seen-set guard against pathological cycles)
        - "The garbage collector is trying to clean up non-checked-in
          connection ..." (always symptomatic of cancellation; never
          independently actionable)
      Real pool problems — broken connections, OSError on terminate,
      pool exhaustion — keep flowing because they carry a different
      exception chain or a different message prefix.
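
  Why item (1) needs asyncio.shield can be sketched without a database.
  The names slow_cleanup, handler and run are hypothetical stand-ins
  (slow_cleanup plays the role of session.rollback()), and the sleep
  durations are arbitrary. The second cancel() models a cancellation
  that arrives while the cleanup await is already in progress.

```python
import asyncio


async def slow_cleanup(log: list) -> None:
    """Stand-in for an awaitable cleanup step such as a rollback."""
    await asyncio.sleep(0.05)
    log.append("cleanup finished")


async def handler(log: list, shielded: bool) -> None:
    try:
        await asyncio.sleep(10)  # stand-in for the request body
    except BaseException:
        try:
            if shielded:
                # The inner task keeps running even if this await is cancelled.
                await asyncio.shield(slow_cleanup(log))
            else:
                await slow_cleanup(log)
        except BaseException:
            pass  # a cleanup failure must not mask the original exception
        raise


async def run(shielded: bool) -> list:
    log: list = []
    task = asyncio.create_task(handler(log, shielded))
    await asyncio.sleep(0.01)  # let the handler start
    task.cancel()              # first cancel: handler enters its except block
    await asyncio.sleep(0.01)  # handler is now awaiting the cleanup
    task.cancel()              # second cancel arrives mid-cleanup
    try:
        await task
    except asyncio.CancelledError:
        pass
    await asyncio.sleep(0.1)   # give a shielded cleanup time to finish
    return log


if __name__ == "__main__":
    print(asyncio.run(run(shielded=True)))
    print(asyncio.run(run(shielded=False)))
```

  In the shielded run the cleanup completes after the handler has
  already been cancelled; in the unshielded run the second cancel
  interrupts the cleanup itself and it never finishes.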

  13 regression tests across test_get_db_cancel_safety.py (commit on
  clean exit, rollback on regular Exception, rollback on CancelledError,
  close runs even if rollback raises, close failure on clean exit
  doesn't propagate, rollback + close both go through asyncio.shield)
  and test_cancelled_pool_filter.py (drops cancellation-driven
  terminate, drops GC-cleanup, keeps real OSError terminate, keeps
  terminate without exc_info, keeps unrelated pool messages, drops
  chained-cause CancelledError, defensive guard against self-referential
  cause chains).
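
  The chained-cause cases exercised by the filter tests can be
  reproduced with real raise statements. This is a sketch only: the
  walker below copies the loop from _has_cancelled_in_chain as a
  standalone function, and wrap_explicit / wrap_implicit are
  hypothetical helpers showing how `raise ... from` sets __cause__
  while a plain raise inside an except block sets only __context__.

```python
import asyncio
from typing import Optional


def has_cancelled_in_chain(exc: Optional[BaseException]) -> bool:
    """Standalone copy of the chain walk, for illustration."""
    seen = set()
    cur = exc
    while cur is not None and id(cur) not in seen:
        seen.add(id(cur))
        if isinstance(cur, asyncio.CancelledError):
            return True
        # Prefer the explicit cause, fall back to the implicit context.
        cur = cur.__cause__ or cur.__context__
    return False


def wrap_explicit() -> BaseException:
    """CancelledError wrapped via `raise ... from` (sets __cause__)."""
    try:
        try:
            raise asyncio.CancelledError("Cancelled via cancel scope")
        except asyncio.CancelledError as cancel:
            raise RuntimeError("terminate failed") from cancel
    except RuntimeError as err:
        return err


def wrap_implicit() -> BaseException:
    """Error raised while handling CancelledError (sets __context__ only)."""
    try:
        try:
            raise asyncio.CancelledError()
        except asyncio.CancelledError:
            raise OSError("broken pipe")
    except OSError as err:
        return err


if __name__ == "__main__":
    print(has_cancelled_in_chain(wrap_explicit()))
    print(has_cancelled_in_chain(wrap_implicit()))
    print(has_cancelled_in_chain(OSError("broken pipe")))
```

  Note that, under this walk, an unrelated error raised while a
  CancelledError is being handled also counts as cancellation-driven,
  because the CancelledError sits in its __context__.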

  Applies to SQLite and PostgreSQL — get_db is dialect-agnostic and
  the filtered messages come from base sqlalchemy.pool, not from any
  specific dialect.
maziggy 1 month ago
commit
9884018497

File diff suppressed because it is too large
+ 0 - 0
CHANGELOG.md


+ 17 - 3
backend/app/core/database.py

@@ -141,11 +141,25 @@ async def get_db() -> AsyncSession:
         try:
             yield session
             await session.commit()
-        except Exception:
-            await session.rollback()
+        except BaseException:
+            # Catch BaseException (not just Exception) so CancelledError —
+            # raised when Starlette's BaseHTTPMiddleware cancels the inner
+            # task scope on client disconnect — also triggers rollback.
+            # `asyncio.shield` keeps the rollback running to completion
+            # even when the await itself gets cancelled, so the SQLite
+            # write lock is released promptly instead of being held until
+            # the connection is GC'd ages later (which was producing the
+            # "database is locked" cascade in #1112's support package).
+            try:
+                await asyncio.shield(session.rollback())
+            except BaseException:  # noqa: BLE001 — rollback failure must not mask the original
+                pass
             raise
         finally:
-            await session.close()
+            try:
+                await asyncio.shield(session.close())
+            except BaseException:  # noqa: BLE001 — close failure must not mask the original
+                pass
 
 
 async def init_db():

+ 67 - 4
backend/app/core/logging_filters.py

@@ -1,13 +1,16 @@
 """Logging filters for the Bambuddy log pipeline.
 
-Currently houses a single filter that keeps only state-changing HTTP methods
-in the file-side uvicorn access log. See ``WriteRequestsOnlyFilter`` for the
-why; this lives in its own module so the test suite can import it without
-pulling in ``backend.app.main``'s entire startup graph.
+Holds two filters: ``WriteRequestsOnlyFilter`` keeps the file-side
+uvicorn access log focused on state-changing HTTP methods, and
+``CancelledPoolNoiseFilter`` drops SQLAlchemy connection-pool log noise
+caused by Starlette's ``BaseHTTPMiddleware`` cancellation propagation
+(see the filter's docstring for details). Both live here so tests can
+import them without pulling in ``backend.app.main``'s startup graph.
 """
 
 from __future__ import annotations
 
+import asyncio
 import logging
 
 
@@ -44,3 +47,63 @@ class WriteRequestsOnlyFilter(logging.Filter):
     def filter(self, record: logging.LogRecord) -> bool:  # noqa: A003 — stdlib API name
         message = record.getMessage()
         return any(token in message for token in self._WRITE_VERB_TOKENS)
+
+
+class CancelledPoolNoiseFilter(logging.Filter):
+    """Drop SQLAlchemy connection-pool log records driven by request cancellation.
+
+    Starlette's ``BaseHTTPMiddleware`` (used under the hood by FastAPI's
+    ``@app.middleware("http")`` decorator) cancels the inner task scope when a
+    client disconnects mid-request. The cancellation propagates into
+    SQLAlchemy's connection-pool cleanup and surfaces as two distinct ERROR
+    records — both expected on disconnect, neither actionable for the user:
+
+    1. ``Exception terminating connection ... CancelledError`` — fires every
+       time ``do_terminate`` is interrupted by the same cancel scope that's
+       unwinding the request. The ``CancelledError`` traceback always
+       attributes the cancel to ``BaseHTTPMiddleware.call_next``.
+
+    2. ``The garbage collector is trying to clean up non-checked-in
+       connection`` — fires later when the GC reclaims the session that
+       couldn't return its connection to the pool because of (1). It's
+       symptomatic of the cancellation, not a separate bug.
+
+    These pile up under heavy upload load (long multipart uploads where the
+    client times out before the server's response). Real connection-pool
+    issues — pool exhaustion, broken connections from network hiccups, etc.
+    — surface through DIFFERENT messages and a non-cancellation
+    ``exc_info`` chain, so they keep flowing through this filter unchanged.
+
+    Attach to ``logging.getLogger("sqlalchemy.pool")`` (and only there).
+    """
+
+    _GC_CLEANUP_PREFIX = "The garbage collector is trying to clean up non-checked-in connection"
+    _TERMINATE_PREFIX = "Exception terminating connection"
+
+    @staticmethod
+    def _has_cancelled_in_chain(exc: BaseException | None) -> bool:
+        """True if `exc` is `CancelledError` or has one in its cause chain."""
+        seen: set[int] = set()
+        cur: BaseException | None = exc
+        while cur is not None and id(cur) not in seen:
+            seen.add(id(cur))
+            if isinstance(cur, asyncio.CancelledError):
+                return True
+            cur = cur.__cause__ or cur.__context__
+        return False
+
+    def filter(self, record: logging.LogRecord) -> bool:  # noqa: A003 — stdlib API name
+        message = record.getMessage()
+        # GC-cleanup records have no exc_info — match by prefix only. Always
+        # symptomatic of the cancellation cascade, never independently useful.
+        if message.startswith(self._GC_CLEANUP_PREFIX):
+            return False
+        # Terminate-connection records carry a traceback; only drop those
+        # that are cancellation-driven. A real terminate failure (broken
+        # connection, network hiccup) keeps a non-CancelledError exc_info
+        # chain and surfaces normally.
+        if message.startswith(self._TERMINATE_PREFIX) and record.exc_info:
+            exc = record.exc_info[1]
+            if self._has_cancelled_in_chain(exc):
+                return False
+        return True

+ 11 - 1
backend/app/main.py

@@ -288,7 +288,10 @@ if app_settings.log_to_file:
     # for exactly this reason. Filtered to write methods only
     # (POST/PUT/PATCH/DELETE) so the high-volume status-poll GETs from the
     # frontend don't churn the rotation window faster than it's useful.
-    from backend.app.core.logging_filters import WriteRequestsOnlyFilter
+    from backend.app.core.logging_filters import (
+        CancelledPoolNoiseFilter,
+        WriteRequestsOnlyFilter,
+    )
 
     uvicorn_access_logger = logging.getLogger("uvicorn.access")
     uvicorn_access_logger.addHandler(file_handler)
@@ -299,6 +302,13 @@ if app_settings.log_to_file:
     # ID column as the application logs they correlate with.
     uvicorn_access_logger.addFilter(TraceIDFilter())
 
+    # Drop SQLAlchemy connection-pool log noise that's caused by Starlette's
+    # BaseHTTPMiddleware cancelling the inner task scope on client
+    # disconnect (#1112). The cancel-safe `get_db` already prevents the
+    # underlying transaction leak; this filter only suppresses the residual
+    # log records that pre-existing pools still emit during their cleanup.
+    logging.getLogger("sqlalchemy.pool").addFilter(CancelledPoolNoiseFilter())
+
 # Reduce noise from third-party libraries in production
 if not app_settings.debug:
     logging.getLogger("sqlalchemy.engine").setLevel(logging.WARNING)

+ 76 - 0
backend/tests/unit/test_cancelled_pool_filter.py

@@ -0,0 +1,76 @@
+"""Tests for the SQLAlchemy connection-pool cancellation noise filter (#1112)."""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+
+from backend.app.core.logging_filters import CancelledPoolNoiseFilter
+
+
+def _make_record(message: str, *, exc: BaseException | None = None) -> logging.LogRecord:
+    """Build a `LogRecord` carrying `message` (no positional args) and
+    optionally an `exc_info` tuple holding `exc`."""
+    record = logging.LogRecord(
+        name="sqlalchemy.pool.impl.AsyncAdaptedQueuePool",
+        level=logging.ERROR,
+        pathname=__file__,
+        lineno=0,
+        msg=message,
+        args=(),
+        exc_info=(type(exc), exc, exc.__traceback__) if exc is not None else None,
+    )
+    return record
+
+
+class TestCancelledPoolNoiseFilter:
+    """Drops the cancellation cascade, keeps real pool errors visible."""
+
+    def test_drops_terminate_with_cancelled_exc(self):
+        cancel = asyncio.CancelledError("Cancelled via cancel scope")
+        record = _make_record("Exception terminating connection <ABC>", exc=cancel)
+        assert CancelledPoolNoiseFilter().filter(record) is False
+
+    def test_drops_gc_cleanup_record(self):
+        # GC cleanup messages have no exc_info attached — match by prefix.
+        record = _make_record("The garbage collector is trying to clean up non-checked-in connection <ABC>")
+        assert CancelledPoolNoiseFilter().filter(record) is False
+
+    def test_keeps_terminate_with_real_oserror(self):
+        """A genuine connection-terminate failure (network hiccup, broken
+        socket) carries a non-cancellation exc_info chain. That's a real
+        problem the user should see — must NOT be dropped."""
+        oserr = OSError("broken pipe")
+        record = _make_record("Exception terminating connection <ABC>", exc=oserr)
+        assert CancelledPoolNoiseFilter().filter(record) is True
+
+    def test_keeps_terminate_without_exc_info(self):
+        """If for any reason `exc_info` is missing on a terminate record,
+        keep it — only filter when we have positive evidence it's the
+        cancellation cascade."""
+        record = _make_record("Exception terminating connection <ABC>")
+        assert CancelledPoolNoiseFilter().filter(record) is True
+
+    def test_keeps_unrelated_pool_message(self):
+        """Other pool messages (pool size warnings, etc.) keep flowing."""
+        record = _make_record("Pool size has been exceeded; will spawn overflow")
+        assert CancelledPoolNoiseFilter().filter(record) is True
+
+    def test_drops_when_cancelled_is_in_cause_chain(self):
+        """Real-world traceback: SQLAlchemy wraps the CancelledError in a
+        chained exception. The filter walks `__cause__`/`__context__` so a
+        chained CancelledError still counts."""
+        cancel = asyncio.CancelledError()
+        wrapper = RuntimeError("terminate failed")
+        wrapper.__cause__ = cancel
+        record = _make_record("Exception terminating connection <ABC>", exc=wrapper)
+        assert CancelledPoolNoiseFilter().filter(record) is False
+
+    def test_handles_self_referential_cause_chain(self):
+        """Defensive: malformed exception chains (rare but possible) must
+        not loop forever — the `seen` set guards against it."""
+        a = RuntimeError("a")
+        a.__cause__ = a  # pathological
+        record = _make_record("Exception terminating connection <ABC>", exc=a)
+        # Doesn't loop, doesn't raise, returns True (no CancelledError found).
+        assert CancelledPoolNoiseFilter().filter(record) is True

+ 163 - 0
backend/tests/unit/test_get_db_cancel_safety.py

@@ -0,0 +1,163 @@
+"""Tests for `get_db` cancel-safety (#1112).
+
+Starlette's BaseHTTPMiddleware cancels the inner task scope when a
+client disconnects mid-request. Pre-fix `get_db` only caught `Exception`
+(not `BaseException`), so `CancelledError` skipped the rollback path —
+the SQLite write lock stayed held until the connection was eventually
+GC'd, producing the "database is locked" cascade in @Carter3DP's
+support package on #1112.
+
+The fix:
+  1. Catch `BaseException` so `CancelledError` triggers rollback.
+  2. `asyncio.shield` rollback + close so the cleanup completes even
+     when the await is cancelled by the same cancel scope.
+"""
+
+from __future__ import annotations
+
+import asyncio
+from unittest.mock import AsyncMock, patch
+
+import pytest
+
+from backend.app.core import database
+
+
+class _FakeSession:
+    """Minimal async-context-manager stand-in for `AsyncSession`.
+
+    Records which lifecycle methods were invoked so tests can assert on
+    the cleanup order without a real engine / DB file.
+    """
+
+    def __init__(self):
+        self.commit = AsyncMock(name="commit")
+        self.rollback = AsyncMock(name="rollback")
+        self.close = AsyncMock(name="close")
+
+    async def __aenter__(self):
+        return self
+
+    async def __aexit__(self, exc_type, exc, tb):
+        return False  # don't suppress
+
+
+@pytest.fixture
+def fake_session_factory(monkeypatch):
+    """Patch `database.async_session` to yield a fresh `_FakeSession`."""
+    session = _FakeSession()
+    monkeypatch.setattr(database, "async_session", lambda: session)
+    return session
+
+
+async def _consume_get_db(action):
+    """Drive `get_db` like FastAPI's dependency machinery does:
+    enter the async generator, run `action(session)`, then advance to
+    completion. Returns the entered session."""
+    gen = database.get_db()
+    session = await gen.__anext__()
+    try:
+        await action(session)
+    except StopAsyncIteration:
+        return session
+    # Advance to the end so the generator's finally runs.
+    try:
+        await gen.__anext__()
+    except StopAsyncIteration:
+        pass
+    return session
+
+
+class TestCancelSafety:
+    """Pin the cancel-safety contract end-to-end."""
+
+    @pytest.mark.asyncio
+    async def test_commit_on_clean_exit(self, fake_session_factory):
+        session = fake_session_factory
+
+        async def noop(_s):
+            pass
+
+        await _consume_get_db(noop)
+
+        session.commit.assert_awaited_once()
+        session.rollback.assert_not_awaited()
+        session.close.assert_awaited_once()
+
+    @pytest.mark.asyncio
+    async def test_rollback_on_regular_exception(self, fake_session_factory):
+        session = fake_session_factory
+
+        gen = database.get_db()
+        await gen.__anext__()
+        with pytest.raises(ValueError):
+            await gen.athrow(ValueError("route handler bug"))
+
+        session.commit.assert_not_awaited()
+        session.rollback.assert_awaited_once()
+        session.close.assert_awaited_once()
+
+    @pytest.mark.asyncio
+    async def test_rollback_on_cancelled_error(self, fake_session_factory):
+        """The actual #1112 fix: CancelledError must NOT skip the rollback.
+        Pre-fix `except Exception` caught nothing because CancelledError
+        is a BaseException, not an Exception."""
+        session = fake_session_factory
+
+        gen = database.get_db()
+        await gen.__anext__()
+        with pytest.raises(asyncio.CancelledError):
+            await gen.athrow(asyncio.CancelledError("client disconnected"))
+
+        session.commit.assert_not_awaited()
+        session.rollback.assert_awaited_once()
+        session.close.assert_awaited_once()
+
+    @pytest.mark.asyncio
+    async def test_close_runs_even_if_rollback_raises(self, fake_session_factory):
+        """A failing rollback (broken connection during cancellation) must
+        not prevent `close` from running — otherwise the pool would never
+        reclaim the connection."""
+        session = fake_session_factory
+        session.rollback.side_effect = OSError("broken pipe during rollback")
+
+        gen = database.get_db()
+        await gen.__anext__()
+        with pytest.raises(asyncio.CancelledError):
+            await gen.athrow(asyncio.CancelledError())
+
+        session.rollback.assert_awaited_once()
+        session.close.assert_awaited_once()
+
+    @pytest.mark.asyncio
+    async def test_close_failure_does_not_propagate(self, fake_session_factory):
+        """A failing close on the clean-exit path must not raise out of
+        `get_db` — the request already succeeded."""
+        session = fake_session_factory
+        session.close.side_effect = OSError("close failed")
+
+        async def noop(_s):
+            pass
+
+        # Must not raise.
+        await _consume_get_db(noop)
+
+        session.commit.assert_awaited_once()
+        session.close.assert_awaited_once()
+
+    @pytest.mark.asyncio
+    async def test_rollback_uses_shield(self, fake_session_factory):
+        """Cancellation arriving DURING rollback must not abort the
+        rollback — `asyncio.shield` keeps it running. Verify the call
+        path goes through `shield` so future refactors don't silently
+        drop the protection."""
+        # The fixture wires the fake session into `database.async_session`;
+        # we don't need the local handle here.
+        with patch.object(asyncio, "shield", wraps=asyncio.shield) as shield:
+            gen = database.get_db()
+            await gen.__anext__()
+            with pytest.raises(asyncio.CancelledError):
+                await gen.athrow(asyncio.CancelledError())
+
+        # rollback + close both shielded.
+        assert shield.call_count == 2

Some files were not shown because too many files changed in this diff