| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289 |
- """Integration tests for OIDC/MFA cleanup on user deletion.
- These tests verify the fix for issue #1285: deleting a user via DELETE
- /api/v1/users/{id} must also remove their UserOIDCLink, UserTOTP,
- UserOTPCode, and LongLivedToken rows (the last one covered following the
- #1295 review feedback). On PostgreSQL the FK CASCADE handles this, but
- SQLite ships with FK enforcement off — without explicit DELETEs in the
- endpoint, orphan rows would block SSO re-login and leak MFA and
- camera-stream secrets.
- """
- from datetime import datetime, timedelta, timezone
- import pytest
- from httpx import AsyncClient
- from sqlalchemy import select
- from sqlalchemy.ext.asyncio import AsyncSession
class TestDeleteUserCleansAuthRows:
    """Verify delete_user removes the auth rows owned by the deleted user.

    Covered row types: UserOIDCLink, UserTOTP, UserOTPCode and LongLivedToken.
    Every test creates a user through the API, inserts the rows directly via
    ``db_session``, deletes the user through the API, and then asserts that no
    row with that ``user_id`` is left behind.
    """

    @pytest.fixture
    async def auth_token(self, async_client: AsyncClient) -> str:
        """Enable auth with an admin account and return that admin's access token."""
        await async_client.post(
            "/api/v1/auth/setup",
            json={
                "auth_enabled": True,
                "admin_username": "cleanupadmin",
                "admin_password": "AdminPass1!",
            },
        )
        login_response = await async_client.post(
            "/api/v1/auth/login",
            json={"username": "cleanupadmin", "password": "AdminPass1!"},
        )
        payload = login_response.json()
        # Surface the server's response body on a failed login instead of an
        # opaque KeyError from the subscript below.
        assert "access_token" in payload, login_response.text
        return payload["access_token"]

    @staticmethod
    def _auth_headers(auth_token: str) -> dict:
        """Helper: build the bearer-token header dict used by every API call."""
        return {"Authorization": f"Bearer {auth_token}"}

    async def _create_user(self, async_client: AsyncClient, auth_token: str, username: str) -> int:
        """Helper: create a non-admin user via the API and return their id."""
        create_resp = await async_client.post(
            "/api/v1/users/",
            headers=self._auth_headers(auth_token),
            json={
                "username": username,
                "password": "Password123!",
                "role": "user",
            },
        )
        assert create_resp.status_code in (200, 201), create_resp.text
        return create_resp.json()["id"]

    async def _delete_user(
        self,
        async_client: AsyncClient,
        db_session: AsyncSession,
        auth_token: str,
        user_id: int,
    ) -> None:
        """Helper: delete ``user_id`` via the API and assert the 204 response.

        Afterwards the test's own session is committed before the caller
        re-queries.
        """
        resp = await async_client.delete(
            f"/api/v1/users/{user_id}",
            headers=self._auth_headers(auth_token),
        )
        assert resp.status_code == 204, resp.text
        # NOTE(review): presumably this ends the session's open transaction so
        # the follow-up SELECTs observe what the endpoint wrote — confirm
        # against the db_session fixture.
        await db_session.commit()

    @staticmethod
    async def _rows_for_user(db_session: AsyncSession, model, user_id: int) -> list:
        """Helper: return every ``model`` row whose ``user_id`` column equals ``user_id``."""
        result = await db_session.execute(select(model).where(model.user_id == user_id))
        return result.scalars().all()

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_delete_user_removes_oidc_links(
        self,
        async_client: AsyncClient,
        db_session: AsyncSession,
        auth_token: str,
    ):
        """Deleting a user must also delete their UserOIDCLink rows."""
        from backend.app.models.oidc_provider import OIDCProvider, UserOIDCLink

        user_id = await self._create_user(async_client, auth_token, "oidcclean")

        # Use the client_secret property setter (mfa_encrypt) instead of poking
        # _client_secret_enc directly — keeps the fixture in sync with the real
        # encryption flow even though nothing decrypts it in this test
        # (#1295 review nit).
        provider = OIDCProvider(
            name="CleanupProv",
            issuer_url="https://cleanup.example.com",
            client_id="cleanup_client",
            scopes="openid email profile",
            is_enabled=True,
        )
        provider.client_secret = "cleanup_secret"
        db_session.add(provider)
        # Flush so provider.id is populated before the link references it.
        await db_session.flush()
        db_session.add(
            UserOIDCLink(
                user_id=user_id,
                provider_id=provider.id,
                provider_user_id="sub-cleanup-123",
                provider_email="cleanup@example.com",
            )
        )
        await db_session.commit()

        # Sanity check: link exists before delete
        assert len(await self._rows_for_user(db_session, UserOIDCLink, user_id)) == 1

        await self._delete_user(async_client, db_session, auth_token, user_id)

        # Link must be gone (the bug from #1285 is when it persists on SQLite)
        remaining = await self._rows_for_user(db_session, UserOIDCLink, user_id)
        assert remaining == [], "UserOIDCLink orphan left behind — #1285 regression"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_delete_user_removes_user_totp(
        self,
        async_client: AsyncClient,
        db_session: AsyncSession,
        auth_token: str,
    ):
        """Deleting a user must also delete their UserTOTP row (MFA secret)."""
        from backend.app.models.user_totp import UserTOTP

        user_id = await self._create_user(async_client, auth_token, "totpclean")

        totp = UserTOTP(user_id=user_id, is_enabled=True)
        totp.secret = "JBSWY3DPEHPK3PXP"  # encrypts via property setter
        db_session.add(totp)
        await db_session.commit()

        # Sanity check: TOTP row exists before delete
        assert len(await self._rows_for_user(db_session, UserTOTP, user_id)) == 1

        await self._delete_user(async_client, db_session, auth_token, user_id)

        remaining = await self._rows_for_user(db_session, UserTOTP, user_id)
        assert remaining == [], "UserTOTP orphan — MFA secret leaked after user delete"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_delete_user_removes_long_lived_tokens(
        self,
        async_client: AsyncClient,
        db_session: AsyncSession,
        auth_token: str,
    ):
        """Deleting a user must also delete their LongLivedToken rows.

        Camera-stream tokens whose `secret_hash` is still valid would
        otherwise be matchable by `verify()` via `lookup_prefix` even
        after the user is gone (#1295 review feedback).
        """
        from backend.app.models.long_lived_token import LongLivedToken

        user_id = await self._create_user(async_client, auth_token, "lltclean")

        db_session.add(
            LongLivedToken(
                user_id=user_id,
                name="HA card",
                lookup_prefix="abcd1234",
                secret_hash="$2b$12$dummybcrypthashabcdefghij1234567890",
                scope="camera_stream",
                expires_at=datetime.now(timezone.utc) + timedelta(days=30),
            )
        )
        await db_session.commit()

        # Sanity check: token exists before delete
        assert len(await self._rows_for_user(db_session, LongLivedToken, user_id)) == 1

        await self._delete_user(async_client, db_session, auth_token, user_id)

        remaining = await self._rows_for_user(db_session, LongLivedToken, user_id)
        assert remaining == [], (
            "LongLivedToken orphan — camera-stream secret still in DB after user delete"
        )

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_delete_user_removes_user_otp_codes(
        self,
        async_client: AsyncClient,
        db_session: AsyncSession,
        auth_token: str,
    ):
        """Deleting a user must also delete their UserOTPCode rows."""
        from backend.app.models.user_otp_code import UserOTPCode

        user_id = await self._create_user(async_client, auth_token, "otpclean")

        # Two pending OTP codes so we verify the WHERE clause hits all rows
        for _ in range(2):
            db_session.add(
                UserOTPCode(
                    user_id=user_id,
                    code_hash="$pbkdf2-sha256$dummy",
                    expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
                )
            )
        await db_session.commit()

        assert len(await self._rows_for_user(db_session, UserOTPCode, user_id)) == 2

        await self._delete_user(async_client, db_session, auth_token, user_id)

        remaining = await self._rows_for_user(db_session, UserOTPCode, user_id)
        assert remaining == [], "UserOTPCode orphans left behind"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_delete_user_with_all_auth_rows(
        self,
        async_client: AsyncClient,
        db_session: AsyncSession,
        auth_token: str,
    ):
        """Combined: one user with OIDC link + TOTP + OTP + long-lived token.

        A single DELETE request must leave none of the four row types behind.
        """
        from backend.app.models.long_lived_token import LongLivedToken
        from backend.app.models.oidc_provider import OIDCProvider, UserOIDCLink
        from backend.app.models.user_otp_code import UserOTPCode
        from backend.app.models.user_totp import UserTOTP

        user_id = await self._create_user(async_client, auth_token, "fullauth")

        provider = OIDCProvider(
            name="FullAuthProv",
            issuer_url="https://fullauth.example.com",
            client_id="fullauth_client",
            scopes="openid email profile",
            is_enabled=True,
        )
        provider.client_secret = "fullauth_secret"
        db_session.add(provider)
        # Flush so provider.id is populated before the link references it.
        await db_session.flush()
        db_session.add(
            UserOIDCLink(
                user_id=user_id,
                provider_id=provider.id,
                provider_user_id="sub-fullauth",
                provider_email="full@example.com",
            )
        )

        totp = UserTOTP(user_id=user_id, is_enabled=True)
        totp.secret = "JBSWY3DPEHPK3PXP"
        db_session.add(totp)

        db_session.add(
            UserOTPCode(
                user_id=user_id,
                code_hash="$pbkdf2-sha256$dummy",
                expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
            )
        )
        db_session.add(
            LongLivedToken(
                user_id=user_id,
                name="combined-test",
                lookup_prefix="zz999999",
                secret_hash="$2b$12$dummybcrypthashabcdefghij1234567890",
                scope="camera_stream",
                expires_at=datetime.now(timezone.utc) + timedelta(days=30),
            )
        )
        await db_session.commit()

        owned_models = (UserOIDCLink, UserTOTP, UserOTPCode, LongLivedToken)

        # Sanity check (mirrors the single-model tests): one row of each kind
        # exists before the delete, so the post-delete checks are not vacuous.
        for model in owned_models:
            rows = await self._rows_for_user(db_session, model, user_id)
            assert len(rows) == 1, f"{model.__name__} row missing before delete"

        await self._delete_user(async_client, db_session, auth_token, user_id)

        for model in owned_models:
            remaining = await self._rows_for_user(db_session, model, user_id)
            assert remaining == [], f"{model.__name__} orphan left behind after user delete"
|