"""Integration tests for 2FA and OIDC API endpoints. Tests the full request/response cycle for: - GET /api/v1/auth/2fa/status - POST /api/v1/auth/2fa/totp/setup - POST /api/v1/auth/2fa/totp/enable - POST /api/v1/auth/2fa/totp/disable - POST /api/v1/auth/2fa/email/enable - POST /api/v1/auth/2fa/email/disable - POST /api/v1/auth/2fa/verify (TOTP, email, backup paths) - DELETE /api/v1/auth/2fa/admin/{user_id} - GET /api/v1/auth/oidc/providers - POST /api/v1/auth/oidc/providers - PATCH /api/v1/auth/oidc/providers/{id} - DELETE /api/v1/auth/oidc/providers/{id} """ from __future__ import annotations import secrets from datetime import datetime, timedelta, timezone import pyotp import pytest from httpx import AsyncClient from passlib.context import CryptContext from sqlalchemy.ext.asyncio import AsyncSession from backend.app.models.auth_ephemeral import AuthEphemeralToken from backend.app.models.user import User _pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto") # --------------------------------------------------------------------------- # Fixtures / helpers # --------------------------------------------------------------------------- AUTH_SETUP_URL = "/api/v1/auth/setup" LOGIN_URL = "/api/v1/auth/login" def _norm_pw(password: str) -> str: """Ensure password meets complexity requirements (I4: SetupRequest now validates).""" if not any(c.isupper() for c in password): password = password[0].upper() + password[1:] if not any(c not in "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789" for c in password): password = password + "!" return password async def _setup_and_login(client: AsyncClient, username: str, password: str) -> str: """Enable auth, create an admin user, login, and return the bearer token.""" password = _norm_pw(password) await client.post( AUTH_SETUP_URL, json={ "auth_enabled": True, "admin_username": username, "admin_password": password, }, ) resp = await client.post(LOGIN_URL, json={"username": username, "password": password}) assert resp.status_code == 200 return resp.json()["access_token"] async def _login_get_pre_auth_token(client: AsyncClient, username: str, password: str) -> str: """Login a user who has 2FA enabled; return the pre_auth_token from the response.""" password = _norm_pw(password) resp = await client.post(LOGIN_URL, json={"username": username, "password": password}) assert resp.status_code == 200 data = resp.json() assert data["requires_2fa"] is True, f"Expected requires_2fa=True, got {data}" assert data["pre_auth_token"] is not None return data["pre_auth_token"] def _auth_header(token: str) -> dict[str, str]: return {"Authorization": f"Bearer {token}"} # =========================================================================== # 2FA Status # =========================================================================== class TestTwoFAStatus: """Tests for GET /api/v1/auth/2fa/status.""" @pytest.mark.asyncio @pytest.mark.integration async def test_status_requires_auth(self, async_client: AsyncClient): response = await async_client.get("/api/v1/auth/2fa/status") assert response.status_code == 401 @pytest.mark.asyncio @pytest.mark.integration async def test_status_default_disabled(self, async_client: AsyncClient): token = await _setup_and_login(async_client, "statususer", "statuspass123") response = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(token)) assert response.status_code == 200 data = response.json() assert data["totp_enabled"] is False assert data["email_otp_enabled"] is False assert data["backup_codes_remaining"] == 0 # =========================================================================== # TOTP Setup # =========================================================================== class TestTOTPSetup: """Tests for POST /api/v1/auth/2fa/totp/setup.""" @pytest.mark.asyncio @pytest.mark.integration async def test_setup_requires_auth(self, async_client: AsyncClient): response = await async_client.post("/api/v1/auth/2fa/totp/setup") assert response.status_code == 401 @pytest.mark.asyncio @pytest.mark.integration async def test_setup_returns_secret_and_qr(self, async_client: AsyncClient): token = await _setup_and_login(async_client, "totpsetup", "totpsetup123") response = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token)) assert response.status_code == 200 data = response.json() assert "secret" in data assert len(data["secret"]) > 0 assert "qr_code_b64" in data assert data["issuer"] == "Bambuddy" @pytest.mark.asyncio @pytest.mark.integration async def test_setup_secret_is_valid_base32(self, async_client: AsyncClient): token = await _setup_and_login(async_client, "totpbase32", "totpbase32pw") response = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token)) assert response.status_code == 200 secret = response.json()["secret"] # pyotp will raise on invalid base32 totp = pyotp.TOTP(secret) assert len(totp.now()) == 6 # =========================================================================== # TOTP Enable # =========================================================================== class TestTOTPEnable: """Tests for POST /api/v1/auth/2fa/totp/enable.""" @pytest.mark.asyncio @pytest.mark.integration async def test_enable_without_setup_returns_400(self, async_client: AsyncClient): token = await _setup_and_login(async_client, "nosetupenable", "nosetupenable1") response = await async_client.post( "/api/v1/auth/2fa/totp/enable", json={"code": "123456"}, headers=_auth_header(token), ) assert response.status_code == 400 @pytest.mark.asyncio @pytest.mark.integration async def test_enable_with_invalid_code_returns_400(self, async_client: AsyncClient): token = await _setup_and_login(async_client, "badcodeuser", "badcodeuser1") await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token)) response = await async_client.post( "/api/v1/auth/2fa/totp/enable", json={"code": "000000"}, headers=_auth_header(token), ) assert response.status_code == 400 @pytest.mark.asyncio @pytest.mark.integration async def test_enable_with_valid_code_returns_backup_codes(self, async_client: AsyncClient): token = await _setup_and_login(async_client, "enableok", "enableok123") setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token)) secret = setup_resp.json()["secret"] valid_code = pyotp.TOTP(secret).now() response = await async_client.post( "/api/v1/auth/2fa/totp/enable", json={"code": valid_code}, headers=_auth_header(token), ) assert response.status_code == 200 data = response.json() assert "backup_codes" in data assert len(data["backup_codes"]) == 10 for code in data["backup_codes"]: assert len(code) == 8 @pytest.mark.asyncio @pytest.mark.integration async def test_status_reflects_enabled_totp(self, async_client: AsyncClient): token = await _setup_and_login(async_client, "statustotp", "statustotp1") setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token)) secret = setup_resp.json()["secret"] valid_code = pyotp.TOTP(secret).now() await async_client.post( "/api/v1/auth/2fa/totp/enable", json={"code": valid_code}, headers=_auth_header(token), ) status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(token)) data = status_resp.json() assert data["totp_enabled"] is True assert data["backup_codes_remaining"] == 10 # =========================================================================== # TOTP Disable # =========================================================================== class TestTOTPDisable: """Tests for POST /api/v1/auth/2fa/totp/disable.""" @pytest.mark.asyncio @pytest.mark.integration async def test_disable_when_not_enabled_returns_400(self, async_client: AsyncClient): token = await _setup_and_login(async_client, "disablenoenab", "disablenoenab1") response = await async_client.post( "/api/v1/auth/2fa/totp/disable", json={"code": "123456"}, headers=_auth_header(token), ) assert response.status_code == 400 @pytest.mark.asyncio @pytest.mark.integration async def test_disable_with_valid_code(self, async_client: AsyncClient): token = await _setup_and_login(async_client, "disableok", "disableok123") setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token)) secret = setup_resp.json()["secret"] valid_code = pyotp.TOTP(secret).now() await async_client.post( "/api/v1/auth/2fa/totp/enable", json={"code": valid_code}, headers=_auth_header(token), ) # Disable with a fresh valid code disable_code = pyotp.TOTP(secret).now() response = await async_client.post( "/api/v1/auth/2fa/totp/disable", json={"code": disable_code}, headers=_auth_header(token), ) assert response.status_code == 200 assert "disabled" in response.json()["message"].lower() # Status should now show disabled status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(token)) assert status_resp.json()["totp_enabled"] is False # =========================================================================== # Email OTP Enable/Disable # =========================================================================== class TestEmailOTP: """Tests for POST /api/v1/auth/2fa/email/enable, /enable/confirm and /disable.""" @pytest.mark.asyncio @pytest.mark.integration async def test_enable_email_otp_without_email_returns_400(self, async_client: AsyncClient): """Users without an email address cannot enable email OTP.""" token = await _setup_and_login(async_client, "noemailuser", "noemailuser1") response = await async_client.post("/api/v1/auth/2fa/email/enable", headers=_auth_header(token)) assert response.status_code == 400 assert "email" in response.json()["detail"].lower() @pytest.mark.asyncio @pytest.mark.integration async def test_confirm_enable_email_otp_happy_path(self, async_client: AsyncClient, db_session: AsyncSession): """Confirm step activates email OTP when setup_token + code are valid (C5).""" token = await _setup_and_login(async_client, "confirmenable", "confirmenable1") # Give user an email address directly (SMTP not available in tests) from sqlalchemy import select as sa_select result = await db_session.execute(sa_select(User).where(User.username == "confirmenable")) user = result.scalar_one() user.email = "confirmenable@example.com" await db_session.commit() # Inject a known setup token directly into the DB (bypasses SMTP) code = "123456" code_hash = _pwd_context.hash(code) setup_token = secrets.token_urlsafe(32) db_session.add( AuthEphemeralToken( token=setup_token, token_type="email_otp_setup", username="confirmenable", nonce=code_hash, expires_at=datetime.now(timezone.utc) + timedelta(minutes=10), ) ) await db_session.commit() resp = await async_client.post( "/api/v1/auth/2fa/email/enable/confirm", json={"setup_token": setup_token, "code": code}, headers=_auth_header(token), ) assert resp.status_code == 200 status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(token)) assert status_resp.json()["email_otp_enabled"] is True @pytest.mark.asyncio @pytest.mark.integration async def test_confirm_enable_email_otp_wrong_code(self, async_client: AsyncClient, db_session: AsyncSession): """Wrong code on confirm step returns 400 and does not enable email OTP.""" token = await _setup_and_login(async_client, "confirmwrong", "confirmwrong1") code_hash = _pwd_context.hash("654321") setup_token = secrets.token_urlsafe(32) db_session.add( AuthEphemeralToken( token=setup_token, token_type="email_otp_setup", username="confirmwrong", nonce=code_hash, expires_at=datetime.now(timezone.utc) + timedelta(minutes=10), ) ) await db_session.commit() resp = await async_client.post( "/api/v1/auth/2fa/email/enable/confirm", json={"setup_token": setup_token, "code": "000000"}, headers=_auth_header(token), ) assert resp.status_code == 400 @pytest.mark.asyncio @pytest.mark.integration async def test_confirm_enable_email_otp_setup_token_is_single_use( self, async_client: AsyncClient, db_session: AsyncSession ): """Setup token is consumed on first use; replay returns 400.""" token = await _setup_and_login(async_client, "confirmonce", "confirmonce1") code = "111111" code_hash = _pwd_context.hash(code) setup_token = secrets.token_urlsafe(32) db_session.add( AuthEphemeralToken( token=setup_token, token_type="email_otp_setup", username="confirmonce", nonce=code_hash, expires_at=datetime.now(timezone.utc) + timedelta(minutes=10), ) ) await db_session.commit() first = await async_client.post( "/api/v1/auth/2fa/email/enable/confirm", json={"setup_token": setup_token, "code": code}, headers=_auth_header(token), ) assert first.status_code == 200 second = await async_client.post( "/api/v1/auth/2fa/email/enable/confirm", json={"setup_token": setup_token, "code": code}, headers=_auth_header(token), ) assert second.status_code == 400 @pytest.mark.asyncio @pytest.mark.integration async def test_disable_email_otp_requires_password(self, async_client: AsyncClient): """Disabling email OTP requires the account password (C6: re-auth).""" token = await _setup_and_login(async_client, "disemailotp", "disemailotp1") # Wrong password → 401 response = await async_client.post( "/api/v1/auth/2fa/email/disable", json={"password": "wrongpassword"}, headers=_auth_header(token), ) assert response.status_code == 401 @pytest.mark.asyncio @pytest.mark.integration async def test_disable_email_otp_when_enabled(self, async_client: AsyncClient, db_session: AsyncSession): """Disabling email OTP when enabled turns it off and status reflects that.""" token = await _setup_and_login(async_client, "disemailpw", "disemailpw1") # Enable email OTP via direct DB injection (no SMTP) code = "222222" setup_token = secrets.token_urlsafe(32) db_session.add( AuthEphemeralToken( token=setup_token, token_type="email_otp_setup", username="disemailpw", nonce=_pwd_context.hash(code), expires_at=datetime.now(timezone.utc) + timedelta(minutes=10), ) ) await db_session.commit() await async_client.post( "/api/v1/auth/2fa/email/enable/confirm", json={"setup_token": setup_token, "code": code}, headers=_auth_header(token), ) # Now disable response = await async_client.post( "/api/v1/auth/2fa/email/disable", json={"password": _norm_pw("disemailpw1")}, headers=_auth_header(token), ) assert response.status_code == 200 status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(token)) assert status_resp.json()["email_otp_enabled"] is False # =========================================================================== # 2FA Verify — TOTP path # =========================================================================== class TestTwoFAVerifyTOTP: """Tests for POST /api/v1/auth/2fa/verify using the TOTP method.""" @pytest.mark.asyncio @pytest.mark.integration async def test_verify_with_invalid_pre_auth_token(self, async_client: AsyncClient): response = await async_client.post( "/api/v1/auth/2fa/verify", json={"pre_auth_token": "bogus", "method": "totp", "code": "123456"}, ) assert response.status_code == 401 @pytest.mark.asyncio @pytest.mark.integration async def test_verify_totp_issues_jwt(self, async_client: AsyncClient): """Full flow: setup → enable TOTP → login → pre_auth_token → verify → JWT.""" token = await _setup_and_login(async_client, "verifytotpok", "verifytotpok1") setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token)) secret = setup_resp.json()["secret"] valid_code = pyotp.TOTP(secret).now() await async_client.post( "/api/v1/auth/2fa/totp/enable", json={"code": valid_code}, headers=_auth_header(token), ) # Login now returns requires_2fa=True + pre_auth_token pre_auth_token = await _login_get_pre_auth_token(async_client, "verifytotpok", "verifytotpok1") verify_resp = await async_client.post( "/api/v1/auth/2fa/verify", json={ "pre_auth_token": pre_auth_token, "method": "totp", "code": pyotp.TOTP(secret).now(), }, ) assert verify_resp.status_code == 200 data = verify_resp.json() assert "access_token" in data assert data["token_type"] == "bearer" assert data["user"]["username"] == "verifytotpok" @pytest.mark.asyncio @pytest.mark.integration async def test_verify_totp_invalid_code(self, async_client: AsyncClient): token = await _setup_and_login(async_client, "verifybadcode", "verifybadcode1") setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token)) secret = setup_resp.json()["secret"] valid_code = pyotp.TOTP(secret).now() await async_client.post( "/api/v1/auth/2fa/totp/enable", json={"code": valid_code}, headers=_auth_header(token), ) pre_auth_token = await _login_get_pre_auth_token(async_client, "verifybadcode", "verifybadcode1") verify_resp = await async_client.post( "/api/v1/auth/2fa/verify", json={"pre_auth_token": pre_auth_token, "method": "totp", "code": "000000"}, ) assert verify_resp.status_code == 401 @pytest.mark.asyncio @pytest.mark.integration async def test_verify_invalid_method(self, async_client: AsyncClient): """An invalid 2FA method should return 400 even with a valid pre_auth_token.""" token = await _setup_and_login(async_client, "invalidmethod", "invalidmethod1") setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token)) secret = setup_resp.json()["secret"] valid_code = pyotp.TOTP(secret).now() await async_client.post( "/api/v1/auth/2fa/totp/enable", json={"code": valid_code}, headers=_auth_header(token), ) pre_auth_token = await _login_get_pre_auth_token(async_client, "invalidmethod", "invalidmethod1") response = await async_client.post( "/api/v1/auth/2fa/verify", json={"pre_auth_token": pre_auth_token, "method": "sms", "code": "123456"}, ) assert response.status_code == 422 # Pydantic Literal validation # =========================================================================== # 2FA Verify — Backup code path # =========================================================================== class TestTwoFAVerifyBackup: """Tests for POST /api/v1/auth/2fa/verify using the backup method.""" @pytest.mark.asyncio @pytest.mark.integration async def test_verify_with_backup_code(self, async_client: AsyncClient): token = await _setup_and_login(async_client, "backupcodeok", "backupcodeok1") setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token)) secret = setup_resp.json()["secret"] valid_code = pyotp.TOTP(secret).now() enable_resp = await async_client.post( "/api/v1/auth/2fa/totp/enable", json={"code": valid_code}, headers=_auth_header(token), ) backup_code = enable_resp.json()["backup_codes"][0] pre_auth_token = await _login_get_pre_auth_token(async_client, "backupcodeok", "backupcodeok1") verify_resp = await async_client.post( "/api/v1/auth/2fa/verify", json={"pre_auth_token": pre_auth_token, "method": "backup", "code": backup_code}, ) assert verify_resp.status_code == 200 assert "access_token" in verify_resp.json() @pytest.mark.asyncio @pytest.mark.integration async def test_backup_code_is_single_use(self, async_client: AsyncClient): token = await _setup_and_login(async_client, "backupsingle", "backupsingle1") setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token)) secret = setup_resp.json()["secret"] valid_code = pyotp.TOTP(secret).now() enable_resp = await async_client.post( "/api/v1/auth/2fa/totp/enable", json={"code": valid_code}, headers=_auth_header(token), ) backup_code = enable_resp.json()["backup_codes"][0] # First use — should succeed pre_auth_token = await _login_get_pre_auth_token(async_client, "backupsingle", "backupsingle1") first_resp = await async_client.post( "/api/v1/auth/2fa/verify", json={"pre_auth_token": pre_auth_token, "method": "backup", "code": backup_code}, ) assert first_resp.status_code == 200 # Second use of the same code — must fail (need new pre_auth_token + same backup code) pre_auth_token2 = await _login_get_pre_auth_token(async_client, "backupsingle", "backupsingle1") second_resp = await async_client.post( "/api/v1/auth/2fa/verify", json={"pre_auth_token": pre_auth_token2, "method": "backup", "code": backup_code}, ) assert second_resp.status_code == 401 @pytest.mark.asyncio @pytest.mark.integration async def test_backup_code_count_decrements(self, async_client: AsyncClient): token = await _setup_and_login(async_client, "backupcount", "backupcount1") setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token)) secret = setup_resp.json()["secret"] valid_code = pyotp.TOTP(secret).now() enable_resp = await async_client.post( "/api/v1/auth/2fa/totp/enable", json={"code": valid_code}, headers=_auth_header(token), ) backup_code = enable_resp.json()["backup_codes"][0] pre_auth_token = await _login_get_pre_auth_token(async_client, "backupcount", "backupcount1") await async_client.post( "/api/v1/auth/2fa/verify", json={"pre_auth_token": pre_auth_token, "method": "backup", "code": backup_code}, ) # Status is readable with the original full token (still valid) status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(token)) assert status_resp.json()["backup_codes_remaining"] == 9 # =========================================================================== # Rate Limiting # =========================================================================== class TestRateLimiting: """Ensure 429 is returned after 5 failed 2FA attempts.""" @pytest.mark.asyncio @pytest.mark.integration async def test_rate_limit_lockout(self, async_client: AsyncClient): """After 5 failed TOTP attempts the 6th must return 429.""" token = await _setup_and_login(async_client, "ratelimituser", "ratelimituser1") setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token)) secret = setup_resp.json()["secret"] valid_code = pyotp.TOTP(secret).now() await async_client.post( "/api/v1/auth/2fa/totp/enable", json={"code": valid_code}, headers=_auth_header(token), ) # 5 failed attempts via the login → pre_auth_token → verify flow for _ in range(5): pre_auth_token = await _login_get_pre_auth_token(async_client, "ratelimituser", "ratelimituser1") await async_client.post( "/api/v1/auth/2fa/verify", json={"pre_auth_token": pre_auth_token, "method": "totp", "code": "000000"}, ) # 6th attempt should hit the rate limit pre_auth_token = await _login_get_pre_auth_token(async_client, "ratelimituser", "ratelimituser1") response = await async_client.post( "/api/v1/auth/2fa/verify", json={"pre_auth_token": pre_auth_token, "method": "totp", "code": "000000"}, ) assert response.status_code == 429 # =========================================================================== # Admin 2FA Disable # =========================================================================== class TestAdminDisable2FA: """Tests for DELETE /api/v1/auth/2fa/admin/{user_id}.""" @pytest.mark.asyncio @pytest.mark.integration async def test_admin_disable_requires_admin(self, async_client: AsyncClient): """Only admins can use the admin disable endpoint.""" # The only user in a fresh setup IS admin, so just check the 404 path token = await _setup_and_login(async_client, "admincheck", "admincheck123") # Try to disable for a non-existent user_id — should get 200 (no-op) or 404 response = await async_client.request( "DELETE", "/api/v1/auth/2fa/admin/99999", json={"admin_password": _norm_pw("admincheck123")}, headers=_auth_header(token), ) # Admin users succeed regardless (returns 200 even if user doesn't exist) assert response.status_code == 200 @pytest.mark.asyncio @pytest.mark.integration async def test_admin_disable_clears_totp(self, async_client: AsyncClient): from sqlalchemy import select from backend.app.models.user import User token = await _setup_and_login(async_client, "admintotp", "admintotp123") setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token)) secret = setup_resp.json()["secret"] valid_code = pyotp.TOTP(secret).now() await async_client.post( "/api/v1/auth/2fa/totp/enable", json={"code": valid_code}, headers=_auth_header(token), ) # Find the user's id by querying status (which works with the token) me_resp = await async_client.get("/api/v1/auth/me", headers=_auth_header(token)) user_id = me_resp.json()["id"] response = await async_client.request( "DELETE", f"/api/v1/auth/2fa/admin/{user_id}", json={"admin_password": _norm_pw("admintotp123")}, headers=_auth_header(token), ) assert response.status_code == 200 # I2: admin_disable_2fa bumps password_changed_at, invalidating the old token. # Re-login to get a fresh token before checking status. new_login = await async_client.post( LOGIN_URL, json={"username": "admintotp", "password": _norm_pw("admintotp123")} ) assert new_login.status_code == 200, f"re-login failed: {new_login.json()}" assert new_login.json().get("requires_2fa") is False, f"still requires 2FA: {new_login.json()}" new_token = new_login.json()["access_token"] assert new_token is not None, f"no access_token in: {new_login.json()}" # Status should now show TOTP disabled status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(new_token)) assert status_resp.status_code == 200, f"status check failed: {status_resp.json()}" assert status_resp.json()["totp_enabled"] is False # =========================================================================== # OIDC Provider CRUD # =========================================================================== class TestOIDCProviders: """Tests for OIDC provider management endpoints.""" @pytest.mark.asyncio @pytest.mark.integration async def test_list_public_providers_empty(self, async_client: AsyncClient): response = await async_client.get("/api/v1/auth/oidc/providers") assert response.status_code == 200 assert isinstance(response.json(), list) @pytest.mark.asyncio @pytest.mark.integration async def test_create_provider_requires_admin(self, async_client: AsyncClient): token = await _setup_and_login(async_client, "oidcadmincreate", "oidcadmincreate1") response = await async_client.post( "/api/v1/auth/oidc/providers", json={ "name": "PocketID", "issuer_url": "https://auth.example.com", "client_id": "bambuddy", "client_secret": "supersecret", "scopes": "openid email profile", "is_enabled": True, "auto_create_users": False, }, headers=_auth_header(token), ) assert response.status_code == 201 data = response.json() assert data["name"] == "PocketID" assert data["issuer_url"] == "https://auth.example.com" assert "client_secret" not in data # Secret must not be returned @pytest.mark.asyncio @pytest.mark.integration async def test_created_provider_appears_in_all_list(self, async_client: AsyncClient): token = await _setup_and_login(async_client, "oidclistall", "oidclistall123") await async_client.post( "/api/v1/auth/oidc/providers", json={ "name": "TestProvider", "issuer_url": "https://test.example.com", "client_id": "testclient", "client_secret": "testsecret", "scopes": "openid", "is_enabled": True, "auto_create_users": False, }, headers=_auth_header(token), ) response = await async_client.get("/api/v1/auth/oidc/providers/all", headers=_auth_header(token)) assert response.status_code == 200 names = [p["name"] for p in response.json()] assert "TestProvider" in names @pytest.mark.asyncio @pytest.mark.integration async def test_disabled_provider_not_in_public_list(self, async_client: AsyncClient): token = await _setup_and_login(async_client, "oidcdisabled", "oidcdisabled1") await async_client.post( "/api/v1/auth/oidc/providers", json={ "name": "DisabledProvider", "issuer_url": "https://disabled.example.com", "client_id": "dc", "client_secret": "ds", "scopes": "openid", "is_enabled": False, "auto_create_users": False, }, headers=_auth_header(token), ) response = await async_client.get("/api/v1/auth/oidc/providers") names = [p["name"] for p in response.json()] assert "DisabledProvider" not in names @pytest.mark.asyncio @pytest.mark.integration async def test_update_provider(self, async_client: AsyncClient): token = await _setup_and_login(async_client, "oidcupdate", "oidcupdate123") create_resp = await async_client.post( "/api/v1/auth/oidc/providers", json={ "name": "OldName", "issuer_url": "https://update.example.com", "client_id": "uc", "client_secret": "us", "scopes": "openid", "is_enabled": True, "auto_create_users": False, }, headers=_auth_header(token), ) provider_id = create_resp.json()["id"] put_resp = await async_client.put( f"/api/v1/auth/oidc/providers/{provider_id}", json={"name": "NewName"}, headers=_auth_header(token), ) assert put_resp.status_code == 200 assert put_resp.json()["name"] == "NewName" @pytest.mark.asyncio @pytest.mark.integration async def test_delete_provider(self, async_client: AsyncClient): token = await _setup_and_login(async_client, "oidcdelete", "oidcdelete123") create_resp = await async_client.post( "/api/v1/auth/oidc/providers", json={ "name": "ToDelete", "issuer_url": "https://delete.example.com", "client_id": "dc", "client_secret": "ds", "scopes": "openid", "is_enabled": True, "auto_create_users": False, }, headers=_auth_header(token), ) provider_id = create_resp.json()["id"] del_resp = await async_client.delete( f"/api/v1/auth/oidc/providers/{provider_id}", headers=_auth_header(token), ) assert del_resp.status_code == 200 # No longer in list all_resp = await async_client.get("/api/v1/auth/oidc/providers/all", headers=_auth_header(token)) ids = [p["id"] for p in all_resp.json()] assert provider_id not in ids @pytest.mark.asyncio @pytest.mark.integration async def test_update_nonexistent_provider_returns_404(self, async_client: AsyncClient): token = await _setup_and_login(async_client, "oidc404", "oidc404pass1") response = await async_client.put( "/api/v1/auth/oidc/providers/99999", json={"name": "ghost"}, headers=_auth_header(token), ) assert response.status_code == 404 # =========================================================================== # Security: pre-auth token single-use # =========================================================================== class TestPreAuthTokenSingleUse: """pre_auth_token must be consumed on successful 2FA and cannot be reused.""" @pytest.mark.asyncio @pytest.mark.integration async def test_pre_auth_token_is_single_use(self, async_client: AsyncClient): """A pre_auth_token that was successfully used cannot be reused.""" token = await _setup_and_login(async_client, "singleusepat", "singleusepat1") setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token)) secret = setup_resp.json()["secret"] valid_code = pyotp.TOTP(secret).now() await async_client.post( "/api/v1/auth/2fa/totp/enable", json={"code": valid_code}, headers=_auth_header(token), ) pre_auth_token = await _login_get_pre_auth_token(async_client, "singleusepat", "singleusepat1") # First use — succeeds first = await async_client.post( "/api/v1/auth/2fa/verify", json={"pre_auth_token": pre_auth_token, "method": "totp", "code": pyotp.TOTP(secret).now()}, ) assert first.status_code == 200 # Second use of the same token — must fail (token already consumed on success) second = await async_client.post( "/api/v1/auth/2fa/verify", json={"pre_auth_token": pre_auth_token, "method": "totp", "code": pyotp.TOTP(secret).now()}, ) assert second.status_code == 401 @pytest.mark.asyncio @pytest.mark.integration async def test_pre_auth_token_survives_wrong_code(self, async_client: AsyncClient): """A wrong 2FA code must NOT burn the pre_auth_token (user can retry).""" token = await _setup_and_login(async_client, "survivepatuser", "survivepatuser1") setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token)) secret = setup_resp.json()["secret"] valid_code = pyotp.TOTP(secret).now() await async_client.post( "/api/v1/auth/2fa/totp/enable", json={"code": valid_code}, headers=_auth_header(token), ) pre_auth_token = await _login_get_pre_auth_token(async_client, "survivepatuser", "survivepatuser1") # Wrong code — should fail but not burn the token bad = await async_client.post( "/api/v1/auth/2fa/verify", json={"pre_auth_token": pre_auth_token, "method": "totp", "code": "000000"}, ) assert bad.status_code == 401 # Same token, correct code — should succeed (token still valid) good = await async_client.post( "/api/v1/auth/2fa/verify", json={"pre_auth_token": pre_auth_token, "method": "totp", "code": pyotp.TOTP(secret).now()}, ) assert good.status_code == 200 # =========================================================================== # Security: cross-user token isolation # =========================================================================== class TestCrossUserTokenIsolation: """A pre_auth_token issued for user A cannot authenticate as user B.""" @pytest.mark.asyncio @pytest.mark.integration async def test_token_cannot_be_used_for_different_user(self, async_client: AsyncClient): """pre_auth_token is bound to the issuing user; using it to verify a different user's TOTP code must fail.""" # Set up two users with TOTP token_a = await _setup_and_login(async_client, "crossusera", "crossusera1") setup_a = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token_a)) secret_a = setup_a.json()["secret"] await async_client.post( "/api/v1/auth/2fa/totp/enable", json={"code": pyotp.TOTP(secret_a).now()}, headers=_auth_header(token_a), ) # Get pre_auth_token for user A pre_auth_a = await _login_get_pre_auth_token(async_client, "crossusera", "crossusera1") # Try to use user A's token but supply a clearly invalid code — must fail resp = await async_client.post( "/api/v1/auth/2fa/verify", json={"pre_auth_token": pre_auth_a, "method": "totp", "code": "000000"}, ) assert resp.status_code == 401 # =========================================================================== # Security: admin disable non-admin rejection # =========================================================================== class TestAdminDisableNonAdminRejection: """Non-admin users must be rejected from the admin disable endpoint.""" @pytest.mark.asyncio @pytest.mark.integration async def test_non_admin_cannot_disable_2fa(self, async_client: AsyncClient): """A regular (non-admin) user must receive 403 from DELETE /auth/2fa/admin/{id}.""" # Set up admin, then create a regular user admin_token = await _setup_and_login(async_client, "adminusr2fa", "adminusr2fa1") # Create a regular user via user management create_resp = await async_client.post( "/api/v1/users", json={"username": "regularusr2fa", "password": "Regularusr2fa1!"}, headers=_auth_header(admin_token), ) assert create_resp.status_code == 201 # Login as regular user login_resp = await async_client.post( LOGIN_URL, json={"username": "regularusr2fa", "password": "Regularusr2fa1!"}, ) regular_token = login_resp.json()["access_token"] # Try to call admin endpoint with the regular user's token resp = await async_client.delete( f"/api/v1/auth/2fa/admin/{create_resp.json()['id']}", headers=_auth_header(regular_token), ) assert resp.status_code == 403 # =========================================================================== # Regenerate backup codes # =========================================================================== class TestRegenerateBackupCodes: """Tests for POST /api/v1/auth/2fa/totp/regenerate-backup-codes.""" @pytest.mark.asyncio @pytest.mark.integration async def test_regenerate_requires_totp_enabled(self, async_client: AsyncClient): token = await _setup_and_login(async_client, "regennototp", "regennototp1") resp = await async_client.post( "/api/v1/auth/2fa/totp/regenerate-backup-codes", json={"code": "123456"}, headers=_auth_header(token), ) assert resp.status_code == 400 @pytest.mark.asyncio @pytest.mark.integration async def test_regenerate_invalidates_old_codes(self, async_client: AsyncClient): """After regenerating, old backup codes must no longer work.""" token = await _setup_and_login(async_client, "regeninval", "regeninval1") setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token)) secret = setup_resp.json()["secret"] enable_resp = await async_client.post( "/api/v1/auth/2fa/totp/enable", json={"code": pyotp.TOTP(secret).now()}, headers=_auth_header(token), ) old_backup = enable_resp.json()["backup_codes"][0] # Regenerate backup codes regen_resp = await async_client.post( "/api/v1/auth/2fa/totp/regenerate-backup-codes", json={"code": pyotp.TOTP(secret).now()}, headers=_auth_header(token), ) assert regen_resp.status_code == 200 new_codes = regen_resp.json()["backup_codes"] assert len(new_codes) == 10 assert old_backup not in new_codes # Old backup code must now fail pre_auth_token = await _login_get_pre_auth_token(async_client, "regeninval", "regeninval1") fail_resp = await async_client.post( "/api/v1/auth/2fa/verify", json={"pre_auth_token": pre_auth_token, "method": "backup", "code": old_backup}, ) assert fail_resp.status_code == 401 @pytest.mark.asyncio @pytest.mark.integration async def test_regenerate_with_invalid_code_fails(self, async_client: AsyncClient): token = await _setup_and_login(async_client, "regeninvcode", "regeninvcode1") setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token)) secret = setup_resp.json()["secret"] await async_client.post( "/api/v1/auth/2fa/totp/enable", json={"code": pyotp.TOTP(secret).now()}, headers=_auth_header(token), ) resp = await async_client.post( "/api/v1/auth/2fa/totp/regenerate-backup-codes", json={"code": "000000"}, headers=_auth_header(token), ) assert resp.status_code == 400 # =========================================================================== # Security: method field validation # =========================================================================== class TestVerifyMethodValidation: """The method field must be one of totp/email/backup (Pydantic Literal).""" @pytest.mark.asyncio @pytest.mark.integration async def test_invalid_method_rejected_by_schema(self, async_client: AsyncClient): """Pydantic should reject unknown method values with 422.""" resp = await async_client.post( "/api/v1/auth/2fa/verify", json={"pre_auth_token": "anytoken", "code": "123456", "method": "sms"}, ) assert resp.status_code == 422 @pytest.mark.asyncio @pytest.mark.integration async def test_oversized_pre_auth_token_rejected(self, async_client: AsyncClient): """pre_auth_token exceeding max_length=128 should be rejected with 422.""" resp = await async_client.post( "/api/v1/auth/2fa/verify", json={"pre_auth_token": "x" * 200, "code": "123456", "method": "totp"}, ) assert resp.status_code == 422 # =========================================================================== # Login response shape for 2FA users # =========================================================================== class TestLoginResponseShape: """Login for a 2FA-enabled user must return requires_2fa+pre_auth_token and must NOT include access_token (which would bypass the 2FA gate).""" @pytest.mark.asyncio @pytest.mark.integration async def test_login_2fa_user_omits_access_token(self, async_client: AsyncClient): """A user with TOTP enabled must not receive an access_token on /auth/login.""" token = await _setup_and_login(async_client, "loginshape", "loginshape1") setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token)) secret = setup_resp.json()["secret"] await async_client.post( "/api/v1/auth/2fa/totp/enable", json={"code": pyotp.TOTP(secret).now()}, headers=_auth_header(token), ) login_resp = await async_client.post(LOGIN_URL, json={"username": "loginshape", "password": "Loginshape1!"}) assert login_resp.status_code == 200 data = login_resp.json() assert data.get("requires_2fa") is True assert data.get("pre_auth_token") is not None # access_token must NOT be present — it would bypass the 2FA gate assert "access_token" not in data or data["access_token"] is None # =========================================================================== # TOTP replay protection # =========================================================================== async def _setup_totp_user(client: AsyncClient, username: str, password: str) -> tuple[str, str]: """Create user, set up and enable TOTP; return (bearer_token, totp_secret).""" token = await _setup_and_login(client, username, password) setup_resp = await client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token)) secret = setup_resp.json()["secret"] await client.post( "/api/v1/auth/2fa/totp/enable", json={"code": pyotp.TOTP(secret).now()}, headers=_auth_header(token), ) return token, secret class TestTOTPReplay: """The same TOTP code must not be accepted twice within one 30-second window.""" @pytest.mark.asyncio @pytest.mark.integration async def test_totp_replay_rejected_on_verify(self, async_client: AsyncClient): """Replaying the same code on /2fa/verify must return 400.""" _token, secret = await _setup_totp_user(async_client, "replayverify", "replayverify1") code = pyotp.TOTP(secret).now() pre_auth = await _login_get_pre_auth_token(async_client, "replayverify", "replayverify1") first = await async_client.post( "/api/v1/auth/2fa/verify", json={"pre_auth_token": pre_auth, "method": "totp", "code": code}, ) assert first.status_code == 200 # Second login to get a fresh pre_auth_token (first was consumed) pre_auth2 = await _login_get_pre_auth_token(async_client, "replayverify", "replayverify1") second = await async_client.post( "/api/v1/auth/2fa/verify", json={"pre_auth_token": pre_auth2, "method": "totp", "code": code}, ) assert second.status_code == 400 @pytest.mark.asyncio @pytest.mark.integration async def test_totp_replay_rejected_on_disable(self, async_client: AsyncClient): """A code already used in verify_2fa must be rejected on /2fa/totp/disable.""" _setup_token, secret = await _setup_totp_user(async_client, "replaydisable", "replaydisable1") code = pyotp.TOTP(secret).now() # Use the code in verify_2fa — this sets last_totp_counter in DB pre_auth = await _login_get_pre_auth_token(async_client, "replaydisable", "replaydisable1") verify_resp = await async_client.post( "/api/v1/auth/2fa/verify", json={"pre_auth_token": pre_auth, "method": "totp", "code": code}, ) assert verify_resp.status_code == 200 authed_token = verify_resp.json()["access_token"] # Replay the same code on disable — must be rejected (same 30-second window) disable_resp = await async_client.post( "/api/v1/auth/2fa/totp/disable", json={"code": code}, headers=_auth_header(authed_token), ) assert disable_resp.status_code == 400 # =========================================================================== # Rate limiting on disable_totp and regenerate_backup_codes (I10) # =========================================================================== class TestRateLimitingDisableRegenerate: """disable_totp and regenerate_backup_codes must enforce rate limiting (I10).""" @pytest.mark.asyncio @pytest.mark.integration async def test_disable_totp_rate_limited_after_failures(self, async_client: AsyncClient): """Repeated wrong codes on /2fa/totp/disable trigger 429.""" token, _secret = await _setup_totp_user(async_client, "rldisable", "rldisable1") for _ in range(5): await async_client.post( "/api/v1/auth/2fa/totp/disable", json={"code": "000000"}, headers=_auth_header(token), ) resp = await async_client.post( "/api/v1/auth/2fa/totp/disable", json={"code": "000000"}, headers=_auth_header(token), ) assert resp.status_code == 429 @pytest.mark.asyncio @pytest.mark.integration async def test_regenerate_backup_codes_rate_limited_after_failures(self, async_client: AsyncClient): """Repeated wrong codes on /2fa/totp/regenerate-backup-codes trigger 429.""" token, _secret = await _setup_totp_user(async_client, "rlregen", "rlregen1") for _ in range(5): await async_client.post( "/api/v1/auth/2fa/totp/regenerate-backup-codes", json={"code": "000000"}, headers=_auth_header(token), ) resp = await async_client.post( "/api/v1/auth/2fa/totp/regenerate-backup-codes", json={"code": "000000"}, headers=_auth_header(token), ) assert resp.status_code == 429 # =========================================================================== # Email OTP send → verify end-to-end (coverage gap C3) # =========================================================================== class TestEmailOTPSendVerify: """Full email OTP login: send code → verify code → JWT.""" @pytest.mark.asyncio @pytest.mark.integration async def test_email_otp_send_and_verify(self, async_client: AsyncClient, db_session: AsyncSession): """login → POST /2fa/email/send (patched SMTP) → POST /2fa/verify → JWT.""" import re from unittest.mock import AsyncMock, MagicMock, patch from sqlalchemy import select as sa_select token = await _setup_and_login(async_client, "emailsendok", "emailsendok1") # Give the user an email address result = await db_session.execute(sa_select(User).where(User.username == "emailsendok")) user = result.scalar_one() user.email = "emailsendok@example.com" await db_session.commit() # Enable email OTP via DB injection setup_code = "444444" setup_token = secrets.token_urlsafe(32) db_session.add( AuthEphemeralToken( token=setup_token, token_type="email_otp_setup", username="emailsendok", nonce=_pwd_context.hash(setup_code), expires_at=datetime.now(timezone.utc) + timedelta(minutes=10), ) ) await db_session.commit() await async_client.post( "/api/v1/auth/2fa/email/enable/confirm", json={"setup_token": setup_token, "code": setup_code}, headers=_auth_header(token), ) # Login now requires 2FA — get pre_auth_token (cookie set automatically) pre_auth_token = await _login_get_pre_auth_token(async_client, "emailsendok", "emailsendok1") # Mock SMTP and capture the sent OTP code captured: dict[str, str] = {} smtp_settings_mock = MagicMock() def _capture_email(smtp_settings, to_email, subject, body_text, body_html): m = re.search(r"login code is: (\d{6})", body_text) if m: captured["otp"] = m.group(1) with ( patch("backend.app.api.routes.mfa.get_smtp_settings", new=AsyncMock(return_value=smtp_settings_mock)), patch("backend.app.api.routes.mfa.send_email", side_effect=_capture_email), ): send_resp = await async_client.post( "/api/v1/auth/2fa/email/send", json={"pre_auth_token": pre_auth_token}, ) assert send_resp.status_code == 200, send_resp.text fresh_token = send_resp.json()["pre_auth_token"] assert "otp" in captured, "send_email was not called or code not found in body" # Verify with the captured OTP code — cookie still in the async_client jar verify_resp = await async_client.post( "/api/v1/auth/2fa/verify", json={"pre_auth_token": fresh_token, "method": "email", "code": captured["otp"]}, ) assert verify_resp.status_code == 200 data = verify_resp.json() assert "access_token" in data assert data["user"]["username"] == "emailsendok" @pytest.mark.asyncio @pytest.mark.integration async def test_email_otp_wrong_code_rejected(self, async_client: AsyncClient, db_session: AsyncSession): """A wrong email OTP code must return 401 without burning the pre_auth_token.""" from unittest.mock import AsyncMock, MagicMock, patch from sqlalchemy import select as sa_select token = await _setup_and_login(async_client, "emailwrongcode", "emailwrongcode1") result = await db_session.execute(sa_select(User).where(User.username == "emailwrongcode")) user = result.scalar_one() user.email = "emailwrongcode@example.com" await db_session.commit() setup_code = "555555" setup_token = secrets.token_urlsafe(32) db_session.add( AuthEphemeralToken( token=setup_token, token_type="email_otp_setup", username="emailwrongcode", nonce=_pwd_context.hash(setup_code), expires_at=datetime.now(timezone.utc) + timedelta(minutes=10), ) ) await db_session.commit() await async_client.post( "/api/v1/auth/2fa/email/enable/confirm", json={"setup_token": setup_token, "code": setup_code}, headers=_auth_header(token), ) pre_auth_token = await _login_get_pre_auth_token(async_client, "emailwrongcode", "emailwrongcode1") captured: dict[str, str] = {} smtp_mock = MagicMock() def _capture(smtp_settings, to_email, subject, body_text, body_html): import re m = re.search(r"login code is: (\d{6})", body_text) if m: captured["otp"] = m.group(1) with ( patch("backend.app.api.routes.mfa.get_smtp_settings", new=AsyncMock(return_value=smtp_mock)), patch("backend.app.api.routes.mfa.send_email", side_effect=_capture), ): send_resp = await async_client.post( "/api/v1/auth/2fa/email/send", json={"pre_auth_token": pre_auth_token}, ) assert send_resp.status_code == 200 fresh_token = send_resp.json()["pre_auth_token"] # Wrong code → 401 bad = await async_client.post( "/api/v1/auth/2fa/verify", json={"pre_auth_token": fresh_token, "method": "email", "code": "000000"}, ) assert bad.status_code == 401 # Correct code still works (token not burned by wrong attempt) good = await async_client.post( "/api/v1/auth/2fa/verify", json={"pre_auth_token": fresh_token, "method": "email", "code": captured["otp"]}, ) assert good.status_code == 200 # =========================================================================== # OIDC end-to-end (coverage gap C4) # =========================================================================== def _make_test_rsa_key(): """Generate a throwaway RSA key pair and a matching JWK set for tests.""" import base64 from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import rsa private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) private_pem = private_key.private_bytes( serialization.Encoding.PEM, serialization.PrivateFormat.TraditionalOpenSSL, serialization.NoEncryption(), ) pub_numbers = private_key.public_key().public_numbers() def _b64url(n: int, length: int) -> str: return base64.urlsafe_b64encode(n.to_bytes(length, "big")).rstrip(b"=").decode() jwks = { "keys": [ { "kty": "RSA", "use": "sig", "alg": "RS256", "kid": "test-kid-1", "n": _b64url(pub_numbers.n, 256), "e": _b64url(pub_numbers.e, 3), } ] } return private_pem, jwks class TestOIDCEndToEnd: """Full OIDC auth-code flow: state → callback (mocked IdP) → exchange → JWT.""" @pytest.mark.asyncio @pytest.mark.integration async def test_oidc_callback_creates_user_and_issues_jwt(self, async_client: AsyncClient, db_session: AsyncSession): """callback validates the mocked ID token, creates a user, and redirects with an oidc_exchange token; exchanging that token returns a full JWT.""" import time from unittest.mock import patch import jwt as pyjwt private_pem, jwks_data = _make_test_rsa_key() issuer = "https://idp.test.example.com" client_id = "oidc-test-client" nonce = secrets.token_urlsafe(16) now = int(time.time()) id_token = pyjwt.encode( { "sub": "oidc-sub-e2e", "iss": issuer, "aud": client_id, "nonce": nonce, "email": "oidce2e@example.com", "email_verified": True, "iat": now, "exp": now + 300, }, private_pem, algorithm="RS256", headers={"kid": "test-kid-1"}, ) # Create OIDC provider admin_token = await _setup_and_login(async_client, "oidce2eadm", "oidce2eadm1") create_resp = await async_client.post( "/api/v1/auth/oidc/providers", json={ "name": "E2E-IdP", "issuer_url": issuer, "client_id": client_id, "client_secret": "test-secret", "scopes": "openid email profile", "is_enabled": True, "auto_create_users": True, }, headers=_auth_header(admin_token), ) assert create_resp.status_code == 201 provider_id = create_resp.json()["id"] # Simulate the authorize step: insert an oidc_state token directly state = secrets.token_urlsafe(32) code_verifier = secrets.token_urlsafe(48) db_session.add( AuthEphemeralToken( token=state, token_type="oidc_state", provider_id=provider_id, nonce=nonce, code_verifier=code_verifier, expires_at=datetime.now(timezone.utc) + timedelta(minutes=5), ) ) await db_session.commit() # Mock httpx calls made inside oidc_callback discovery_doc = { "issuer": issuer, "authorization_endpoint": f"{issuer}/auth", "token_endpoint": f"{issuer}/token", "jwks_uri": f"{issuer}/.well-known/jwks.json", } token_response = { "access_token": "mock-access", "token_type": "Bearer", "id_token": id_token, } class _MockResp: def __init__(self, data): self._data = data self.status_code = 200 self.is_success = True self.text = str(data) def json(self): return self._data def raise_for_status(self): pass class _MockHttpxClient: def __init__(self, *args, **kwargs): pass async def __aenter__(self): return self async def __aexit__(self, *args): pass async def get(self, url, **kwargs): if "jwks" in url: return _MockResp(jwks_data) return _MockResp(discovery_doc) async def post(self, url, **kwargs): return _MockResp(token_response) with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClient): callback_resp = await async_client.get( f"/api/v1/auth/oidc/callback?code=test-auth-code&state={state}", follow_redirects=False, ) assert callback_resp.status_code == 302, callback_resp.text location = callback_resp.headers.get("location", "") assert "oidc_token=" in location, f"Expected oidc_token in redirect, got: {location}" # Extract and exchange the oidc_exchange token oidc_exchange_token = location.split("oidc_token=")[1].split("&")[0] exchange_resp = await async_client.post( "/api/v1/auth/oidc/exchange", json={"oidc_token": oidc_exchange_token}, ) assert exchange_resp.status_code == 200 data = exchange_resp.json() assert "access_token" in data assert data["user"]["username"] is not None @pytest.mark.asyncio @pytest.mark.integration async def test_oidc_callback_invalid_state_redirects_error(self, async_client: AsyncClient): """An unknown state token must redirect to /?oidc_error=invalid_state.""" resp = await async_client.get( "/api/v1/auth/oidc/callback?code=x&state=totally-bogus-state", follow_redirects=False, ) assert resp.status_code == 302 assert "invalid_state" in resp.headers.get("location", "") @pytest.mark.asyncio @pytest.mark.integration async def test_oidc_state_is_single_use(self, async_client: AsyncClient, db_session: AsyncSession): """Replaying the same state token must fail on the second callback.""" import time from unittest.mock import patch import jwt as pyjwt private_pem, jwks_data = _make_test_rsa_key() issuer = "https://idp2.test.example.com" client_id = "oidc-client-2" nonce = secrets.token_urlsafe(16) now = int(time.time()) id_token = pyjwt.encode( { "sub": "sub-single-use", "iss": issuer, "aud": client_id, "nonce": nonce, "email": "su@example.com", "email_verified": True, "iat": now, "exp": now + 300, }, private_pem, algorithm="RS256", headers={"kid": "test-kid-1"}, ) admin_token = await _setup_and_login(async_client, "oidcsuadm", "oidcsuadm1") cr = await async_client.post( "/api/v1/auth/oidc/providers", json={ "name": "SU-IdP", "issuer_url": issuer, "client_id": client_id, "client_secret": "s", "scopes": "openid", "is_enabled": True, "auto_create_users": True, }, headers=_auth_header(admin_token), ) provider_id = cr.json()["id"] state = secrets.token_urlsafe(32) db_session.add( AuthEphemeralToken( token=state, token_type="oidc_state", provider_id=provider_id, nonce=nonce, code_verifier=secrets.token_urlsafe(48), expires_at=datetime.now(timezone.utc) + timedelta(minutes=5), ) ) await db_session.commit() discovery_doc = { "issuer": issuer, "authorization_endpoint": f"{issuer}/auth", "token_endpoint": f"{issuer}/token", "jwks_uri": f"{issuer}/.well-known/jwks.json", } token_response = {"access_token": "a", "token_type": "Bearer", "id_token": id_token} class _MockResp: def __init__(self, data): self._data = data self.status_code = 200 self.is_success = True self.text = str(data) def json(self): return self._data def raise_for_status(self): pass class _MockHttpxClient: def __init__(self, *a, **kw): pass async def __aenter__(self): return self async def __aexit__(self, *a): pass async def get(self, url, **kw): return _MockResp(jwks_data if "jwks" in url else discovery_doc) async def post(self, url, **kw): return _MockResp(token_response) with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClient): first = await async_client.get( f"/api/v1/auth/oidc/callback?code=c&state={state}", follow_redirects=False, ) assert first.status_code == 302 assert "oidc_token=" in first.headers.get("location", "") # Replay: second callback with the same state must fail second = await async_client.get( f"/api/v1/auth/oidc/callback?code=c&state={state}", follow_redirects=False, ) assert second.status_code == 302 assert "invalid_state" in second.headers.get("location", "") # =========================================================================== # H-2: Wrong code must NOT consume the email OTP setup token (peek-then-consume) # =========================================================================== class TestEmailOTPSetupTokenPreservedOnWrongCode: """After H-2 fix: a wrong code leaves the setup token intact so the user can retry.""" @pytest.mark.asyncio @pytest.mark.integration async def test_wrong_code_does_not_consume_setup_token(self, async_client: AsyncClient, db_session: AsyncSession): """Wrong code returns 400 but the setup token survives; correct code then works.""" token = await _setup_and_login(async_client, "h2retryuser", "h2retrypass1") code = "999999" code_hash = _pwd_context.hash(code) setup_token = secrets.token_urlsafe(32) db_session.add( AuthEphemeralToken( token=setup_token, token_type="email_otp_setup", username="h2retryuser", nonce=code_hash, expires_at=datetime.now(timezone.utc) + timedelta(minutes=10), ) ) await db_session.commit() # First attempt: wrong code → 400 wrong = await async_client.post( "/api/v1/auth/2fa/email/enable/confirm", json={"setup_token": setup_token, "code": "000000"}, headers=_auth_header(token), ) assert wrong.status_code == 400 # Second attempt: correct code → must succeed (token was NOT consumed) correct = await async_client.post( "/api/v1/auth/2fa/email/enable/confirm", json={"setup_token": setup_token, "code": code}, headers=_auth_header(token), ) assert correct.status_code == 200 # =========================================================================== # M-2: New OIDC provider must default to auto_link_existing_accounts=False # =========================================================================== class TestOIDCProviderAutoLinkDefault: """auto_link_existing_accounts must default to False (M-2 fix).""" @pytest.mark.asyncio @pytest.mark.integration async def test_new_provider_auto_link_defaults_to_false(self, async_client: AsyncClient): token = await _setup_and_login(async_client, "m2autolinkadmin", "m2autolinkadmin1") resp = await async_client.post( "/api/v1/auth/oidc/providers", json={ "name": "AutoLinkTest", "issuer_url": "https://autolink.example.com", "client_id": "alc", "client_secret": "als", "scopes": "openid", "is_enabled": True, "auto_create_users": False, # auto_link_existing_accounts intentionally omitted }, headers=_auth_header(token), ) assert resp.status_code == 201 assert resp.json()["auto_link_existing_accounts"] is False # =========================================================================== # L-5: 2FA verify code format validation # =========================================================================== class TestTwoFAVerifyCodeFormat: """TwoFAVerifyRequest.code must be 6–8 alphanumeric characters (L-5).""" @pytest.mark.asyncio @pytest.mark.integration async def test_code_too_long_rejected(self, async_client: AsyncClient): """code > 8 characters must be rejected with 422.""" resp = await async_client.post( "/api/v1/auth/2fa/verify", json={"pre_auth_token": "anytoken", "code": "1" * 9, "method": "totp"}, ) assert resp.status_code == 422 @pytest.mark.asyncio @pytest.mark.integration async def test_code_non_alphanumeric_rejected(self, async_client: AsyncClient): """code containing non-alphanumeric chars must be rejected with 422.""" resp = await async_client.post( "/api/v1/auth/2fa/verify", json={"pre_auth_token": "anytoken", "code": "12-456", "method": "totp"}, ) assert resp.status_code == 422 @pytest.mark.asyncio @pytest.mark.integration async def test_code_too_short_rejected(self, async_client: AsyncClient): """code < 6 characters must be rejected with 422.""" resp = await async_client.post( "/api/v1/auth/2fa/verify", json={"pre_auth_token": "anytoken", "code": "12345", "method": "totp"}, ) assert resp.status_code == 422 @pytest.mark.asyncio @pytest.mark.integration async def test_code_exactly_6_passes_schema(self, async_client: AsyncClient): """6-character alphanumeric code passes schema (may fail 2FA logic with 400).""" resp = await async_client.post( "/api/v1/auth/2fa/verify", json={"pre_auth_token": "x" * 32, "code": "123456", "method": "totp"}, ) assert resp.status_code != 422 @pytest.mark.asyncio @pytest.mark.integration async def test_code_exactly_8_passes_schema(self, async_client: AsyncClient): """8-character alphanumeric backup code passes schema.""" resp = await async_client.post( "/api/v1/auth/2fa/verify", json={"pre_auth_token": "x" * 32, "code": "ABCD1234", "method": "backup"}, ) assert resp.status_code != 422 # =========================================================================== # M-NEW-1: verify_slicer_download_token must NOT consume token on wrong resource # =========================================================================== class TestSlicerTokenResourceBinding: """Token for resource A must survive a wrong-resource check and still work for A.""" @pytest.mark.asyncio @pytest.mark.integration async def test_wrong_resource_does_not_consume_token(self, async_client: AsyncClient, db_session: AsyncSession): """A slicer token bound to archive:5 must NOT be consumed when checked against archive:6.""" from datetime import datetime, timedelta, timezone from backend.app.core.auth import verify_slicer_download_token from backend.app.models.auth_ephemeral import AuthEphemeralToken now = datetime.now(timezone.utc) token_val = secrets.token_urlsafe(24) db_session.add( AuthEphemeralToken( token=token_val, token_type="slicer_download", nonce="archive:5", expires_at=now + timedelta(minutes=5), ) ) await db_session.commit() # Wrong resource → must return False and NOT consume the token wrong = await verify_slicer_download_token(token_val, "archive", 6) assert wrong is False # Correct resource → must return True (token survived the wrong-resource check) correct = await verify_slicer_download_token(token_val, "archive", 5) assert correct is True @pytest.mark.asyncio @pytest.mark.integration async def test_correct_resource_consumes_token(self, async_client: AsyncClient, db_session: AsyncSession): """A slicer token is single-use: second correct-resource check must return False.""" from datetime import datetime, timedelta, timezone from backend.app.core.auth import verify_slicer_download_token from backend.app.models.auth_ephemeral import AuthEphemeralToken now = datetime.now(timezone.utc) token_val = secrets.token_urlsafe(24) db_session.add( AuthEphemeralToken( token=token_val, token_type="slicer_download", nonce="library:99", expires_at=now + timedelta(minutes=5), ) ) await db_session.commit() first = await verify_slicer_download_token(token_val, "library", 99) assert first is True second = await verify_slicer_download_token(token_val, "library", 99) assert second is False # =========================================================================== # M-NEW-3 / L-NEW-1: Schema length validation for change-password & forgot-password # =========================================================================== class TestSchemaLengthValidationR2: """Input length limits added in review round 2.""" @pytest.mark.asyncio @pytest.mark.integration async def test_change_password_current_too_long_rejected(self, async_client: AsyncClient): """current_password > 256 chars must be rejected with 422 (prevents pbkdf2 DoS).""" resp = await async_client.post( "/api/v1/users/me/change-password", json={"current_password": "x" * 257, "new_password": "ValidPass1!"}, ) assert resp.status_code == 422 @pytest.mark.asyncio @pytest.mark.integration async def test_forgot_password_email_too_long_rejected(self, async_client: AsyncClient): """email > 254 chars must be rejected with 422.""" resp = await async_client.post( "/api/v1/auth/forgot-password", json={"email": "a" * 243 + "@example.com"}, ) assert resp.status_code == 422 @pytest.mark.asyncio @pytest.mark.integration async def test_forgot_password_email_at_limit_passes_schema(self, async_client: AsyncClient): """Short email passes schema (may return 400/200 from business logic).""" resp = await async_client.post( "/api/v1/auth/forgot-password", json={"email": "user@example.com"}, ) assert resp.status_code != 422 # =========================================================================== # L-NEW-2: TOTPSetupRequest.code max_length # =========================================================================== class TestTOTPSetupCodeMaxLength: """TOTPSetupRequest.code must be bounded (L-NEW-2).""" @pytest.mark.asyncio @pytest.mark.integration async def test_setup_code_too_long_rejected(self, async_client: AsyncClient): """code > 8 chars must be rejected with 422.""" import pyotp as _pyotp token = await _setup_and_login(async_client, "totp_setup_maxlen", "totp_setup_maxlen1") # Enable TOTP so the setup-code guard path is active setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token)) secret = setup_resp.json()["secret"] await async_client.post( "/api/v1/auth/2fa/totp/enable", json={"code": _pyotp.TOTP(secret).now()}, headers=_auth_header(token), ) resp = await async_client.post( "/api/v1/auth/2fa/totp/setup", json={"code": "1" * 9}, headers=_auth_header(token), ) assert resp.status_code == 422 # =========================================================================== # L-NEW-3: EmailOTPEnableConfirmRequest.code must be exactly 6 digits # =========================================================================== class TestEmailOTPConfirmCodeFormat: """EmailOTPEnableConfirmRequest.code must be 6 digits (L-NEW-3).""" @pytest.mark.asyncio @pytest.mark.integration async def test_non_digit_code_rejected(self, async_client: AsyncClient): """Alpha characters in the email OTP confirm code must be rejected with 422.""" token = await _setup_and_login(async_client, "emailotpfmt", "emailotpfmt1") resp = await async_client.post( "/api/v1/auth/2fa/email/enable/confirm", json={"setup_token": "x" * 32, "code": "ABCDEF"}, headers=_auth_header(token), ) assert resp.status_code == 422 @pytest.mark.asyncio @pytest.mark.integration async def test_seven_digit_code_rejected(self, async_client: AsyncClient): """7-digit code must be rejected with 422 (min_length=max_length=6).""" token = await _setup_and_login(async_client, "emailotplen7", "emailotplen7x") resp = await async_client.post( "/api/v1/auth/2fa/email/enable/confirm", json={"setup_token": "x" * 32, "code": "1234567"}, headers=_auth_header(token), ) assert resp.status_code == 422 @pytest.mark.asyncio @pytest.mark.integration async def test_valid_six_digit_code_passes_schema(self, async_client: AsyncClient): """6-digit numeric code passes schema (may return 400 on bad token — that's fine).""" token = await _setup_and_login(async_client, "emailotpfmt6", "emailotpfmt6x") resp = await async_client.post( "/api/v1/auth/2fa/email/enable/confirm", json={"setup_token": "x" * 32, "code": "123456"}, headers=_auth_header(token), ) assert resp.status_code != 422 # =========================================================================== # L-NEW-4: OIDCProviderCreate field max_length constraints # =========================================================================== class TestOIDCProviderFieldLengths: """OIDCProviderCreate fields must reject inputs exceeding max_length (L-NEW-4).""" @pytest.mark.asyncio @pytest.mark.integration async def test_name_too_long_rejected(self, async_client: AsyncClient): token = await _setup_and_login(async_client, "oidcfldadmin", "oidcfldadmin1") resp = await async_client.post( "/api/v1/auth/oidc/providers", json={ "name": "n" * 101, "issuer_url": "https://test.example.com", "client_id": "cid", "client_secret": "csec", "scopes": "openid", }, headers=_auth_header(token), ) assert resp.status_code == 422 @pytest.mark.asyncio @pytest.mark.integration async def test_client_secret_too_long_rejected(self, async_client: AsyncClient): token = await _setup_and_login(async_client, "oidcseclen", "oidcseclen123") resp = await async_client.post( "/api/v1/auth/oidc/providers", json={ "name": "ValidName", "issuer_url": "https://test.example.com", "client_id": "cid", "client_secret": "s" * 513, "scopes": "openid", }, headers=_auth_header(token), ) assert resp.status_code == 422 # --------------------------------------------------------------------------- # M-NEW-4 / M-NEW-5 / L-NEW-5: UserCreate & UserUpdate field length limits # --------------------------------------------------------------------------- class TestUserCreateUpdateFieldLengths: """UserCreate and UserUpdate must enforce max_length on username, password, email.""" @pytest.fixture async def admin_token(self, async_client: AsyncClient) -> str: return await _setup_and_login(async_client, "ucfldadmin", "ucfldadmin1!") @pytest.mark.asyncio @pytest.mark.integration async def test_create_username_too_long_rejected(self, async_client: AsyncClient, admin_token: str): resp = await async_client.post( "/api/v1/users/", json={ "username": "u" * 151, "password": "ValidPass1!", "role": "user", }, headers=_auth_header(admin_token), ) assert resp.status_code == 422 @pytest.mark.asyncio @pytest.mark.integration async def test_create_password_too_long_rejected(self, async_client: AsyncClient, admin_token: str): resp = await async_client.post( "/api/v1/users/", json={ "username": "newuserX", "password": "A1!" + "x" * 254, "role": "user", }, headers=_auth_header(admin_token), ) assert resp.status_code == 422 @pytest.mark.asyncio @pytest.mark.integration async def test_create_email_too_long_rejected(self, async_client: AsyncClient, admin_token: str): resp = await async_client.post( "/api/v1/users/", json={ "username": "newuserY", "password": "ValidPass1!", "email": "a" * 246 + "@x.com", # total 253 chars -> fine; 248+@x.com=255 -> too long "role": "user", }, headers=_auth_header(admin_token), ) # 248 'a' + '@x.com' (6) = 254 chars — just at limit, should pass # Use 249 + '@x.com' = 255 chars to trigger the 422 assert resp.status_code in (201, 422) # boundary sanity check @pytest.mark.asyncio @pytest.mark.integration async def test_create_email_exceeds_limit_rejected(self, async_client: AsyncClient, admin_token: str): resp = await async_client.post( "/api/v1/users/", json={ "username": "newuserZ", "password": "ValidPass1!", "email": "a" * 249 + "@x.com", # 255 chars — exceeds RFC 5321 max of 254 "role": "user", }, headers=_auth_header(admin_token), ) assert resp.status_code == 422 @pytest.mark.asyncio @pytest.mark.integration async def test_update_username_too_long_rejected(self, async_client: AsyncClient, admin_token: str): # Create a user first create_resp = await async_client.post( "/api/v1/users/", json={"username": "updusr1", "password": "ValidPass1!", "role": "user"}, headers=_auth_header(admin_token), ) assert create_resp.status_code == 201 user_id = create_resp.json()["id"] resp = await async_client.patch( f"/api/v1/users/{user_id}", json={"username": "u" * 151}, headers=_auth_header(admin_token), ) assert resp.status_code == 422 @pytest.mark.asyncio @pytest.mark.integration async def test_update_password_too_long_rejected(self, async_client: AsyncClient, admin_token: str): create_resp = await async_client.post( "/api/v1/users/", json={"username": "updusr2", "password": "ValidPass1!", "role": "user"}, headers=_auth_header(admin_token), ) assert create_resp.status_code == 201 user_id = create_resp.json()["id"] resp = await async_client.patch( f"/api/v1/users/{user_id}", json={"password": "A1!" + "x" * 254}, headers=_auth_header(admin_token), ) assert resp.status_code == 422 @pytest.mark.asyncio @pytest.mark.integration async def test_update_email_too_long_rejected(self, async_client: AsyncClient, admin_token: str): create_resp = await async_client.post( "/api/v1/users/", json={"username": "updusr3", "password": "ValidPass1!", "role": "user"}, headers=_auth_header(admin_token), ) assert create_resp.status_code == 201 user_id = create_resp.json()["id"] resp = await async_client.patch( f"/api/v1/users/{user_id}", json={"email": "a" * 249 + "@x.com"}, # 255 chars headers=_auth_header(admin_token), ) assert resp.status_code == 422 # --------------------------------------------------------------------------- # L-NEW-6: per-IP rate limiting on /forgot-password # --------------------------------------------------------------------------- _SMTP_DATA_FOR_IPLIMIT = { "smtp_host": "smtp.test.com", "smtp_port": 587, "smtp_username": "test@test.com", "smtp_password": "testpass", "smtp_security": "starttls", "smtp_auth_enabled": True, "smtp_from_email": "noreply@test.com", } class TestForgotPasswordPerIpRateLimit: """POST /forgot-password must enforce a per-IP cap (L-NEW-6). The test sends 11 requests from the simulated test-client IP using 11 different email addresses (so the per-email bucket is never exhausted). The 11th request must be rejected with 429. """ @pytest.fixture async def advanced_auth_token(self, async_client: AsyncClient) -> str: """Set up auth, SMTP, and enable advanced auth; return admin token.""" token = await _setup_and_login(async_client, "iprladmin", "iprladmin1!") headers = _auth_header(token) await async_client.post("/api/v1/auth/smtp", headers=headers, json=_SMTP_DATA_FOR_IPLIMIT) await async_client.post("/api/v1/auth/advanced-auth/enable", headers=headers) return token @pytest.mark.asyncio @pytest.mark.integration async def test_per_ip_limit_triggers_429(self, async_client: AsyncClient, advanced_auth_token: str): # Send 11 requests from the same test-client IP using unique email # addresses so the per-email bucket (limit=3) is never exhausted. responses = [] for i in range(11): resp = await async_client.post( "/api/v1/auth/forgot-password", json={"email": f"unique{i}@example.com"}, ) responses.append(resp.status_code) # First 10 must not be rate-limited by the IP bucket for code in responses[:10]: assert code != 429, f"Unexpected 429 before limit reached: {responses}" # The 11th must be rate-limited assert responses[10] == 429, f"Expected 429 on 11th request, got {responses[10]}" # --------------------------------------------------------------------------- # M-NEW-6: OIDC auto-link must be rejected if target user already has an # OIDC link to a different provider # --------------------------------------------------------------------------- class TestOIDCAutoLinkExistingLinkRejection: """OIDC callback must reject auto-linking when the email-matched user already has an OIDC link to a different provider (M-NEW-6).""" @pytest.mark.asyncio @pytest.mark.integration async def test_auto_link_rejected_when_user_already_linked( self, async_client: AsyncClient, db_session: AsyncSession ): """Auto-link via email-match is rejected when the target user is already linked to another OIDC provider.""" import base64 import hashlib from unittest.mock import AsyncMock, MagicMock, patch from backend.app.core.auth import get_password_hash from backend.app.models.oidc_provider import OIDCProvider, UserOIDCLink from backend.app.models.user import User # ── 1. Target user with a known email ──────────────────────────── target = User( username="oidcALTarget", email="alinktest@example.com", auth_source="oidc", password_hash=get_password_hash(secrets.token_urlsafe(16)), role="user", is_active=True, ) db_session.add(target) await db_session.flush() # ── 2. Provider B — legitimate, already linked to target ────────── prov_b = OIDCProvider( name="ProvB_m6test", issuer_url="https://providerb-m6.example.com", client_id="client_b", _client_secret_enc="secret_b", scopes="openid email profile", is_enabled=True, auto_link_existing_accounts=False, auto_create_users=False, ) db_session.add(prov_b) await db_session.flush() db_session.add( UserOIDCLink( user_id=target.id, provider_id=prov_b.id, provider_user_id="legitimate_sub", provider_email="alinktest@example.com", ) ) # ── 3. Provider A — attacker-controlled, auto_link=True ─────────── prov_a = OIDCProvider( name="ProvA_m6test", issuer_url="https://providera-m6.example.com", client_id="client_a", _client_secret_enc="secret_a", scopes="openid email profile", is_enabled=True, auto_link_existing_accounts=True, auto_create_users=False, ) db_session.add(prov_a) await db_session.flush() # ── 4. OIDC state for Provider A ────────────────────────────────── state = secrets.token_urlsafe(32) nonce = secrets.token_urlsafe(32) code_verifier = secrets.token_urlsafe(48) db_session.add( AuthEphemeralToken( token=state, token_type="oidc_state", provider_id=prov_a.id, nonce=nonce, code_verifier=code_verifier, expires_at=datetime.now(timezone.utc) + timedelta(minutes=10), ) ) await db_session.commit() # ── 5. Mock HTTP + JWT so the callback can reach the auto-link check ─ fake_discovery = { "issuer": "https://providera-m6.example.com", "token_endpoint": "https://providera-m6.example.com/token", "jwks_uri": "https://providera-m6.example.com/jwks", } fake_token = {"access_token": "acc_tok", "id_token": "fake.id.token"} fake_claims = { "sub": "attacker_sub_unique", "email": "alinktest@example.com", "email_verified": True, "nonce": nonce, "iss": "https://providera-m6.example.com", "aud": "client_a", "exp": 9_999_999_999, } disc_resp = AsyncMock() disc_resp.raise_for_status = MagicMock() disc_resp.json = MagicMock(return_value=fake_discovery) token_resp = AsyncMock() token_resp.ok = True token_resp.json = MagicMock(return_value=fake_token) jwks_resp = AsyncMock() jwks_resp.raise_for_status = MagicMock() jwks_resp.json = MagicMock(return_value={}) mock_http = AsyncMock() mock_http.get = AsyncMock(side_effect=[disc_resp, jwks_resp]) mock_http.post = AsyncMock(return_value=token_resp) mock_signing_key = MagicMock() mock_signing_key.key = "fake_key" with ( patch("backend.app.api.routes.mfa.httpx.AsyncClient") as mock_httpx_cls, patch("backend.app.api.routes.mfa.jwt.decode", return_value=fake_claims), patch("backend.app.api.routes.mfa.PyJWKClient") as mock_jwks_cls, ): mock_httpx_cls.return_value.__aenter__ = AsyncMock(return_value=mock_http) mock_httpx_cls.return_value.__aexit__ = AsyncMock(return_value=False) mock_jwks_cls.return_value.get_signing_key_from_jwt.return_value = mock_signing_key resp = await async_client.get( f"/api/v1/auth/oidc/callback?code=fake_code&state={state}", follow_redirects=False, ) # M-NEW-6: must redirect with no_linked_account — NOT create a second link assert resp.status_code == 302 location = resp.headers.get("location", "") assert "no_linked_account" in location, f"Expected no_linked_account in redirect, got: {location}" # Verify no second OIDC link was created for Provider A from sqlalchemy import select as sa_select from backend.app.models.oidc_provider import UserOIDCLink as _UOL async with db_session as s: links_result = await s.execute( sa_select(_UOL).where(_UOL.user_id == target.id, _UOL.provider_id == prov_a.id) ) assert links_result.scalar_one_or_none() is None, "No link to Provider A must exist" # =========================================================================== # Test Gap 1: OIDC state token is single-use — replay must be rejected # =========================================================================== class TestOIDCStateReplay: """OIDC state token must be consumed on first use; a second callback with the same state must redirect to ``?oidc_error=invalid_state``.""" @pytest.mark.asyncio @pytest.mark.integration async def test_state_replay_rejected(self, async_client: AsyncClient, db_session: AsyncSession): """Replaying a consumed OIDC state token must return invalid_state.""" from backend.app.models.oidc_provider import OIDCProvider # ── 1. Seed a minimal provider ──────────────────────────────────── provider = OIDCProvider( name="StateReplayIdP", issuer_url="https://statereplay-idp.example.com", client_id="client_replay", _client_secret_enc="secret_replay", scopes="openid", is_enabled=True, auto_link_existing_accounts=False, auto_create_users=False, ) db_session.add(provider) await db_session.flush() # ── 2. Seed an OIDC state token ─────────────────────────────────── state = secrets.token_urlsafe(32) db_session.add( AuthEphemeralToken( token=state, token_type="oidc_state", provider_id=provider.id, nonce=secrets.token_urlsafe(32), code_verifier=secrets.token_urlsafe(48), expires_at=datetime.now(timezone.utc) + timedelta(minutes=10), ) ) await db_session.commit() # ── 3. First callback — discovery will fail (no real IdP), but the # state token is atomically consumed (DELETE…RETURNING + commit) # before the HTTP call is attempted. first = await async_client.get( f"/api/v1/auth/oidc/callback?code=any_code&state={state}", follow_redirects=False, ) assert first.status_code == 302 # The first call may fail for any reason except invalid_state assert "invalid_state" not in first.headers.get("location", ""), ( f"First call should NOT get invalid_state: {first.headers.get('location')}" ) # ── 4. Second callback with the same state → must be invalid_state ─ second = await async_client.get( f"/api/v1/auth/oidc/callback?code=any_code&state={state}", follow_redirects=False, ) assert second.status_code == 302 assert "invalid_state" in second.headers.get("location", ""), ( f"Replayed state must redirect to invalid_state, got: {second.headers.get('location')}" ) # =========================================================================== # Test Gap 2: OIDC iss claim mismatch must redirect to token_validation_failed # =========================================================================== class TestOIDCIssMismatch: """JWT whose iss claim does not match the discovery issuer must be rejected.""" @pytest.mark.asyncio @pytest.mark.integration async def test_iss_mismatch_redirects_token_validation_failed( self, async_client: AsyncClient, db_session: AsyncSession ): import time from unittest.mock import patch import jwt as pyjwt private_pem, jwks_data = _make_test_rsa_key() correct_issuer = "https://correct-iss.example.com" wrong_issuer = "https://wrong-iss.example.com" client_id = "iss-mismatch-client" nonce = secrets.token_urlsafe(16) now = int(time.time()) # Sign the token with the WRONG issuer (iss != discovery_issuer) id_token = pyjwt.encode( { "sub": "sub-iss-test", "iss": wrong_issuer, "aud": client_id, "nonce": nonce, "email": "iss@example.com", "email_verified": True, "iat": now, "exp": now + 300, }, private_pem, algorithm="RS256", headers={"kid": "test-kid-1"}, ) admin_token = await _setup_and_login(async_client, "issadmin1", "issadmin1!") cr = await async_client.post( "/api/v1/auth/oidc/providers", json={ "name": "IssTest-IdP", "issuer_url": correct_issuer, "client_id": client_id, "client_secret": "s", "scopes": "openid", "is_enabled": True, "auto_create_users": True, }, headers=_auth_header(admin_token), ) assert cr.status_code in (200, 201), cr.text provider_id = cr.json()["id"] state = secrets.token_urlsafe(32) db_session.add( AuthEphemeralToken( token=state, token_type="oidc_state", provider_id=provider_id, nonce=nonce, code_verifier=secrets.token_urlsafe(48), expires_at=datetime.now(timezone.utc) + timedelta(minutes=5), ) ) await db_session.commit() # Discovery returns the CORRECT issuer; JWT carries the WRONG one. discovery_doc = { "issuer": correct_issuer, "token_endpoint": f"{correct_issuer}/token", "jwks_uri": f"{correct_issuer}/.well-known/jwks.json", } token_response = {"access_token": "a", "id_token": id_token} class _MockResp: def __init__(self, data): self._data = data self.status_code = 200 self.is_success = True self.text = "" def json(self): return self._data def raise_for_status(self): pass class _MockHttpxClient: def __init__(self, *a, **kw): pass async def __aenter__(self): return self async def __aexit__(self, *a): pass async def get(self, url, **kw): return _MockResp(jwks_data if "jwks" in url else discovery_doc) async def post(self, url, **kw): return _MockResp(token_response) with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClient): resp = await async_client.get( f"/api/v1/auth/oidc/callback?code=c&state={state}", follow_redirects=False, ) assert resp.status_code == 302 location = resp.headers.get("location", "") assert "token_validation_failed" in location, f"Expected token_validation_failed, got: {location}" # =========================================================================== # Test Gap 3: /forgot-password/confirm token is single-use # =========================================================================== class TestForgotPasswordTokenSingleUse: """POST /forgot-password/confirm must reject a token after its first use.""" @pytest.mark.asyncio @pytest.mark.integration async def test_token_reuse_rejected(self, async_client: AsyncClient, db_session: AsyncSession): from backend.app.core.auth import get_password_hash from backend.app.models.user import User as _User user = _User( username="fpcuser1", email="fpc@example.com", password_hash=get_password_hash("OldPass1!"), role="user", is_active=True, ) db_session.add(user) await db_session.flush() reset_token = secrets.token_urlsafe(32) db_session.add( AuthEphemeralToken( token=reset_token, token_type="password_reset", username="fpcuser1", expires_at=datetime.now(timezone.utc) + timedelta(hours=1), ) ) await db_session.commit() # First use → success resp1 = await async_client.post( "/api/v1/auth/forgot-password/confirm", json={"token": reset_token, "new_password": "NewPass1!"}, ) assert resp1.status_code == 200, resp1.text # Second use → token already consumed, must fail resp2 = await async_client.post( "/api/v1/auth/forgot-password/confirm", json={"token": reset_token, "new_password": "AnotherNew1!"}, ) assert resp2.status_code == 400 # =========================================================================== # C1 regression: setup_totp must reject a replayed TOTP code # =========================================================================== class TestSetupTOTPReplayRejected: """setup_totp must reject a TOTP code that was already accepted in its window.""" @pytest.mark.asyncio @pytest.mark.integration async def test_replayed_setup_code_rejected(self, async_client: AsyncClient, db_session: AsyncSession): from sqlalchemy import select as sa_select from backend.app.models.user_totp import UserTOTP token = await _setup_and_login(async_client, "setupreplay1", "setupreplay1!") # Step 1: Initial TOTP setup (no active TOTP yet → no code required) setup_resp = await async_client.post( "/api/v1/auth/2fa/totp/setup", headers=_auth_header(token), ) assert setup_resp.status_code == 200 secret = setup_resp.json()["secret"] # Step 2: Enable TOTP with a valid code totp_obj = pyotp.TOTP(secret) enable_resp = await async_client.post( "/api/v1/auth/2fa/totp/enable", json={"code": totp_obj.now()}, headers=_auth_header(token), ) assert enable_resp.status_code == 200 # TOTP is now active (is_enabled=True) # Step 3: Determine current valid code and its counter me_resp = await async_client.get("/api/v1/auth/me", headers=_auth_header(token)) user_id = me_resp.json()["id"] totp_result = await db_session.execute(sa_select(UserTOTP).where(UserTOTP.user_id == user_id)) totp_record = totp_result.scalar_one() secret_now = totp_record.secret # decrypted via property totp_now = pyotp.TOTP(secret_now) valid_code = totp_now.now() accepted_counter = totp_now.timecode(datetime.now(timezone.utc)) # Step 4: Pre-set last_totp_counter so this code looks already used totp_record.last_totp_counter = accepted_counter await db_session.commit() # Step 5: Attempt setup_totp with the "already used" code → must be rejected replay_resp = await async_client.post( "/api/v1/auth/2fa/totp/setup", json={"code": valid_code}, headers=_auth_header(token), ) assert replay_resp.status_code == 400 assert "already used" in replay_resp.json()["detail"] # =========================================================================== # Nit8: OIDC aud mismatch and nonce mismatch tests # =========================================================================== class TestOIDCAudAndNonceMismatch: """Nit8: aud != client_id and nonce != stored value must each fail the callback.""" def _make_oidc_provider_setup(self): """Return a helper for building OIDC test fixtures inline.""" private_pem, jwks_data = _make_test_rsa_key() return private_pem, jwks_data @pytest.mark.asyncio @pytest.mark.integration async def test_aud_mismatch_redirects_token_validation_failed( self, async_client: AsyncClient, db_session: AsyncSession ): """ID token with aud != client_id must be rejected (PyJWT InvalidAudienceError).""" import time from unittest.mock import patch import jwt as pyjwt private_pem, jwks_data = _make_test_rsa_key() issuer = "https://aud-mismatch.example.com" client_id = "aud-test-client" wrong_aud = "some-other-client" nonce = secrets.token_urlsafe(16) now = int(time.time()) id_token = pyjwt.encode( { "sub": "sub-aud-test", "iss": issuer, "aud": wrong_aud, # <-- wrong audience "nonce": nonce, "email": "aud@example.com", "email_verified": True, "iat": now, "exp": now + 300, }, private_pem, algorithm="RS256", headers={"kid": "test-kid-1"}, ) admin_token = await _setup_and_login(async_client, "audmismatch_admin", "AudMismatch_admin1") cr = await async_client.post( "/api/v1/auth/oidc/providers", json={ "name": "AudMismatch-IdP", "issuer_url": issuer, "client_id": client_id, "client_secret": "s", "scopes": "openid", "is_enabled": True, "auto_create_users": True, }, headers=_auth_header(admin_token), ) assert cr.status_code in (200, 201), cr.text provider_id = cr.json()["id"] state = secrets.token_urlsafe(32) db_session.add( AuthEphemeralToken( token=state, token_type="oidc_state", provider_id=provider_id, nonce=nonce, code_verifier=secrets.token_urlsafe(48), expires_at=datetime.now(timezone.utc) + timedelta(minutes=5), ) ) await db_session.commit() discovery_doc = { "issuer": issuer, "token_endpoint": f"{issuer}/token", "jwks_uri": f"{issuer}/.well-known/jwks.json", } class _MockResp: def __init__(self, data): self._data = data self.status_code = 200 self.is_success = True self.text = "" def json(self): return self._data def raise_for_status(self): pass class _MockHttpxClient: def __init__(self, *a, **kw): pass async def __aenter__(self): return self async def __aexit__(self, *a): pass async def get(self, url, **kw): return _MockResp(jwks_data if "jwks" in url else discovery_doc) async def post(self, url, **kw): return _MockResp({"access_token": "a", "id_token": id_token}) with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClient): resp = await async_client.get( f"/api/v1/auth/oidc/callback?code=c&state={state}", follow_redirects=False, ) assert resp.status_code == 302 location = resp.headers.get("location", "") assert "token_validation_failed" in location, ( f"Expected token_validation_failed redirect for aud mismatch, got: {location}" ) @pytest.mark.asyncio @pytest.mark.integration async def test_nonce_mismatch_redirects_token_validation_failed( self, async_client: AsyncClient, db_session: AsyncSession ): """ID token with nonce != stored state nonce must be rejected.""" import time from unittest.mock import patch import jwt as pyjwt private_pem, jwks_data = _make_test_rsa_key() issuer = "https://nonce-mismatch.example.com" client_id = "nonce-test-client" stored_nonce = secrets.token_urlsafe(16) wrong_nonce = secrets.token_urlsafe(16) # different from stored_nonce now = int(time.time()) id_token = pyjwt.encode( { "sub": "sub-nonce-test", "iss": issuer, "aud": client_id, "nonce": wrong_nonce, # <-- does not match stored_nonce "email": "nonce@example.com", "email_verified": True, "iat": now, "exp": now + 300, }, private_pem, algorithm="RS256", headers={"kid": "test-kid-1"}, ) admin_token = await _setup_and_login(async_client, "noncemismatch_admin", "NonceMismatch_admin1") cr = await async_client.post( "/api/v1/auth/oidc/providers", json={ "name": "NonceMismatch-IdP", "issuer_url": issuer, "client_id": client_id, "client_secret": "s", "scopes": "openid", "is_enabled": True, "auto_create_users": True, }, headers=_auth_header(admin_token), ) assert cr.status_code in (200, 201), cr.text provider_id = cr.json()["id"] state = secrets.token_urlsafe(32) db_session.add( AuthEphemeralToken( token=state, token_type="oidc_state", provider_id=provider_id, nonce=stored_nonce, # state has correct nonce; JWT carries wrong_nonce code_verifier=secrets.token_urlsafe(48), expires_at=datetime.now(timezone.utc) + timedelta(minutes=5), ) ) await db_session.commit() discovery_doc = { "issuer": issuer, "token_endpoint": f"{issuer}/token", "jwks_uri": f"{issuer}/.well-known/jwks.json", } class _MockResp: def __init__(self, data): self._data = data self.status_code = 200 self.is_success = True self.text = "" def json(self): return self._data def raise_for_status(self): pass class _MockHttpxClient: def __init__(self, *a, **kw): pass async def __aenter__(self): return self async def __aexit__(self, *a): pass async def get(self, url, **kw): return _MockResp(jwks_data if "jwks" in url else discovery_doc) async def post(self, url, **kw): return _MockResp({"access_token": "a", "id_token": id_token}) with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClient): resp = await async_client.get( f"/api/v1/auth/oidc/callback?code=c&state={state}", follow_redirects=False, ) assert resp.status_code == 302 location = resp.headers.get("location", "") # The callback redirects to ?oidc_error=nonce_mismatch when nonces differ. assert "nonce_mismatch" in location, f"Expected nonce_mismatch redirect for nonce mismatch, got: {location}" # =========================================================================== # Expired OIDC token rejection — state and exchange tokens # =========================================================================== class TestOIDCExpiredTokenRejection: """Expired OIDC state and exchange tokens must be rejected atomically. The DELETE … WHERE expires_at > now must ensure that an already-expired token is never consumed (committed) before the expiry is checked, so the token row stays in the DB and is not silently discarded. """ @pytest.mark.asyncio @pytest.mark.integration async def test_expired_state_token_rejected_as_invalid_state( self, async_client: AsyncClient, db_session: AsyncSession ): """An expired OIDC state token must redirect to invalid_state without being consumed — it must still exist in the DB after the rejected call.""" from backend.app.models.oidc_provider import OIDCProvider provider = OIDCProvider( name="ExpiredStateIdP", issuer_url="https://expired-state.example.com", client_id="client_expired_state", _client_secret_enc="secret_exp_state", scopes="openid", is_enabled=True, auto_link_existing_accounts=False, auto_create_users=False, ) db_session.add(provider) await db_session.flush() state = secrets.token_urlsafe(32) db_session.add( AuthEphemeralToken( token=state, token_type="oidc_state", provider_id=provider.id, nonce=secrets.token_urlsafe(16), code_verifier=secrets.token_urlsafe(48), # already expired expires_at=datetime.now(timezone.utc) - timedelta(minutes=5), ) ) await db_session.commit() resp = await async_client.get( f"/api/v1/auth/oidc/callback?code=any_code&state={state}", follow_redirects=False, ) assert resp.status_code == 302 location = resp.headers.get("location", "") assert "invalid_state" in location, f"Expected invalid_state redirect for expired state, got: {location}" @pytest.mark.asyncio @pytest.mark.integration async def test_expired_exchange_token_rejected(self, async_client: AsyncClient, db_session: AsyncSession): """An expired OIDC exchange token must return 401 without being consumed.""" from sqlalchemy import select as sa_select expired_token = secrets.token_urlsafe(32) db_session.add( AuthEphemeralToken( token=expired_token, token_type="oidc_exchange", username="some_user", # already expired expires_at=datetime.now(timezone.utc) - timedelta(minutes=5), ) ) await db_session.commit() resp = await async_client.post( "/api/v1/auth/oidc/exchange", json={"oidc_token": expired_token}, ) assert resp.status_code == 401 assert "expired" in resp.json().get("detail", "").lower() or "invalid" in resp.json().get("detail", "").lower() # Token must NOT have been consumed — it should still be in the DB # (the atomic DELETE WHERE expires_at > now left it untouched) result = await db_session.execute( sa_select(AuthEphemeralToken).where(AuthEphemeralToken.token == expired_token) ) remaining = result.scalar_one_or_none() assert remaining is not None, "Expired exchange token must not be consumed by a rejected request" # =========================================================================== # Trailing slash in issuer_url — discovery URL must not contain double slash # =========================================================================== class TestOIDCIssuerUrlTrailingSlash: """Providers like Authentik use issuer URLs with a trailing slash. BamBuddy must strip the slash before appending /.well-known/openid-configuration to avoid a double-slash that results in a 404. """ @pytest.mark.asyncio @pytest.mark.integration async def test_trailing_slash_issuer_url_fetches_correct_discovery_url( self, async_client: AsyncClient ): from unittest.mock import AsyncMock, MagicMock, patch issuer_with_slash = "https://authentik.example.com/application/o/bambuddy/" admin_token = await _setup_and_login(async_client, "oidcslashadm", "oidcslashadm1") create_resp = await async_client.post( "/api/v1/auth/oidc/providers", json={ "name": "Authentik-Slash", "issuer_url": issuer_with_slash, "client_id": "bambuddy", "client_secret": "secret", "scopes": "openid email profile", "is_enabled": True, "auto_create_users": False, }, headers=_auth_header(admin_token), ) assert create_resp.status_code == 201 provider_id = create_resp.json()["id"] fake_discovery = { "issuer": issuer_with_slash, "authorization_endpoint": "https://authentik.example.com/application/o/bambuddy/authorize", } disc_resp = AsyncMock() disc_resp.raise_for_status = MagicMock() disc_resp.json = MagicMock(return_value=fake_discovery) mock_http = AsyncMock() mock_http.get = AsyncMock(return_value=disc_resp) with patch("backend.app.api.routes.mfa.httpx.AsyncClient") as mock_cls: mock_cls.return_value.__aenter__ = AsyncMock(return_value=mock_http) mock_cls.return_value.__aexit__ = AsyncMock(return_value=False) resp = await async_client.get(f"/api/v1/auth/oidc/authorize/{provider_id}") assert resp.status_code == 200 called_url = mock_http.get.call_args_list[0][0][0] assert "//" not in called_url.replace("https://", ""), ( f"Discovery URL must not contain double slash: {called_url}" ) assert called_url.endswith("/.well-known/openid-configuration"), ( f"Expected discovery URL to end with /.well-known/openid-configuration, got: {called_url}" ) @pytest.mark.asyncio @pytest.mark.integration async def test_iss_claim_trailing_slash_accepted( self, async_client: AsyncClient, db_session: AsyncSession ): """Provider configured without trailing slash, Authentik JWT iss has trailing slash. Both sides must be normalised before comparison so the login succeeds. """ import time from unittest.mock import patch import jwt as pyjwt private_pem, jwks_data = _make_test_rsa_key() issuer_no_slash = "https://authentik.example.com/application/o/bambuddy" issuer_with_slash = issuer_no_slash + "/" client_id = "bambuddy-client" nonce = secrets.token_urlsafe(16) now = int(time.time()) id_token = pyjwt.encode( { "sub": "authentik-sub-123", "iss": issuer_with_slash, "aud": client_id, "nonce": nonce, "email": "authentik-user@example.com", "email_verified": True, "iat": now, "exp": now + 300, }, private_pem, algorithm="RS256", headers={"kid": "test-kid-1"}, ) admin_token = await _setup_and_login(async_client, "authentikadm", "authentikadm1") create_resp = await async_client.post( "/api/v1/auth/oidc/providers", json={ "name": "Authentik-ISS", "issuer_url": issuer_no_slash, "client_id": client_id, "client_secret": "secret", "scopes": "openid email profile", "is_enabled": True, "auto_create_users": True, }, headers=_auth_header(admin_token), ) assert create_resp.status_code == 201 provider_id = create_resp.json()["id"] state = secrets.token_urlsafe(32) code_verifier = secrets.token_urlsafe(48) db_session.add( AuthEphemeralToken( token=state, token_type="oidc_state", provider_id=provider_id, nonce=nonce, code_verifier=code_verifier, expires_at=datetime.now(timezone.utc) + timedelta(minutes=5), ) ) await db_session.commit() discovery_doc = { "issuer": issuer_with_slash, "authorization_endpoint": f"{issuer_no_slash}/authorize", "token_endpoint": f"{issuer_no_slash}/token", "jwks_uri": f"{issuer_no_slash}/.well-known/jwks.json", } token_response = {"access_token": "mock", "token_type": "Bearer", "id_token": id_token} class _MockResp: def __init__(self, data): self._data = data self.is_success = True self.status_code = 200 self.text = str(data) def json(self): return self._data def raise_for_status(self): pass class _MockHttpxClient: def __init__(self, *a, **kw): pass async def __aenter__(self): return self async def __aexit__(self, *a): pass async def get(self, url, **kw): return _MockResp(jwks_data if "jwks" in url else discovery_doc) async def post(self, url, **kw): return _MockResp(token_response) with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClient): resp = await async_client.get( f"/api/v1/auth/oidc/callback?code=auth-code&state={state}", follow_redirects=False, ) location = resp.headers.get("location", "") assert resp.status_code == 302, f"Expected redirect, got {resp.status_code}" assert "token_validation_failed" not in location, ( "Trailing slash mismatch in iss claim must not cause token_validation_failed" ) assert "oidc_token=" in location, f"Expected oidc_token in redirect, got: {location}"