Просмотр исходного кода

fix(auth): cleanup orphan OIDC/MFA rows when user is deleted (#1285) (#1295)

fix(auth): cleanup orphan OIDC/MFA rows on user delete (#1285)

Three User-FK tables (user_oidc_links, user_totp, user_otp_codes)
declare ON DELETE CASCADE in their models, but SQLite ships with
PRAGMA foreign_keys=OFF (the project's existing pattern, mirrored
for APIKey in PR #1182). Without explicit DELETEs, deleting a user
on SQLite leaves orphan rows behind:
Sn0rrii 2 недель назад
Родитель
Сommit
4d8dbc8336

+ 6 - 1
backend/app/api/routes/mfa.py

@@ -1864,11 +1864,16 @@ async def list_oidc_links(
         select(UserOIDCLink).where(UserOIDCLink.user_id == current_user.id).options(selectinload(UserOIDCLink.provider))
         select(UserOIDCLink).where(UserOIDCLink.user_id == current_user.id).options(selectinload(UserOIDCLink.provider))
     )
     )
     links = result.scalars().all()
     links = result.scalars().all()
+    # Defensive null-check on link.provider: on PostgreSQL the FK cascade
+    # ensures provider exists, but SQLite ships with FK enforcement off, so
+    # a deleted provider could in theory leave the link briefly orphan until
+    # the next init_db() cleanup runs. Returning "<deleted>" instead of
+    # crashing keeps the endpoint usable in that edge case (#1285 follow-up).
     return [
     return [
         OIDCLinkResponse(
         OIDCLinkResponse(
             id=link.id,
             id=link.id,
             provider_id=link.provider_id,
             provider_id=link.provider_id,
-            provider_name=link.provider.name,
+            provider_name=link.provider.name if link.provider else "<deleted>",
             provider_email=link.provider_email,
             provider_email=link.provider_email,
             created_at=link.created_at.isoformat(),
             created_at=link.created_at.isoformat(),
         )
         )

+ 26 - 1
backend/app/api/routes/users.py

@@ -25,9 +25,14 @@ from backend.app.models.api_key import APIKey
 from backend.app.models.archive import PrintArchive
 from backend.app.models.archive import PrintArchive
 from backend.app.models.group import Group
 from backend.app.models.group import Group
 from backend.app.models.library import LibraryFile
 from backend.app.models.library import LibraryFile
+from backend.app.models.long_lived_token import LongLivedToken
+from backend.app.models.oidc_provider import UserOIDCLink
+from backend.app.models.print_batch import PrintBatch
 from backend.app.models.print_queue import PrintQueueItem
 from backend.app.models.print_queue import PrintQueueItem
 from backend.app.models.settings import Settings
 from backend.app.models.settings import Settings
 from backend.app.models.user import User
 from backend.app.models.user import User
+from backend.app.models.user_otp_code import UserOTPCode
+from backend.app.models.user_totp import UserTOTP
 from backend.app.schemas.auth import ChangePasswordRequest, GroupBrief, UserCreate, UserResponse, UserUpdate
 from backend.app.schemas.auth import ChangePasswordRequest, GroupBrief, UserCreate, UserResponse, UserUpdate
 from backend.app.services.email_service import (
 from backend.app.services.email_service import (
     create_welcome_email_from_template,
     create_welcome_email_from_template,
@@ -395,9 +400,12 @@ async def delete_user(
         await db.execute(delete(PrintArchive).where(PrintArchive.created_by_id == user_id))
         await db.execute(delete(PrintArchive).where(PrintArchive.created_by_id == user_id))
         await db.execute(delete(PrintQueueItem).where(PrintQueueItem.created_by_id == user_id))
         await db.execute(delete(PrintQueueItem).where(PrintQueueItem.created_by_id == user_id))
         await db.execute(delete(LibraryFile).where(LibraryFile.created_by_id == user_id))
         await db.execute(delete(LibraryFile).where(LibraryFile.created_by_id == user_id))
+        await db.execute(delete(PrintBatch).where(PrintBatch.created_by_id == user_id))
     else:
     else:
         # Explicitly set created_by_id to NULL for all items (ensures consistent behavior
         # Explicitly set created_by_id to NULL for all items (ensures consistent behavior
-        # across different database backends, including SQLite without foreign key support)
+        # across different database backends, including SQLite without foreign key support).
+        # PrintBatch carries the same created_by_id FK with ondelete=SET NULL — admin-deleted
+        # users would otherwise leave dangling created_by_id on SQLite (#1295 review nit).
         from sqlalchemy import update
         from sqlalchemy import update
 
 
         await db.execute(update(PrintArchive).where(PrintArchive.created_by_id == user_id).values(created_by_id=None))
         await db.execute(update(PrintArchive).where(PrintArchive.created_by_id == user_id).values(created_by_id=None))
@@ -405,6 +413,7 @@ async def delete_user(
             update(PrintQueueItem).where(PrintQueueItem.created_by_id == user_id).values(created_by_id=None)
             update(PrintQueueItem).where(PrintQueueItem.created_by_id == user_id).values(created_by_id=None)
         )
         )
         await db.execute(update(LibraryFile).where(LibraryFile.created_by_id == user_id).values(created_by_id=None))
         await db.execute(update(LibraryFile).where(LibraryFile.created_by_id == user_id).values(created_by_id=None))
+        await db.execute(update(PrintBatch).where(PrintBatch.created_by_id == user_id).values(created_by_id=None))
 
 
     # Drop API keys owned by this user. The model declares ON DELETE CASCADE
     # Drop API keys owned by this user. The model declares ON DELETE CASCADE
     # so Postgres handles this automatically, but SQLite ships with FK
     # so Postgres handles this automatically, but SQLite ships with FK
@@ -417,6 +426,22 @@ async def delete_user(
     # exactly the orphan-key state the CASCADE was meant to prevent).
     # exactly the orphan-key state the CASCADE was meant to prevent).
     await db.execute(delete(APIKey).where(APIKey.user_id == user_id))
     await db.execute(delete(APIKey).where(APIKey.user_id == user_id))
 
 
+    # Drop OIDC links, MFA state, and long-lived camera-stream tokens
+    # owned by this user. Same SQLite/FK pattern as APIKey above. Without
+    # these, deleting a user on SQLite leaves:
+    #   - UserOIDCLink: the OIDC callback finds the orphan link, fails to
+    #     resolve the (now missing) user, and falls through to
+    #     "account_inactive" instead of triggering auto_create (#1285).
+    #   - UserTOTP: MFA secrets persist in the DB after the owning user.
+    #   - UserOTPCode: pending email OTP codes linger.
+    #   - LongLivedToken: per-user camera-stream tokens whose secret_hash
+    #     is still valid — verify() would happily match them by lookup
+    #     prefix even though the user is gone.
+    await db.execute(delete(UserOIDCLink).where(UserOIDCLink.user_id == user_id))
+    await db.execute(delete(UserTOTP).where(UserTOTP.user_id == user_id))
+    await db.execute(delete(UserOTPCode).where(UserOTPCode.user_id == user_id))
+    await db.execute(delete(LongLivedToken).where(LongLivedToken.user_id == user_id))
+
     await db.delete(user)
     await db.delete(user)
     await db.commit()
     await db.commit()
 
 

+ 35 - 0
backend/app/core/database.py

@@ -2418,6 +2418,41 @@ async def run_migrations(conn):
             conn, "ALTER TABLE notification_providers ADD COLUMN on_stock_break_alert BOOLEAN DEFAULT false"
             conn, "ALTER TABLE notification_providers ADD COLUMN on_stock_break_alert BOOLEAN DEFAULT false"
         )
         )
 
 
+    # Migration: Heal orphan auth-related rows left behind by user-delete
+    # on SQLite. user_oidc_links, user_totp, user_otp_codes (introduced in
+    # PR #933) and long_lived_tokens (PR #1108) all declare ON DELETE
+    # CASCADE on user_id — both predate the explicit APIKey-cleanup
+    # pattern in PR #1182. PostgreSQL enforces the cascade, but SQLite
+    # ships with FK enforcement off, so rows pointing to a deleted user
+    # persisted — blocking SSO re-login (the OIDC callback finds the
+    # orphan link, fails to resolve the missing user, and falls through
+    # to "account_inactive" instead of triggering auto_create), leaking
+    # MFA secrets, and leaving camera-stream tokens whose secret_hash is
+    # still verify()-able by lookup_prefix. See issue #1285 (#1295 review
+    # extended the cleanup to long_lived_tokens). This migration is a
+    # no-op on PostgreSQL and idempotent on SQLite.
+    async with conn.begin_nested():
+        oidc_result = await conn.execute(
+            text("DELETE FROM user_oidc_links WHERE user_id NOT IN (SELECT id FROM users)")
+        )
+        totp_result = await conn.execute(text("DELETE FROM user_totp WHERE user_id NOT IN (SELECT id FROM users)"))
+        otp_result = await conn.execute(text("DELETE FROM user_otp_codes WHERE user_id NOT IN (SELECT id FROM users)"))
+        llt_result = await conn.execute(
+            text("DELETE FROM long_lived_tokens WHERE user_id NOT IN (SELECT id FROM users)")
+        )
+    oidc_n = oidc_result.rowcount or 0
+    totp_n = totp_result.rowcount or 0
+    otp_n = otp_result.rowcount or 0
+    llt_n = llt_result.rowcount or 0
+    if oidc_n or totp_n or otp_n or llt_n:
+        logger.info(
+            "Cleaned up orphan auth rows: %d OIDC links, %d TOTP, %d OTP codes, %d long-lived tokens",
+            oidc_n,
+            totp_n,
+            otp_n,
+            llt_n,
+        )
+
 
 
 async def seed_notification_templates():
 async def seed_notification_templates():
     """Seed default notification templates if they don't exist."""
     """Seed default notification templates if they don't exist."""

+ 26 - 2
backend/tests/integration/test_auth_api.py

@@ -424,8 +424,23 @@ class TestUsersAPI:
 
 
     @pytest.mark.asyncio
     @pytest.mark.asyncio
     @pytest.mark.integration
     @pytest.mark.integration
-    async def test_delete_user(self, async_client: AsyncClient, auth_token: str):
-        """Verify admin can delete a user."""
+    async def test_delete_user(self, async_client: AsyncClient, auth_token: str, db_session):
+        """Verify admin can delete a user and that all auth-table side effects cascade.
+
+        The auth-cleanup side effects matter on SQLite (FK enforcement off by default):
+        without explicit DELETEs in the endpoint, deleting a user leaves orphan rows
+        in user_oidc_links / user_totp / user_otp_codes / api_keys — which would
+        block SSO re-login and leak MFA secrets (#1285).
+        """
+        from sqlalchemy import select
+
+        from backend.app.models.api_key import APIKey
+        from backend.app.models.long_lived_token import LongLivedToken
+        from backend.app.models.oidc_provider import UserOIDCLink
+        from backend.app.models.user import User
+        from backend.app.models.user_otp_code import UserOTPCode
+        from backend.app.models.user_totp import UserTOTP
+
         # Create user
         # Create user
         create_response = await async_client.post(
         create_response = await async_client.post(
             "/api/v1/users/",
             "/api/v1/users/",
@@ -446,6 +461,15 @@ class TestUsersAPI:
 
 
         assert response.status_code == 204
         assert response.status_code == 204
 
 
+        # All auth-related rows for this user must be gone — see #1285.
+        await db_session.commit()
+        user_row = await db_session.execute(select(User).where(User.id == user_id))
+        assert user_row.scalar_one_or_none() is None, "User row not deleted"
+
+        for model in (UserOIDCLink, UserTOTP, UserOTPCode, APIKey, LongLivedToken):
+            rows = await db_session.execute(select(model).where(model.user_id == user_id))
+            assert rows.scalars().all() == [], f"Orphan {model.__name__} rows left after user delete"
+
 
 
 class TestAuthDisableAPI:
 class TestAuthDisableAPI:
     """Integration tests for /api/v1/auth/disable endpoint."""
     """Integration tests for /api/v1/auth/disable endpoint."""

+ 49 - 0
backend/tests/integration/test_mfa_api.py

@@ -4843,3 +4843,52 @@ class TestOIDCAutoCreateDefaultGroup:
             jwks_data=jwks_data,
             jwks_data=jwks_data,
         )
         )
         assert "Administrators" in group_names, f"Expected Administrators, got {group_names}"
         assert "Administrators" in group_names, f"Expected Administrators, got {group_names}"
+
+
+class TestListOidcLinksDefensiveProviderNull:
+    """list_oidc_links must not crash if a link's provider is orphaned on SQLite.
+
+    PR for #1285 added a defensive null-check at mfa.py:1871 so the endpoint
+    returns ``provider_name="<deleted>"`` for orphan links instead of raising
+    AttributeError when accessing ``link.provider.name``. This scenario is
+    only reachable on SQLite (PRAGMA foreign_keys=OFF) when an OIDCProvider
+    row is removed without the ORM cascade running.
+    """
+
+    @pytest.mark.asyncio
+    @pytest.mark.integration
+    async def test_list_oidc_links_returns_deleted_marker_for_orphan_provider(
+        self, async_client: AsyncClient, db_session: AsyncSession
+    ):
+        from sqlalchemy import select
+
+        from backend.app.models.oidc_provider import UserOIDCLink
+
+        admin_token = await _setup_and_login(async_client, "linkorphan_adm", "LinkOrphanAdm1!")
+
+        admin_row = await db_session.execute(select(User).where(User.username == "linkorphan_adm"))
+        admin_user = admin_row.scalar_one()
+
+        # Orphan link: provider_id=99999 deliberately points at no row.
+        db_session.add(
+            UserOIDCLink(
+                user_id=admin_user.id,
+                provider_id=99999,
+                provider_user_id="orphan-sub",
+                provider_email="orphan@example.com",
+            )
+        )
+        await db_session.commit()
+
+        resp = await async_client.get(
+            "/api/v1/auth/oidc/links",
+            headers=_auth_header(admin_token),
+        )
+        assert resp.status_code == 200, resp.text
+        links = resp.json()
+        assert len(links) == 1
+        # The fix: orphan provider_id no longer crashes — returns "<deleted>" instead.
+        assert links[0]["provider_name"] == "<deleted>", (
+            f"Expected '<deleted>' fallback for orphan provider, got {links[0]['provider_name']!r}"
+        )
+        assert links[0]["provider_id"] == 99999

+ 299 - 0
backend/tests/integration/test_oidc_relogin.py

@@ -0,0 +1,299 @@
+"""E2E test for issue #1285: SSO user can re-login after admin deletion.
+
+Reproduces the exact symptom from the issue: a user logs in via OIDC
+(auto_create_users=True), gets created, is then deleted by the admin, and
+attempts to log in again. With the fix in delete_user (UserOIDCLink cleanup)
++ the orphan-cleanup migration, the second OIDC callback must trigger
+auto_create_users and produce a fresh user — instead of redirecting to
+"account_inactive" because of the orphan link.
+"""
+
+from __future__ import annotations
+
+import base64
+import secrets
+import time
+from datetime import datetime, timedelta, timezone
+from unittest.mock import patch
+
+import jwt as pyjwt
+import pytest
+from cryptography.hazmat.primitives import serialization
+from cryptography.hazmat.primitives.asymmetric import rsa
+from httpx import AsyncClient
+from sqlalchemy import select
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from backend.app.models.auth_ephemeral import AuthEphemeralToken
+from backend.app.models.oidc_provider import UserOIDCLink
+from backend.app.models.user import User
+
+
+def _make_rsa_key():
+    """Throwaway RSA + JWKS for the mocked IdP."""
+    priv = rsa.generate_private_key(public_exponent=65537, key_size=2048)
+    pem = priv.private_bytes(
+        serialization.Encoding.PEM,
+        serialization.PrivateFormat.TraditionalOpenSSL,
+        serialization.NoEncryption(),
+    )
+    pub = priv.public_key().public_numbers()
+
+    def _b64url(n: int, length: int) -> str:
+        return base64.urlsafe_b64encode(n.to_bytes(length, "big")).rstrip(b"=").decode()
+
+    jwks = {
+        "keys": [
+            {
+                "kty": "RSA",
+                "use": "sig",
+                "alg": "RS256",
+                "kid": "test-kid-1",
+                "n": _b64url(pub.n, 256),
+                "e": _b64url(pub.e, 3),
+            }
+        ]
+    }
+    return pem, jwks
+
+
+class _MockResp:
+    def __init__(self, data):
+        self._data = data
+        self.status_code = 200
+        self.is_success = True
+        self.text = str(data)
+
+    def json(self):
+        return self._data
+
+    def raise_for_status(self):
+        pass
+
+
+def _mock_httpx_factory(discovery_doc, jwks_data, token_response):
+    class _MockHttpxClient:
+        def __init__(self, *args, **kwargs):
+            pass
+
+        async def __aenter__(self):
+            return self
+
+        async def __aexit__(self, *args):
+            pass
+
+        async def get(self, url, **kwargs):
+            if "jwks" in url:
+                return _MockResp(jwks_data)
+            return _MockResp(discovery_doc)
+
+        async def post(self, url, **kwargs):
+            return _MockResp(token_response)
+
+    return _MockHttpxClient
+
+
+async def _trigger_oidc_callback(
+    async_client: AsyncClient,
+    db_session: AsyncSession,
+    provider_id: int,
+    issuer: str,
+    client_id: str,
+    private_pem: bytes,
+    jwks_data: dict,
+    *,
+    sub: str,
+    email: str,
+) -> str:
+    """Run a full mocked OIDC callback and return the resulting access token."""
+    nonce = secrets.token_urlsafe(16)
+    state = secrets.token_urlsafe(32)
+    code_verifier = secrets.token_urlsafe(48)
+
+    now = int(time.time())
+    id_token = pyjwt.encode(
+        {
+            "sub": sub,
+            "iss": issuer,
+            "aud": client_id,
+            "nonce": nonce,
+            "email": email,
+            "email_verified": True,
+            "iat": now,
+            "exp": now + 300,
+        },
+        private_pem,
+        algorithm="RS256",
+        headers={"kid": "test-kid-1"},
+    )
+
+    db_session.add(
+        AuthEphemeralToken(
+            token=state,
+            token_type="oidc_state",
+            provider_id=provider_id,
+            nonce=nonce,
+            code_verifier=code_verifier,
+            expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
+        )
+    )
+    await db_session.commit()
+
+    discovery = {
+        "issuer": issuer,
+        "authorization_endpoint": f"{issuer}/auth",
+        "token_endpoint": f"{issuer}/token",
+        "jwks_uri": f"{issuer}/.well-known/jwks.json",
+    }
+    token_response = {
+        "access_token": "mock-access",
+        "token_type": "Bearer",
+        "id_token": id_token,
+    }
+
+    with patch(
+        "backend.app.api.routes.mfa.httpx.AsyncClient",
+        _mock_httpx_factory(discovery, jwks_data, token_response),
+    ):
+        callback_resp = await async_client.get(
+            f"/api/v1/auth/oidc/callback?code=test-code&state={state}",
+            follow_redirects=False,
+        )
+
+    assert callback_resp.status_code == 302, callback_resp.text
+    location = callback_resp.headers.get("location", "")
+    assert "oidc_token=" in location, f"Expected oidc_token in redirect, got: {location}"
+
+    exchange_token = location.split("oidc_token=")[1].split("&")[0]
+    exchange_resp = await async_client.post(
+        "/api/v1/auth/oidc/exchange",
+        json={"oidc_token": exchange_token},
+    )
+    assert exchange_resp.status_code == 200, exchange_resp.text
+    return exchange_resp.json()["access_token"]
+
+
+class TestOidcReloginAfterDelete:
+    """Issue #1285: SSO user must be recreatable after admin deletion."""
+
+    @pytest.mark.asyncio
+    @pytest.mark.integration
+    async def test_relogin_after_delete_recreates_user_via_auto_create(
+        self, async_client: AsyncClient, db_session: AsyncSession
+    ):
+        """User created via OIDC → deleted by admin → second OIDC login creates a new user.
+
+        Without the delete_user UserOIDCLink-cleanup fix, the second callback finds
+        the orphan link, fails to load the now-deleted user, and redirects to
+        ``account_inactive`` — never reaching auto_create_users.
+        """
+        private_pem, jwks = _make_rsa_key()
+        issuer = "https://idp.relogin-test.example.com"
+        client_id = "relogin-test-client"
+        sub = "oidc-sub-relogin-1285"
+        email = "relogin@example.com"
+
+        # Admin setup + create OIDC provider
+        await async_client.post(
+            "/api/v1/auth/setup",
+            json={
+                "auth_enabled": True,
+                "admin_username": "reloginadm",
+                "admin_password": "AdminPass1!",
+            },
+        )
+        login_resp = await async_client.post(
+            "/api/v1/auth/login",
+            json={"username": "reloginadm", "password": "AdminPass1!"},
+        )
+        admin_token = login_resp.json()["access_token"]
+        headers = {"Authorization": f"Bearer {admin_token}"}
+
+        create_resp = await async_client.post(
+            "/api/v1/auth/oidc/providers",
+            json={
+                "name": "ReloginIdP",
+                "issuer_url": issuer,
+                "client_id": client_id,
+                "client_secret": "test-secret",
+                "scopes": "openid email profile",
+                "is_enabled": True,
+                "auto_create_users": True,
+            },
+            headers=headers,
+        )
+        assert create_resp.status_code == 201, create_resp.text
+        provider_id = create_resp.json()["id"]
+
+        # ── First OIDC login: creates user + link ──
+        await _trigger_oidc_callback(
+            async_client,
+            db_session,
+            provider_id,
+            issuer,
+            client_id,
+            private_pem,
+            jwks,
+            sub=sub,
+            email=email,
+        )
+
+        await db_session.commit()
+        first_user_row = await db_session.execute(select(User).where(User.email == email))
+        first_user = first_user_row.scalar_one()
+        first_user_id = first_user.id
+        first_user_created_at = first_user.created_at
+
+        first_link_row = await db_session.execute(select(UserOIDCLink).where(UserOIDCLink.provider_user_id == sub))
+        assert first_link_row.scalar_one().user_id == first_user_id
+
+        # ── Admin deletes the user ──
+        del_resp = await async_client.delete(
+            f"/api/v1/users/{first_user_id}",
+            headers=headers,
+        )
+        assert del_resp.status_code == 204, del_resp.text
+
+        await db_session.commit()
+        # With the fix the orphan link is gone too — verifying because that
+        # is exactly the precondition for auto_create to fire on retry.
+        link_after_delete = await db_session.execute(select(UserOIDCLink).where(UserOIDCLink.provider_user_id == sub))
+        assert link_after_delete.scalar_one_or_none() is None, (
+            "Orphan UserOIDCLink left after delete — would block re-login per #1285"
+        )
+        # And the user row itself is gone (#1285 prerequisite).
+        user_after_delete = await db_session.execute(select(User).where(User.email == email))
+        assert user_after_delete.scalar_one_or_none() is None
+
+        # ── Second OIDC login with the same sub: auto_create must run again ──
+        # The helper already asserts a 302 with oidc_token=… — that alone proves
+        # auto_create fired (otherwise the callback would have redirected to
+        # /?oidc_error=account_inactive and the helper would have failed).
+        await _trigger_oidc_callback(
+            async_client,
+            db_session,
+            provider_id,
+            issuer,
+            client_id,
+            private_pem,
+            jwks,
+            sub=sub,
+            email=email,
+        )
+
+        await db_session.commit()
+        second_row = await db_session.execute(select(User).where(User.email == email))
+        second_user = second_row.scalar_one()
+        # SQLite recycles primary-key ids when AUTOINCREMENT is not declared, so
+        # comparing ids is not a reliable freshness signal across delete+recreate.
+        # The decisive proof: a new user row was created (post-delete) and a
+        # fresh link points at it. created_at must not be earlier than the
+        # original — equality is acceptable on fast machines where seconds match.
+        assert second_user.created_at >= first_user_created_at, (
+            f"Re-created user has earlier created_at ({second_user.created_at}) "
+            f"than the deleted original ({first_user_created_at}) — bug regression"
+        )
+
+        # And a fresh link for the new user
+        link_after = await db_session.execute(select(UserOIDCLink).where(UserOIDCLink.provider_user_id == sub))
+        assert link_after.scalar_one().user_id == second_user.id

+ 289 - 0
backend/tests/integration/test_users_auth_cleanup.py

@@ -0,0 +1,289 @@
+"""Integration tests for OIDC/MFA cleanup on user deletion.
+
+These tests verify the fix for issue #1285: deleting a user via DELETE
+/api/v1/users/{id} must also remove their UserOIDCLink, UserTOTP, and
+UserOTPCode rows. On PostgreSQL the FK CASCADE handles this, but SQLite
+ships with FK enforcement off — without explicit DELETEs in the endpoint,
+orphan rows would block SSO re-login and leak MFA secrets.
+"""
+
+from datetime import datetime, timedelta, timezone
+
+import pytest
+from httpx import AsyncClient
+from sqlalchemy import select
+from sqlalchemy.ext.asyncio import AsyncSession
+
+
+class TestDeleteUserCleansAuthRows:
+    """Verify delete_user removes OIDC link + TOTP + OTP rows owned by the user."""
+
+    @pytest.fixture
+    async def auth_token(self, async_client: AsyncClient):
+        """Setup auth and return admin token."""
+        await async_client.post(
+            "/api/v1/auth/setup",
+            json={
+                "auth_enabled": True,
+                "admin_username": "cleanupadmin",
+                "admin_password": "AdminPass1!",
+            },
+        )
+        login_response = await async_client.post(
+            "/api/v1/auth/login",
+            json={"username": "cleanupadmin", "password": "AdminPass1!"},
+        )
+        return login_response.json()["access_token"]
+
+    async def _create_user(self, async_client: AsyncClient, auth_token: str, username: str) -> int:
+        """Helper: create a non-admin user via the API and return their id."""
+        create_resp = await async_client.post(
+            "/api/v1/users/",
+            headers={"Authorization": f"Bearer {auth_token}"},
+            json={
+                "username": username,
+                "password": "Password123!",
+                "role": "user",
+            },
+        )
+        assert create_resp.status_code in (200, 201), create_resp.text
+        return create_resp.json()["id"]
+
+    @pytest.mark.asyncio
+    @pytest.mark.integration
+    async def test_delete_user_removes_oidc_links(
+        self,
+        async_client: AsyncClient,
+        db_session: AsyncSession,
+        auth_token: str,
+    ):
+        """Deleting a user must also delete their UserOIDCLink rows."""
+        from backend.app.models.oidc_provider import OIDCProvider, UserOIDCLink
+
+        user_id = await self._create_user(async_client, auth_token, "oidcclean")
+
+        # Use the client_secret property setter (mfa_encrypt) instead of poking
+        # _client_secret_enc directly — keeps the fixture in sync with the real
+        # encryption flow even though nothing decrypts it in this test
+        # (#1295 review nit).
+        provider = OIDCProvider(
+            name="CleanupProv",
+            issuer_url="https://cleanup.example.com",
+            client_id="cleanup_client",
+            scopes="openid email profile",
+            is_enabled=True,
+        )
+        provider.client_secret = "cleanup_secret"
+        db_session.add(provider)
+        await db_session.flush()
+        db_session.add(
+            UserOIDCLink(
+                user_id=user_id,
+                provider_id=provider.id,
+                provider_user_id="sub-cleanup-123",
+                provider_email="cleanup@example.com",
+            )
+        )
+        await db_session.commit()
+
+        # Sanity check: link exists before delete
+        pre = await db_session.execute(select(UserOIDCLink).where(UserOIDCLink.user_id == user_id))
+        assert pre.scalar_one_or_none() is not None
+
+        # Delete via API
+        resp = await async_client.delete(
+            f"/api/v1/users/{user_id}",
+            headers={"Authorization": f"Bearer {auth_token}"},
+        )
+        assert resp.status_code == 204
+
+        # Link must be gone (the bug from #1285 is when it persists on SQLite)
+        await db_session.commit()
+        post = await db_session.execute(select(UserOIDCLink).where(UserOIDCLink.user_id == user_id))
+        assert post.scalar_one_or_none() is None, "UserOIDCLink orphan left behind — #1285 regression"
+
+    @pytest.mark.asyncio
+    @pytest.mark.integration
+    async def test_delete_user_removes_user_totp(
+        self,
+        async_client: AsyncClient,
+        db_session: AsyncSession,
+        auth_token: str,
+    ):
+        """Deleting a user must also delete their UserTOTP row (MFA secret)."""
+        from backend.app.models.user_totp import UserTOTP
+
+        user_id = await self._create_user(async_client, auth_token, "totpclean")
+
+        totp = UserTOTP(user_id=user_id, is_enabled=True)
+        totp.secret = "JBSWY3DPEHPK3PXP"  # encrypts via property setter
+        db_session.add(totp)
+        await db_session.commit()
+
+        pre = await db_session.execute(select(UserTOTP).where(UserTOTP.user_id == user_id))
+        assert pre.scalar_one_or_none() is not None
+
+        resp = await async_client.delete(
+            f"/api/v1/users/{user_id}",
+            headers={"Authorization": f"Bearer {auth_token}"},
+        )
+        assert resp.status_code == 204
+
+        await db_session.commit()
+        post = await db_session.execute(select(UserTOTP).where(UserTOTP.user_id == user_id))
+        assert post.scalar_one_or_none() is None, "UserTOTP orphan — MFA secret leaked after user delete"
+
+    @pytest.mark.asyncio
+    @pytest.mark.integration
+    async def test_delete_user_removes_long_lived_tokens(
+        self,
+        async_client: AsyncClient,
+        db_session: AsyncSession,
+        auth_token: str,
+    ):
+        """Deleting a user must also delete their LongLivedToken rows.
+
+        Camera-stream tokens whose `secret_hash` is still valid would
+        otherwise be matchable by `verify()` via `lookup_prefix` even
+        after the user is gone (#1295 review feedback).
+        """
+        from backend.app.models.long_lived_token import LongLivedToken
+
+        user_id = await self._create_user(async_client, auth_token, "lltclean")
+
+        db_session.add(
+            LongLivedToken(
+                user_id=user_id,
+                name="HA card",
+                lookup_prefix="abcd1234",
+                secret_hash="$2b$12$dummybcrypthashabcdefghij1234567890",
+                scope="camera_stream",
+                expires_at=datetime.now(timezone.utc) + timedelta(days=30),
+            )
+        )
+        await db_session.commit()
+
+        pre = await db_session.execute(select(LongLivedToken).where(LongLivedToken.user_id == user_id))
+        assert pre.scalar_one_or_none() is not None
+
+        resp = await async_client.delete(
+            f"/api/v1/users/{user_id}",
+            headers={"Authorization": f"Bearer {auth_token}"},
+        )
+        assert resp.status_code == 204
+
+        await db_session.commit()
+        post = await db_session.execute(select(LongLivedToken).where(LongLivedToken.user_id == user_id))
+        assert post.scalar_one_or_none() is None, (
+            "LongLivedToken orphan — camera-stream secret still in DB after user delete"
+        )
+
+    @pytest.mark.asyncio
+    @pytest.mark.integration
+    async def test_delete_user_removes_user_otp_codes(
+        self,
+        async_client: AsyncClient,
+        db_session: AsyncSession,
+        auth_token: str,
+    ):
+        """Deleting a user must also delete their UserOTPCode rows."""
+        from backend.app.models.user_otp_code import UserOTPCode
+
+        user_id = await self._create_user(async_client, auth_token, "otpclean")
+
+        # Two pending OTP codes so we verify the WHERE clause hits all rows
+        for _ in range(2):
+            db_session.add(
+                UserOTPCode(
+                    user_id=user_id,
+                    code_hash="$pbkdf2-sha256$dummy",
+                    expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
+                )
+            )
+        await db_session.commit()
+
+        pre = await db_session.execute(select(UserOTPCode).where(UserOTPCode.user_id == user_id))
+        assert len(pre.scalars().all()) == 2
+
+        resp = await async_client.delete(
+            f"/api/v1/users/{user_id}",
+            headers={"Authorization": f"Bearer {auth_token}"},
+        )
+        assert resp.status_code == 204
+
+        await db_session.commit()
+        post = await db_session.execute(select(UserOTPCode).where(UserOTPCode.user_id == user_id))
+        assert post.scalars().all() == [], "UserOTPCode orphans left behind"
+
+    @pytest.mark.asyncio
+    @pytest.mark.integration
+    async def test_delete_user_with_all_auth_rows(
+        self,
+        async_client: AsyncClient,
+        db_session: AsyncSession,
+        auth_token: str,
+    ):
+        """Combined: one user with OIDC link + TOTP + OTP + long-lived token — all cleaned up atomically."""
+        from backend.app.models.long_lived_token import LongLivedToken
+        from backend.app.models.oidc_provider import OIDCProvider, UserOIDCLink
+        from backend.app.models.user_otp_code import UserOTPCode
+        from backend.app.models.user_totp import UserTOTP
+
+        user_id = await self._create_user(async_client, auth_token, "fullauth")
+
+        provider = OIDCProvider(
+            name="FullAuthProv",
+            issuer_url="https://fullauth.example.com",
+            client_id="fullauth_client",
+            scopes="openid email profile",
+            is_enabled=True,
+        )
+        provider.client_secret = "fullauth_secret"
+        db_session.add(provider)
+        await db_session.flush()
+
+        db_session.add(
+            UserOIDCLink(
+                user_id=user_id,
+                provider_id=provider.id,
+                provider_user_id="sub-fullauth",
+                provider_email="full@example.com",
+            )
+        )
+        totp = UserTOTP(user_id=user_id, is_enabled=True)
+        totp.secret = "JBSWY3DPEHPK3PXP"
+        db_session.add(totp)
+        db_session.add(
+            UserOTPCode(
+                user_id=user_id,
+                code_hash="$pbkdf2-sha256$dummy",
+                expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
+            )
+        )
+        db_session.add(
+            LongLivedToken(
+                user_id=user_id,
+                name="combined-test",
+                lookup_prefix="zz999999",
+                secret_hash="$2b$12$dummybcrypthashabcdefghij1234567890",
+                scope="camera_stream",
+                expires_at=datetime.now(timezone.utc) + timedelta(days=30),
+            )
+        )
+        await db_session.commit()
+
+        resp = await async_client.delete(
+            f"/api/v1/users/{user_id}",
+            headers={"Authorization": f"Bearer {auth_token}"},
+        )
+        assert resp.status_code == 204
+
+        await db_session.commit()
+        link_post = await db_session.execute(select(UserOIDCLink).where(UserOIDCLink.user_id == user_id))
+        totp_post = await db_session.execute(select(UserTOTP).where(UserTOTP.user_id == user_id))
+        otp_post = await db_session.execute(select(UserOTPCode).where(UserOTPCode.user_id == user_id))
+        llt_post = await db_session.execute(select(LongLivedToken).where(LongLivedToken.user_id == user_id))
+        assert link_post.scalar_one_or_none() is None
+        assert totp_post.scalar_one_or_none() is None
+        assert otp_post.scalars().all() == []
+        assert llt_post.scalar_one_or_none() is None

+ 385 - 0
backend/tests/unit/test_orphan_auth_cleanup_migration.py

@@ -0,0 +1,385 @@
+"""Regression test for the orphan OIDC/MFA cleanup migration (#1285).
+
+On SQLite (PRAGMA foreign_keys=OFF by default), the ON DELETE CASCADE
+declared on user_oidc_links.user_id / user_totp.user_id /
+user_otp_codes.user_id is NOT enforced. Users deleted via the API before
+the fix (PR for #1285) left orphan rows pointing to non-existent users.
+The OIDC callback would then find the orphan UserOIDCLink, fail to load
+the deleted user, and redirect to ``account_inactive`` instead of running
+auto_create_users.
+
+run_migrations now sweeps orphans on every startup; this test verifies it
+on all three tables and proves idempotency + no-op behaviour on fresh DBs.
+"""
+
+from __future__ import annotations
+
+from datetime import datetime, timedelta, timezone
+
+import pytest
+from sqlalchemy import text
+from sqlalchemy.ext.asyncio import create_async_engine
+
+from backend.app.core.database import run_migrations
+
+
+@pytest.fixture(autouse=True)
+def force_sqlite_dialect(monkeypatch):
+    """Pin the SQLite branch in run_migrations regardless of env."""
+    from backend.app.core import database as database_module, db_dialect
+
+    monkeypatch.setattr(db_dialect, "is_sqlite", lambda: True)
+    monkeypatch.setattr(db_dialect, "is_postgres", lambda: False)
+    monkeypatch.setattr(database_module, "is_sqlite", lambda: True)
+
+
+def _register_all_models():
+    """Import the models package so every Base.metadata table is registered.
+
+    Previously this listed each submodule by hand and silently drifted from
+    backend/app/models/__init__.py (#1295 review nit). Importing the package
+    triggers __init__.py which covers most of the schema automatically.
+
+    A handful of submodules are NOT re-exported from __init__.py yet but are
+    required by run_migrations (they touch tables that don't appear in any
+    re-exported model). Those are imported by submodule below so the test
+    engine has the full schema available. Keep this list in sync with the
+    set conftest.py imports for test_engine.
+    """
+    import backend.app.models  # noqa: F401
+
+    # Submodules whose tables are touched by run_migrations but which are
+    # not re-exported from __init__.py.
+    from backend.app.models import (  # noqa: F401
+        external_link,
+        print_queue,
+        project_bom,
+        slot_preset,
+        spoolman_k_profile,
+        spoolman_slot_assignment,
+        virtual_printer,
+    )
+
+
+@pytest.fixture
+async def engine_with_full_schema():
+    """In-memory SQLite with the full schema via create_all (no manual SQL)."""
+    from backend.app.core.database import Base
+
+    _register_all_models()
+
+    engine = create_async_engine("sqlite+aiosqlite:///:memory:", echo=False)
+    async with engine.begin() as conn:
+        await conn.run_sync(Base.metadata.create_all)
+    yield engine
+    await engine.dispose()
+
+
+# -----------------------------------------------------------------------------
+# Per-table orphan cleanup
+# -----------------------------------------------------------------------------
+
+
+async def test_migration_deletes_orphan_user_oidc_links(engine_with_full_schema):
+    """Orphan rows in user_oidc_links must be removed; rows pointing at a real
+    user must stay."""
+    async with engine_with_full_schema.begin() as conn:
+        # One real user, one nonexistent referenced by an OIDC link
+        await conn.execute(
+            text(
+                "INSERT INTO users (id, username, password_hash, is_active, created_at, updated_at, "
+                "role, auth_source) VALUES (1, 'survivor', 'h', 1, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, "
+                "'user', 'local')"
+            )
+        )
+        # Provider (any provider — the link only requires existence)
+        await conn.execute(
+            text(
+                "INSERT INTO oidc_providers (id, name, issuer_url, client_id, client_secret, "
+                "scopes, is_enabled, auto_create_users, auto_link_existing_accounts, email_claim, "
+                "require_email_verified, created_at, updated_at) VALUES (1, 'p', 'https://x', 'c', "
+                "'s', 'openid', 1, 1, 0, 'email', 1, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)"
+            )
+        )
+        # Valid link
+        await conn.execute(
+            text(
+                "INSERT INTO user_oidc_links (id, user_id, provider_id, provider_user_id, created_at) "
+                "VALUES (10, 1, 1, 'sub-real', CURRENT_TIMESTAMP)"
+            )
+        )
+        # Orphan link — user_id=999 does not exist
+        await conn.execute(
+            text(
+                "INSERT INTO user_oidc_links (id, user_id, provider_id, provider_user_id, created_at) "
+                "VALUES (11, 999, 1, 'sub-orphan', CURRENT_TIMESTAMP)"
+            )
+        )
+
+    async with engine_with_full_schema.begin() as conn:
+        await run_migrations(conn)
+
+    async with engine_with_full_schema.begin() as conn:
+        ids = [row[0] for row in (await conn.execute(text("SELECT id FROM user_oidc_links ORDER BY id"))).all()]
+        assert ids == [10], f"Expected only the valid link to survive, got {ids}"
+
+
+async def test_migration_deletes_orphan_user_totp(engine_with_full_schema):
+    """Orphan rows in user_totp must be removed; rows for real users must stay."""
+    async with engine_with_full_schema.begin() as conn:
+        await conn.execute(
+            text(
+                "INSERT INTO users (id, username, password_hash, is_active, created_at, updated_at, "
+                "role, auth_source) VALUES (1, 'survivor', 'h', 1, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, "
+                "'user', 'local')"
+            )
+        )
+        # Valid TOTP
+        await conn.execute(
+            text(
+                "INSERT INTO user_totp (id, user_id, secret, is_enabled, created_at, updated_at) "
+                "VALUES (10, 1, 'enc', 1, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)"
+            )
+        )
+        # Orphan TOTP — user_id=999 does not exist (would never happen with FK on,
+        # but SQLite tolerates it because PRAGMA foreign_keys=OFF)
+        await conn.execute(
+            text(
+                "INSERT INTO user_totp (id, user_id, secret, is_enabled, created_at, updated_at) "
+                "VALUES (11, 999, 'orphan_enc', 1, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)"
+            )
+        )
+
+    async with engine_with_full_schema.begin() as conn:
+        await run_migrations(conn)
+
+    async with engine_with_full_schema.begin() as conn:
+        ids = [row[0] for row in (await conn.execute(text("SELECT id FROM user_totp ORDER BY id"))).all()]
+        assert ids == [10], f"Expected only the valid TOTP row to survive, got {ids}"
+
+
+async def test_migration_deletes_orphan_user_otp_codes(engine_with_full_schema):
+    """Orphan rows in user_otp_codes must be removed; rows for real users must stay."""
+    exp = (datetime.now(timezone.utc) + timedelta(minutes=10)).isoformat()
+    async with engine_with_full_schema.begin() as conn:
+        await conn.execute(
+            text(
+                "INSERT INTO users (id, username, password_hash, is_active, created_at, updated_at, "
+                "role, auth_source) VALUES (1, 'survivor', 'h', 1, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, "
+                "'user', 'local')"
+            )
+        )
+        # Valid OTP code
+        await conn.execute(
+            text(
+                "INSERT INTO user_otp_codes (id, user_id, code_hash, attempts, used, expires_at, created_at) "
+                "VALUES (10, 1, '$h$', 0, 0, :exp, CURRENT_TIMESTAMP)"
+            ),
+            {"exp": exp},
+        )
+        # Two orphan OTP codes
+        await conn.execute(
+            text(
+                "INSERT INTO user_otp_codes (id, user_id, code_hash, attempts, used, expires_at, created_at) "
+                "VALUES (11, 999, '$h$', 0, 0, :exp, CURRENT_TIMESTAMP)"
+            ),
+            {"exp": exp},
+        )
+        await conn.execute(
+            text(
+                "INSERT INTO user_otp_codes (id, user_id, code_hash, attempts, used, expires_at, created_at) "
+                "VALUES (12, 1000, '$h$', 0, 0, :exp, CURRENT_TIMESTAMP)"
+            ),
+            {"exp": exp},
+        )
+
+    async with engine_with_full_schema.begin() as conn:
+        await run_migrations(conn)
+
+    async with engine_with_full_schema.begin() as conn:
+        ids = [row[0] for row in (await conn.execute(text("SELECT id FROM user_otp_codes ORDER BY id"))).all()]
+        assert ids == [10], f"Expected only the valid OTP row to survive, got {ids}"
+
+
+async def test_migration_deletes_orphan_long_lived_tokens(engine_with_full_schema):
+    """Orphan rows in long_lived_tokens must be removed; rows for real users must stay.
+
+    Camera-stream tokens whose secret_hash is still valid would otherwise be
+    matchable by verify() via lookup_prefix even after the owning user is gone
+    (#1295 review feedback extended #1285).
+    """
+    exp = (datetime.now(timezone.utc) + timedelta(days=30)).isoformat()
+    async with engine_with_full_schema.begin() as conn:
+        await conn.execute(
+            text(
+                "INSERT INTO users (id, username, password_hash, is_active, created_at, updated_at, "
+                "role, auth_source) VALUES (1, 'survivor', 'h', 1, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, "
+                "'user', 'local')"
+            )
+        )
+        # Valid token for the real user
+        await conn.execute(
+            text(
+                "INSERT INTO long_lived_tokens (id, user_id, name, lookup_prefix, secret_hash, "
+                "scope, expires_at, created_at) VALUES (10, 1, 'real', 'aaaa1111', '$2b$h', "
+                "'camera_stream', :exp, CURRENT_TIMESTAMP)"
+            ),
+            {"exp": exp},
+        )
+        # Orphan token — user_id=999 does not exist
+        await conn.execute(
+            text(
+                "INSERT INTO long_lived_tokens (id, user_id, name, lookup_prefix, secret_hash, "
+                "scope, expires_at, created_at) VALUES (11, 999, 'orphan', 'bbbb2222', '$2b$h', "
+                "'camera_stream', :exp, CURRENT_TIMESTAMP)"
+            ),
+            {"exp": exp},
+        )
+
+    async with engine_with_full_schema.begin() as conn:
+        await run_migrations(conn)
+
+    async with engine_with_full_schema.begin() as conn:
+        ids = [row[0] for row in (await conn.execute(text("SELECT id FROM long_lived_tokens ORDER BY id"))).all()]
+        assert ids == [10], f"Expected only the valid long-lived token to survive, got {ids}"
+
+
+# -----------------------------------------------------------------------------
+# No-op and idempotency
+# -----------------------------------------------------------------------------
+
+
+async def test_migration_is_noop_on_fresh_install(engine_with_full_schema):
+    """A fresh DB with empty users + auth tables must not raise and must not
+    modify anything."""
+    async with engine_with_full_schema.begin() as conn:
+        await run_migrations(conn)
+        await run_migrations(conn)  # second run, still fine
+
+    # Static queries (one per table) instead of an f-string interpolated loop:
+    # Bandit B608 flags f"... FROM {tbl}" as a possible SQL-injection vector
+    # even when ``tbl`` is bound to a tuple of literals. Spelling out each
+    # table name makes the intent clear and silences the false-positive
+    # without resorting to a noqa marker. See PR #1295 CodeQL alert #798.
+    async with engine_with_full_schema.begin() as conn:
+        oidc_count = (await conn.execute(text("SELECT COUNT(*) FROM user_oidc_links"))).scalar_one()
+        totp_count = (await conn.execute(text("SELECT COUNT(*) FROM user_totp"))).scalar_one()
+        otp_count = (await conn.execute(text("SELECT COUNT(*) FROM user_otp_codes"))).scalar_one()
+        llt_count = (await conn.execute(text("SELECT COUNT(*) FROM long_lived_tokens"))).scalar_one()
+        assert oidc_count == 0
+        assert totp_count == 0
+        assert otp_count == 0
+        assert llt_count == 0
+
+
+async def test_migration_is_idempotent(engine_with_full_schema):
+    """Running the migration twice on data with orphans cleans them once, the
+    second run finds nothing left and is a no-op."""
+    async with engine_with_full_schema.begin() as conn:
+        await conn.execute(
+            text(
+                "INSERT INTO users (id, username, password_hash, is_active, created_at, updated_at, "
+                "role, auth_source) VALUES (1, 'u', 'h', 1, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, "
+                "'user', 'local')"
+            )
+        )
+        await conn.execute(
+            text(
+                "INSERT INTO oidc_providers (id, name, issuer_url, client_id, client_secret, "
+                "scopes, is_enabled, auto_create_users, auto_link_existing_accounts, email_claim, "
+                "require_email_verified, created_at, updated_at) VALUES (1, 'p', 'https://x', 'c', "
+                "'s', 'openid', 1, 1, 0, 'email', 1, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)"
+            )
+        )
+        await conn.execute(
+            text(
+                "INSERT INTO user_oidc_links (id, user_id, provider_id, provider_user_id, created_at) "
+                "VALUES (1, 999, 1, 'orphan', CURRENT_TIMESTAMP)"
+            )
+        )
+
+    async with engine_with_full_schema.begin() as conn:
+        await run_migrations(conn)
+    # Second run must not crash, must not double-touch anything
+    async with engine_with_full_schema.begin() as conn:
+        await run_migrations(conn)
+
+    async with engine_with_full_schema.begin() as conn:
+        count = (await conn.execute(text("SELECT COUNT(*) FROM user_oidc_links"))).scalar_one()
+        assert count == 0
+
+
+async def test_migration_keeps_rows_for_existing_users(engine_with_full_schema):
+    """Belt-and-braces: rows for real users must never be touched even when
+    other tables have orphans being cleaned at the same time."""
+    exp = (datetime.now(timezone.utc) + timedelta(minutes=10)).isoformat()
+    async with engine_with_full_schema.begin() as conn:
+        for uid in (1, 2):
+            await conn.execute(
+                text(
+                    "INSERT INTO users (id, username, password_hash, is_active, created_at, updated_at, "
+                    "role, auth_source) VALUES (:id, :name, 'h', 1, CURRENT_TIMESTAMP, "
+                    "CURRENT_TIMESTAMP, 'user', 'local')"
+                ),
+                {"id": uid, "name": f"u{uid}"},
+            )
+        await conn.execute(
+            text(
+                "INSERT INTO oidc_providers (id, name, issuer_url, client_id, client_secret, "
+                "scopes, is_enabled, auto_create_users, auto_link_existing_accounts, email_claim, "
+                "require_email_verified, created_at, updated_at) VALUES (1, 'p', 'https://x', 'c', "
+                "'s', 'openid', 1, 1, 0, 'email', 1, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)"
+            )
+        )
+        # Mix: valid + orphan in each table
+        await conn.execute(
+            text(
+                "INSERT INTO user_oidc_links (id, user_id, provider_id, provider_user_id, created_at) "
+                "VALUES (1, 1, 1, 'real', CURRENT_TIMESTAMP), "
+                "(2, 999, 1, 'orphan', CURRENT_TIMESTAMP)"
+            )
+        )
+        await conn.execute(
+            text(
+                "INSERT INTO user_totp (id, user_id, secret, is_enabled, created_at, updated_at) "
+                "VALUES (1, 2, 'enc', 1, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP), "
+                "(2, 998, 'orphan', 1, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)"
+            )
+        )
+        await conn.execute(
+            text(
+                "INSERT INTO user_otp_codes (id, user_id, code_hash, attempts, used, expires_at, "
+                "created_at) VALUES (1, 1, '$h$', 0, 0, :exp, CURRENT_TIMESTAMP), "
+                "(2, 997, '$h$', 0, 0, :exp, CURRENT_TIMESTAMP)"
+            ),
+            {"exp": exp},
+        )
+        llt_exp = (datetime.now(timezone.utc) + timedelta(days=30)).isoformat()
+        await conn.execute(
+            text(
+                "INSERT INTO long_lived_tokens (id, user_id, name, lookup_prefix, secret_hash, "
+                "scope, expires_at, created_at) VALUES (1, 2, 'real', 'aaaa1111', '$h', "
+                "'camera_stream', :exp, CURRENT_TIMESTAMP), (2, 996, 'orphan', 'bbbb2222', '$h', "
+                "'camera_stream', :exp, CURRENT_TIMESTAMP)"
+            ),
+            {"exp": llt_exp},
+        )
+
+    async with engine_with_full_schema.begin() as conn:
+        await run_migrations(conn)
+
+    async with engine_with_full_schema.begin() as conn:
+        links = [
+            row[0] for row in (await conn.execute(text("SELECT user_id FROM user_oidc_links ORDER BY user_id"))).all()
+        ]
+        totps = [row[0] for row in (await conn.execute(text("SELECT user_id FROM user_totp ORDER BY user_id"))).all()]
+        otps = [
+            row[0] for row in (await conn.execute(text("SELECT user_id FROM user_otp_codes ORDER BY user_id"))).all()
+        ]
+        llts = [
+            row[0] for row in (await conn.execute(text("SELECT user_id FROM long_lived_tokens ORDER BY user_id"))).all()
+        ]
+        assert links == [1], f"Expected only user_id=1 to survive in user_oidc_links, got {links}"
+        assert totps == [2], f"Expected only user_id=2 to survive in user_totp, got {totps}"
+        assert otps == [1], f"Expected only user_id=1 to survive in user_otp_codes, got {otps}"
+        assert llts == [2], f"Expected only user_id=2 to survive in long_lived_tokens, got {llts}"