| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796 |
- """Security tests for the 8 coverage gaps identified in the maintainer review.
- Gap 1: encryption.py has zero tests
- Gap 2: JWT revocation (revoke_jti, is_jti_revoked, _is_token_fresh) untested
- Gap 3: OIDC exchange token replay untested
- Gap 4: OIDC email_verified claim handling untested
- Gap 5: Email OTP max-attempts invalidation untested
- Gap 6: OIDC callback error redirects (SSRF protection) undertested
- Gap 7: Login rate limiting untested
- Gap 8: challenge_id cookie binding untested
- """
- from __future__ import annotations
- import base64
- import secrets
- import time
- from datetime import datetime, timedelta, timezone
- from unittest.mock import AsyncMock, MagicMock, patch
- import jwt as pyjwt
- import pytest
- from cryptography.hazmat.primitives import serialization
- from cryptography.hazmat.primitives.asymmetric import rsa
- from httpx import AsyncClient
- from sqlalchemy.ext.asyncio import AsyncSession
- from backend.app.models.auth_ephemeral import AuthEphemeralToken
- from backend.app.models.user import User
# Auth endpoint paths exercised by the tests in this module.
AUTH_SETUP_URL = "/api/v1/auth/setup"
LOGIN_URL = "/api/v1/auth/login"
LOGOUT_URL = "/api/v1/auth/logout"
ME_URL = "/api/v1/auth/me"
- def _auth_header(token: str) -> dict[str, str]:
- return {"Authorization": f"Bearer {token}"}
- def _norm_pw(password: str) -> str:
- """Ensure password meets complexity requirements (I4: SetupRequest now validates)."""
- if not any(c.isupper() for c in password):
- password = password[0].upper() + password[1:]
- if not any(c not in "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789" for c in password):
- password = password + "!"
- return password
async def _setup_and_login(client: AsyncClient, username: str, password: str) -> str:
    """Enable auth with *username* as admin, log in, and return the access token.

    The password is passed through ``_norm_pw`` before use, so a caller that
    logs in again later must supply the normalised form.
    """
    password = _norm_pw(password)
    # The setup response is not checked; a failed setup surfaces as a failed
    # login in the assertion below.
    await client.post(
        AUTH_SETUP_URL,
        json={"auth_enabled": True, "admin_username": username, "admin_password": password},
    )
    resp = await client.post(LOGIN_URL, json={"username": username, "password": password})
    assert resp.status_code == 200, resp.text
    return resp.json()["access_token"]
def _make_test_rsa_key(key_size: int = 2048, kid: str = "test-kid-1") -> tuple[bytes, dict]:
    """Generate a throwaway RSA key pair for signing test ID tokens.

    Args:
        key_size: Modulus size in bits.
        kid: Key ID written into the JWKS entry; tokens signed with the
            returned key should carry the same ``kid`` header.

    Returns:
        A ``(private_pem, jwks)`` tuple: the unencrypted PEM-encoded private
        key and a JWKS dict holding the matching RS256 public key.
    """

    def _b64url_uint(value: int) -> str:
        # Unpadded base64url of the minimal big-endian encoding, so the byte
        # length follows the value instead of being hard-coded per key size.
        length = max(1, (value.bit_length() + 7) // 8)
        return base64.urlsafe_b64encode(value.to_bytes(length, "big")).rstrip(b"=").decode()

    private_key = rsa.generate_private_key(public_exponent=65537, key_size=key_size)
    private_pem = private_key.private_bytes(
        serialization.Encoding.PEM,
        serialization.PrivateFormat.TraditionalOpenSSL,
        serialization.NoEncryption(),
    )
    pub_numbers = private_key.public_key().public_numbers()
    jwks = {
        "keys": [
            {
                "kty": "RSA",
                "use": "sig",
                "alg": "RS256",
                "kid": kid,
                "n": _b64url_uint(pub_numbers.n),
                "e": _b64url_uint(pub_numbers.e),
            }
        ]
    }
    return private_pem, jwks
- # ===========================================================================
- # Gap 1: encryption.py unit tests
- # ===========================================================================
class TestEncryption:
    """Unit tests for ``backend.app.core.encryption`` (Gap 1).

    Covers the encrypt/decrypt round-trip, plaintext passthrough when no key is
    configured, and the RuntimeError raised when an encrypted value is read
    without a key.
    """

    @pytest.fixture
    def enc_mod(self):
        """Yield the encryption module with its module-level state reset.

        ``_fernet_instance`` is set to ``None`` and ``_warn_shown`` to ``False``
        for the duration of the test; both are restored afterwards so other
        tests see the values they had before.
        """
        import backend.app.core.encryption as enc_mod

        saved_instance = enc_mod._fernet_instance
        saved_warn = enc_mod._warn_shown
        enc_mod._fernet_instance = None
        enc_mod._warn_shown = False
        try:
            yield enc_mod
        finally:
            enc_mod._fernet_instance = saved_instance
            enc_mod._warn_shown = saved_warn

    def test_encrypt_decrypt_roundtrip_with_key(self, enc_mod):
        """With a key configured, ciphertext is ``fernet:``-prefixed and decrypts back."""
        from cryptography.fernet import Fernet

        test_key = Fernet.generate_key().decode()
        with patch.dict("os.environ", {"MFA_ENCRYPTION_KEY": test_key}):
            ciphertext = enc_mod.mfa_encrypt("my-totp-secret")
            assert ciphertext.startswith("fernet:")
            assert enc_mod.mfa_decrypt(ciphertext) == "my-totp-secret"

    def test_plaintext_passthrough_without_key(self, enc_mod):
        """Without a key, encrypt and decrypt both return the input unchanged."""
        # Run with a completely empty environment. A second, nested patch is
        # unnecessary: anything computed from os.environ after this clear is
        # already empty.
        with patch.dict("os.environ", {}, clear=True):
            result = enc_mod.mfa_encrypt("plaintext-secret")
            assert result == "plaintext-secret"
            assert enc_mod.mfa_decrypt("plaintext-secret") == "plaintext-secret"

    def test_decrypt_raises_runtime_error_without_key_for_encrypted_value(self, enc_mod):
        """A ``fernet:``-prefixed value cannot be decrypted when no key is set."""
        import os

        # Keep the rest of the environment; drop only the encryption key.
        env = {k: v for k, v in os.environ.items() if k != "MFA_ENCRYPTION_KEY"}
        with (
            patch.dict("os.environ", env, clear=True),
            pytest.raises(RuntimeError, match="MFA_ENCRYPTION_KEY must be set"),
        ):
            enc_mod.mfa_decrypt("fernet:gAAAAA-fake-ciphertext")
- # ===========================================================================
- # Gap 2: JWT revocation — revoke_jti, is_jti_revoked, _is_token_fresh, /me
- # ===========================================================================
class TestJWTRevocation:
    """JWT revocation and token freshness checks (Gap 2)."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_revoke_jti_and_is_jti_revoked(self, async_client: AsyncClient, db_session: AsyncSession):
        """revoke_jti stores the JTI; is_jti_revoked returns True afterwards."""
        from backend.app.core.auth import is_jti_revoked, revoke_jti

        test_jti = secrets.token_urlsafe(16)
        expires = datetime.now(timezone.utc) + timedelta(hours=1)

        assert not await is_jti_revoked(test_jti)
        await revoke_jti(test_jti, expires, username="testuser")
        assert await is_jti_revoked(test_jti)

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_revoke_jti_idempotent(self, async_client: AsyncClient):
        """Double-revocation of the same JTI should not raise."""
        from backend.app.core.auth import is_jti_revoked, revoke_jti

        jti = secrets.token_urlsafe(16)
        expires = datetime.now(timezone.utc) + timedelta(hours=1)

        await revoke_jti(jti, expires)
        await revoke_jti(jti, expires)  # must not raise
        assert await is_jti_revoked(jti)

    def test_is_token_fresh_rejects_none_iat(self):
        """_is_token_fresh returns False when iat is None (I1 hard cutoff)."""
        from backend.app.core.auth import _is_token_fresh

        user = MagicMock()
        user.password_changed_at = None
        assert _is_token_fresh(None, user) is False

    def test_is_token_fresh_rejects_token_before_password_change(self):
        """_is_token_fresh returns False when iat predates password_changed_at."""
        from backend.app.core.auth import _is_token_fresh

        now = datetime.now(timezone.utc)
        user = MagicMock()
        user.password_changed_at = now
        old_iat = (now - timedelta(hours=1)).timestamp()
        assert _is_token_fresh(old_iat, user) is False

    def test_is_token_fresh_accepts_token_after_password_change(self):
        """_is_token_fresh returns True when iat is after password_changed_at."""
        from backend.app.core.auth import _is_token_fresh

        now = datetime.now(timezone.utc)
        user = MagicMock()
        user.password_changed_at = now - timedelta(hours=1)
        recent_iat = now.timestamp()
        assert _is_token_fresh(recent_iat, user) is True

    def test_is_token_fresh_returns_true_when_no_password_change(self):
        """_is_token_fresh returns True when password_changed_at is None (I2 migration not yet run)."""
        from backend.app.core.auth import _is_token_fresh

        user = MagicMock()
        user.password_changed_at = None
        assert _is_token_fresh(time.time(), user) is True

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_me_endpoint_rejects_token_after_logout(self, async_client: AsyncClient):
        """After logout, the bearer token must be rejected by /me (B1 + revocation)."""
        token = await _setup_and_login(async_client, "sec_logout_me", "sec_logout_me1")

        # Token works before logout
        me_resp = await async_client.get(ME_URL, headers=_auth_header(token))
        assert me_resp.status_code == 200

        # Logout
        logout_resp = await async_client.post(LOGOUT_URL, headers=_auth_header(token))
        assert logout_resp.status_code == 200

        # Token must now be rejected
        me_after = await async_client.get(ME_URL, headers=_auth_header(token))
        assert me_after.status_code == 401
- # ===========================================================================
- # Gap 3: OIDC exchange token replay
- # ===========================================================================
class TestOIDCExchangeReplay:
    """A single-use OIDC exchange token cannot be redeemed twice (Gap 3)."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_exchange_token_is_single_use(self, async_client: AsyncClient, db_session: AsyncSession):
        """The second call to /oidc/exchange with the same token returns 401."""
        from sqlalchemy import select as sa_select

        from backend.app.core.auth import get_password_hash
        from backend.app.core.database import async_session

        exchange_token = secrets.token_urlsafe(32)
        db_session.add(
            AuthEphemeralToken(
                token=exchange_token,
                token_type="oidc_exchange",
                username="oidc_replay_user",
                expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
            )
        )
        await db_session.commit()

        # Seed the user so the exchange can resolve it
        async with async_session() as db:
            result = await db.execute(sa_select(User).where(User.username == "oidc_replay_user"))
            if result.scalar_one_or_none() is None:
                db.add(
                    User(
                        username="oidc_replay_user",
                        password_hash=get_password_hash("pw"),
                        is_active=True,
                    )
                )
                await db.commit()

        first = await async_client.post("/api/v1/auth/oidc/exchange", json={"oidc_token": exchange_token})
        assert first.status_code == 200

        # Replaying the same token must fail.
        second = await async_client.post("/api/v1/auth/oidc/exchange", json={"oidc_token": exchange_token})
        assert second.status_code == 401
- # ===========================================================================
- # Gap 4: OIDC email_verified claim handling
- # ===========================================================================
class TestOIDCEmailVerified:
    """email_verified: False/absent must not link OIDC identity to an existing email (Gap 4)."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_unverified_email_does_not_link_to_existing_user(
        self, async_client: AsyncClient, db_session: AsyncSession
    ):
        """If email_verified is False, the OIDC callback must not auto-link by email."""
        private_pem, jwks_data = _make_test_rsa_key()
        issuer = "https://idp.evtest.example.com"
        client_id = "ev-client"
        nonce = secrets.token_urlsafe(16)
        now = int(time.time())

        id_token = pyjwt.encode(
            {
                "sub": "ev-sub-new",
                "iss": issuer,
                "aud": client_id,
                "nonce": nonce,
                "email": "existing@example.com",
                # Unverified: the email claim must not be used for account linking.
                "email_verified": False,
                "iat": now,
                "exp": now + 300,
            },
            private_pem,
            algorithm="RS256",
            headers={"kid": "test-kid-1"},
        )

        admin_token = await _setup_and_login(async_client, "ev_admin", "ev_admin1")

        # Create existing user with the same email (use strong password for validator)
        create_user_resp = await async_client.post(
            "/api/v1/users",
            json={"username": "existing_email_user", "password": "Str0ng!Pass", "email": "existing@example.com"},
            headers=_auth_header(admin_token),
        )
        assert create_user_resp.status_code in (200, 201), create_user_resp.json()

        # Create OIDC provider
        create_resp = await async_client.post(
            "/api/v1/auth/oidc/providers",
            json={
                "name": "EV-IdP",
                "issuer_url": issuer,
                "client_id": client_id,
                "client_secret": "secret",
                "scopes": "openid email",
                "is_enabled": True,
                "auto_create_users": True,
            },
            headers=_auth_header(admin_token),
        )
        assert create_resp.status_code == 201
        provider_id = create_resp.json()["id"]

        # Seed the state row that the callback looks up (state, nonce, PKCE verifier).
        state = secrets.token_urlsafe(32)
        code_verifier = secrets.token_urlsafe(48)
        db_session.add(
            AuthEphemeralToken(
                token=state,
                token_type="oidc_state",
                provider_id=provider_id,
                nonce=nonce,
                code_verifier=code_verifier,
                expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
            )
        )
        await db_session.commit()

        discovery_doc = {
            "issuer": issuer,
            "authorization_endpoint": f"{issuer}/auth",
            "token_endpoint": f"{issuer}/token",
            "jwks_uri": f"{issuer}/.well-known/jwks.json",
        }

        class _MockResp:
            """Minimal stand-in for an httpx response that always succeeds."""

            def __init__(self, data):
                self._data = data
                self.status_code = 200
                self.is_success = True
                self.text = str(data)

            def json(self):
                return self._data

            def raise_for_status(self):
                pass

        class _MockHttpxClientEV:
            """Fake async HTTP client serving the discovery doc, JWKS and token response."""

            def __init__(self, *args, **kwargs):
                pass

            async def __aenter__(self):
                return self

            async def __aexit__(self, *_):
                pass

            async def get(self, url, **kwargs):
                if "jwks" in url:
                    return _MockResp(jwks_data)
                return _MockResp(discovery_doc)

            async def post(self, url, **kwargs):
                return _MockResp({"access_token": "mock", "token_type": "Bearer", "id_token": id_token})

        with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClientEV):
            await async_client.get(
                f"/api/v1/auth/oidc/callback?code=test-code&state={state}",
                follow_redirects=False,
            )

        # Callback must NOT link to the existing_email_user — a new user is created
        # instead (because the email claim was ignored due to email_verified=False).
        # Either a new user is provisioned (redirect with oidc_token) or the callback
        # fails. In either case, the existing user must not have an OIDC link.
        from sqlalchemy import select as sa_select

        from backend.app.models.oidc_provider import UserOIDCLink

        link_result = await db_session.execute(
            sa_select(UserOIDCLink)
            .join(User, UserOIDCLink.user_id == User.id)
            .where(User.email == "existing@example.com")
        )
        link = link_result.scalar_one_or_none()
        assert link is None, "Existing user must not be auto-linked when email_verified is False"
- # ===========================================================================
- # Gap 5: Email OTP max-attempts invalidation
- # ===========================================================================
class TestEmailOTPMaxAttempts:
    """After MAX_ATTEMPTS wrong codes, the OTP is permanently invalidated (Gap 5)."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_email_otp_invalidated_after_max_attempts(self, async_client: AsyncClient, db_session: AsyncSession):
        """Exhausting the attempt budget locks out even the correct code."""
        from passlib.context import CryptContext
        from sqlalchemy import select as sa_select

        from backend.app.api.routes.mfa import MAX_2FA_ATTEMPTS
        from backend.app.models.user_otp_code import UserOTPCode

        _pwd_ctx = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
        raw_password = "otp_max_admin1"

        admin_token = await _setup_and_login(async_client, "otp_max_admin", raw_password)

        # Enable email OTP for admin user
        result = await db_session.execute(sa_select(User).where(User.username == "otp_max_admin"))
        user = result.scalar_one()
        user.email = "otpmax@example.com"
        await db_session.commit()

        # Seed a setup token whose nonce is the hash of a known code, then
        # confirm it through the API.
        setup_code = "123456"
        setup_token = secrets.token_urlsafe(32)
        db_session.add(
            AuthEphemeralToken(
                token=setup_token,
                token_type="email_otp_setup",
                username="otp_max_admin",
                nonce=_pwd_ctx.hash(setup_code),
                expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
            )
        )
        await db_session.commit()
        await async_client.post(
            "/api/v1/auth/2fa/email/enable/confirm",
            json={"setup_token": setup_token, "code": setup_code},
            headers=_auth_header(admin_token),
        )

        # Login to get pre_auth_token. _setup_and_login stored the normalised
        # password, so the same normalisation is applied here.
        login_resp = await async_client.post(
            LOGIN_URL, json={"username": "otp_max_admin", "password": _norm_pw(raw_password)}
        )
        assert login_resp.status_code == 200, login_resp.text
        pre_auth_token = login_resp.json()["pre_auth_token"]

        # Insert an OTP record directly (bypassing SMTP)
        real_code = "654321"
        otp = UserOTPCode(
            user_id=user.id,
            code_hash=_pwd_ctx.hash(real_code),
            attempts=0,
            used=False,
            expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
        )
        db_session.add(otp)
        await db_session.commit()

        # Submit MAX_ATTEMPTS wrong codes
        for _ in range(MAX_2FA_ATTEMPTS):
            r = await async_client.post(
                "/api/v1/auth/2fa/verify",
                json={"pre_auth_token": pre_auth_token, "code": "000000", "method": "email"},
            )
            # Each attempt must fail with 401
            assert r.status_code == 401

        # After max attempts, the correct code is also rejected (either OTP
        # invalidated → 401, or rate limit hit → 429). Either means locked out.
        final = await async_client.post(
            "/api/v1/auth/2fa/verify",
            json={"pre_auth_token": pre_auth_token, "code": real_code, "method": "email"},
        )
        assert final.status_code in (401, 429), f"Expected lockout, got {final.status_code}: {final.json()}"
- # ===========================================================================
- # Gap 6: OIDC callback SSRF protection — invalid authorization_endpoint scheme
- # ===========================================================================
class TestOIDCSSRFProtection:
    """authorization_endpoint with non-http(s) scheme must be rejected (Gap 6)."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_invalid_authorization_endpoint_scheme_rejected(
        self, async_client: AsyncClient, db_session: AsyncSession
    ):
        """A ``javascript:`` authorization_endpoint in discovery yields a 502 from authorize."""
        issuer = "https://idp.ssrf.example.com"
        client_id = "ssrf-client"

        admin_token = await _setup_and_login(async_client, "ssrf_admin", "ssrf_admin1")

        create_resp = await async_client.post(
            "/api/v1/auth/oidc/providers",
            json={
                "name": "SSRF-IdP",
                "issuer_url": issuer,
                "client_id": client_id,
                "client_secret": "secret",
                "scopes": "openid",
                "is_enabled": True,
                "auto_create_users": False,
            },
            headers=_auth_header(admin_token),
        )
        assert create_resp.status_code == 201
        provider_id = create_resp.json()["id"]

        # Discovery doc returns a javascript: authorization_endpoint
        malicious_discovery = {
            "issuer": issuer,
            "authorization_endpoint": "javascript:alert(1)",  # <-- malicious
            "token_endpoint": f"{issuer}/token",
            "jwks_uri": f"{issuer}/.well-known/jwks.json",
        }

        class _MockResp:
            """Minimal stand-in for an httpx response that always succeeds."""

            def __init__(self, data):
                self._data = data
                self.status_code = 200
                self.is_success = True
                self.text = str(data)

            def json(self):
                return self._data

            def raise_for_status(self):
                pass

        class _MockHttpxClientSSRF:
            """Fake async HTTP client that answers every GET with the malicious discovery doc."""

            def __init__(self, *args, **kwargs):
                pass

            async def __aenter__(self):
                return self

            async def __aexit__(self, *_):
                pass

            async def get(self, url, **kwargs):
                return _MockResp(malicious_discovery)

            async def post(self, url, **kwargs):
                return _MockResp({})

        with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClientSSRF):
            # oidc_authorize uses a path parameter, not query param
            authorize_resp = await async_client.get(
                f"/api/v1/auth/oidc/authorize/{provider_id}",
                follow_redirects=False,
            )

        # Must be rejected with 502 — B2 guard rejects invalid authorization_endpoint scheme
        assert authorize_resp.status_code == 502, authorize_resp.json()
        detail = authorize_resp.json().get("detail", "").lower()
        assert "authorization_endpoint" in detail or "invalid" in detail
- # ===========================================================================
- # Gap 7: Login rate limiting
- # ===========================================================================
class TestLoginRateLimiting:
    """10+ failed logins for the same username must return 429 (Gap 7)."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_excessive_failed_logins_return_429(self, async_client: AsyncClient):
        """Going past MAX_LOGIN_ATTEMPTS failures for one username ends in a 429."""
        from backend.app.api.routes.mfa import MAX_LOGIN_ATTEMPTS

        # Setup auth but do NOT log in
        await async_client.post(
            AUTH_SETUP_URL,
            json={"auth_enabled": True, "admin_username": "ratelimit_user", "admin_password": "Ratelimit_pw1"},
        )

        status_codes = []
        for _ in range(MAX_LOGIN_ATTEMPTS + 2):
            resp = await async_client.post(
                LOGIN_URL,
                json={"username": "ratelimit_user", "password": "wrong_password"},
            )
            status_codes.append(resp.status_code)

        # The last attempts must be 429 (Too Many Requests)
        assert status_codes[-1] == 429, f"Expected 429 after {MAX_LOGIN_ATTEMPTS} failures, got: {status_codes}"
- # ===========================================================================
- # Gap 8: challenge_id cookie binding
- # ===========================================================================
class TestChallengeIdCookieBinding:
    """A pre-auth token stolen from session A cannot be used from session B (Gap 8)."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_pre_auth_token_rejected_without_matching_cookie(
        self, async_client: AsyncClient, db_session: AsyncSession
    ):
        """A valid TOTP code plus pre_auth_token is rejected when sent from a client with no cookie."""
        import pyotp
        from httpx import ASGITransport, AsyncClient as FreshClient
        from sqlalchemy import select as sa_select

        from backend.app.main import app
        from backend.app.models.user_totp import UserTOTP

        raw_password = "cookie_bind_pw1"

        # Set up user with TOTP
        await _setup_and_login(async_client, "cookie_bind_user", raw_password)
        secret = pyotp.random_base32()
        totp_obj = pyotp.TOTP(secret)

        result = await db_session.execute(sa_select(User).where(User.username == "cookie_bind_user"))
        user = result.scalar_one()
        db_session.add(UserTOTP(user_id=user.id, secret=secret, is_enabled=True))
        await db_session.commit()

        # Login from "session A" — gets a pre_auth_token and a 2fa_challenge cookie.
        # _setup_and_login stored the normalised password, so normalise here too.
        login_resp = await async_client.post(
            LOGIN_URL, json={"username": "cookie_bind_user", "password": _norm_pw(raw_password)}
        )
        assert login_resp.status_code == 200
        assert login_resp.json()["requires_2fa"] is True
        pre_auth_token = login_resp.json()["pre_auth_token"]

        # The async_client jar now holds the 2fa_challenge cookie for session A
        # Simulate session B by creating a new client WITHOUT the cookie
        async with FreshClient(transport=ASGITransport(app=app), base_url="http://test") as session_b:
            # Attempt to use session A's pre_auth_token from session B (no cookie)
            verify_resp = await session_b.post(
                "/api/v1/auth/2fa/verify",
                json={
                    "pre_auth_token": pre_auth_token,
                    "code": totp_obj.now(),
                    "method": "totp",
                },
            )

        # Must be rejected — pre_auth_token is bound to session A's cookie
        assert verify_resp.status_code == 401, (
            f"Expected 401 for token replay from cookieless session, got {verify_resp.status_code}: "
            f"{verify_resp.json()}"
        )
- # ===========================================================================
- # C2: Security-header middleware
- # ===========================================================================
class TestSecurityHeaders:
    """Every HTTP response must include standard security headers (C2)."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_security_headers_present(self, async_client: AsyncClient):
        """GET /api/v1/auth/me (unauthenticated → 401) still carries security headers."""
        resp = await async_client.get(ME_URL)
        assert resp.status_code == 401  # sanity — no auth token

        assert resp.headers.get("x-content-type-options") == "nosniff"
        assert resp.headers.get("x-frame-options") == "SAMEORIGIN"
        assert resp.headers.get("referrer-policy") == "strict-origin-when-cross-origin"

        # The CSP is checked directive by directive rather than as a whole string.
        csp = resp.headers.get("content-security-policy", "")
        assert "default-src 'self'" in csp
        assert "script-src 'self'" in csp
        assert "frame-ancestors 'none'" in csp
        assert "object-src 'none'" in csp

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_hsts_absent_for_http(self, async_client: AsyncClient):
        """HSTS must NOT be set over plain HTTP (test transport uses http)."""
        resp = await async_client.get(ME_URL)
        assert "strict-transport-security" not in resp.headers
- # ===========================================================================
- # I3: Rate-limit bucket interaction — IP spray vs. username spray
- # ===========================================================================
class TestRateLimitBuckets:
    """IP-spray and username-spray must each trip the correct independent bucket (I3)."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_ip_spray_trips_ip_bucket(self, async_client: AsyncClient):
        """20 failed logins from one IP across 20 different usernames trips the IP bucket.

        Each per-username bucket only has 1 failure (well below MAX_LOGIN_ATTEMPTS=10),
        so the username bucket is never the reason for the 429.
        """
        unique_ip = "10.99.1.1"
        # Per-IP failure budget this test expects; two extra attempts go past it.
        ip_bucket_limit = 20

        # Ensure auth is enabled
        await async_client.post(
            AUTH_SETUP_URL,
            json={"auth_enabled": True, "admin_username": "spray_ip_admin", "admin_password": "SprayIp_admin1"},
        )

        status_codes: list[int] = []
        with patch("backend.app.api.routes.auth._get_client_ip", return_value=unique_ip):
            for i in range(ip_bucket_limit + 2):
                resp = await async_client.post(
                    LOGIN_URL,
                    json={"username": f"spray_ip_victim_{i}", "password": "wrong"},
                )
                status_codes.append(resp.status_code)

        # The first 20 attempts fail with 401; the 21st+ must be 429 (IP bucket full)
        assert status_codes[-1] == 429, f"Expected 429 after 20 IP-spray failures, got: {status_codes}"

        # No single username saw more than one attempt → username buckets not tripped
        early_429 = [code for code in status_codes[:ip_bucket_limit] if code == 429]
        assert not early_429, f"Username bucket triggered early: {status_codes}"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_username_spray_trips_username_bucket(self, async_client: AsyncClient):
        """One username targeted from 10+ different IPs trips the username bucket.

        Each per-IP bucket only sees 1 failure, so no IP bucket is tripped.
        The username bucket (max 10) is what fires the 429.
        """
        from backend.app.api.routes.mfa import MAX_LOGIN_ATTEMPTS

        # Ensure auth is enabled
        await async_client.post(
            AUTH_SETUP_URL,
            json={
                "auth_enabled": True,
                "admin_username": "spray_uname_admin",
                "admin_password": "SprayUname_admin1",
            },
        )

        target_username = "spray_uname_victim"
        status_codes: list[int] = []
        for i in range(MAX_LOGIN_ATTEMPTS + 2):
            # A fresh source IP per attempt keeps every per-IP bucket at one failure.
            rotating_ip = f"10.99.2.{i + 1}"
            with patch("backend.app.api.routes.auth._get_client_ip", return_value=rotating_ip):
                resp = await async_client.post(
                    LOGIN_URL,
                    json={"username": target_username, "password": "wrong"},
                )
            status_codes.append(resp.status_code)

        # After MAX_LOGIN_ATTEMPTS failures for same username the bucket fires
        assert status_codes[-1] == 429, (
            f"Expected 429 after {MAX_LOGIN_ATTEMPTS} username-spray failures, got: {status_codes}"
        )
|