test_security.py 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796
  1. """Security tests for the 8 coverage gaps identified in the maintainer review.
  2. Gap 1: encryption.py has zero tests
  3. Gap 2: JWT revocation (revoke_jti, is_jti_revoked, _is_token_fresh) untested
  4. Gap 3: OIDC exchange token replay untested
  5. Gap 4: OIDC email_verified claim handling untested
  6. Gap 5: Email OTP max-attempts invalidation untested
  7. Gap 6: OIDC callback error redirects (SSRF protection) undertested
  8. Gap 7: Login rate limiting untested
  9. Gap 8: challenge_id cookie binding untested
  10. """
  11. from __future__ import annotations
  12. import base64
  13. import secrets
  14. import time
  15. from datetime import datetime, timedelta, timezone
  16. from unittest.mock import AsyncMock, MagicMock, patch
  17. import jwt as pyjwt
  18. import pytest
  19. from cryptography.hazmat.primitives import serialization
  20. from cryptography.hazmat.primitives.asymmetric import rsa
  21. from httpx import AsyncClient
  22. from sqlalchemy.ext.asyncio import AsyncSession
  23. from backend.app.models.auth_ephemeral import AuthEphemeralToken
  24. from backend.app.models.user import User
  25. AUTH_SETUP_URL = "/api/v1/auth/setup"
  26. LOGIN_URL = "/api/v1/auth/login"
  27. LOGOUT_URL = "/api/v1/auth/logout"
  28. ME_URL = "/api/v1/auth/me"
  29. def _auth_header(token: str) -> dict[str, str]:
  30. return {"Authorization": f"Bearer {token}"}
  31. def _norm_pw(password: str) -> str:
  32. """Ensure password meets complexity requirements (I4: SetupRequest now validates)."""
  33. if not any(c.isupper() for c in password):
  34. password = password[0].upper() + password[1:]
  35. if not any(c not in "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789" for c in password):
  36. password = password + "!"
  37. return password
  38. async def _setup_and_login(client: AsyncClient, username: str, password: str) -> str:
  39. password = _norm_pw(password)
  40. await client.post(
  41. AUTH_SETUP_URL,
  42. json={"auth_enabled": True, "admin_username": username, "admin_password": password},
  43. )
  44. resp = await client.post(LOGIN_URL, json={"username": username, "password": password})
  45. assert resp.status_code == 200
  46. return resp.json()["access_token"]
  47. def _make_test_rsa_key():
  48. def _b64url(n: int, length: int) -> str:
  49. return base64.urlsafe_b64encode(n.to_bytes(length, "big")).rstrip(b"=").decode()
  50. private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
  51. private_pem = private_key.private_bytes(
  52. serialization.Encoding.PEM,
  53. serialization.PrivateFormat.TraditionalOpenSSL,
  54. serialization.NoEncryption(),
  55. )
  56. pub_numbers = private_key.public_key().public_numbers()
  57. jwks = {
  58. "keys": [
  59. {
  60. "kty": "RSA",
  61. "use": "sig",
  62. "alg": "RS256",
  63. "kid": "test-kid-1",
  64. "n": _b64url(pub_numbers.n, 256),
  65. "e": _b64url(pub_numbers.e, 3),
  66. }
  67. ]
  68. }
  69. return private_pem, jwks
  70. # ===========================================================================
  71. # Gap 1: encryption.py unit tests
  72. # ===========================================================================
  73. class TestEncryption:
  74. """encrypt/decrypt round-trips, plaintext passthrough, RuntimeError on missing key."""
  75. def test_encrypt_decrypt_roundtrip_with_key(self):
  76. from cryptography.fernet import Fernet
  77. test_key = Fernet.generate_key().decode()
  78. import backend.app.core.encryption as enc_mod
  79. original = enc_mod._fernet_instance
  80. original_warn = enc_mod._warn_shown
  81. try:
  82. enc_mod._fernet_instance = None
  83. enc_mod._warn_shown = False
  84. with patch.dict("os.environ", {"MFA_ENCRYPTION_KEY": test_key}):
  85. ciphertext = enc_mod.mfa_encrypt("my-totp-secret")
  86. assert ciphertext.startswith("fernet:")
  87. assert enc_mod.mfa_decrypt(ciphertext) == "my-totp-secret"
  88. finally:
  89. enc_mod._fernet_instance = original
  90. enc_mod._warn_shown = original_warn
  91. def test_plaintext_passthrough_without_key(self):
  92. import backend.app.core.encryption as enc_mod
  93. original = enc_mod._fernet_instance
  94. original_warn = enc_mod._warn_shown
  95. try:
  96. enc_mod._fernet_instance = None
  97. enc_mod._warn_shown = False
  98. with patch.dict("os.environ", {}, clear=True):
  99. env = {k: v for k, v in __import__("os").environ.items() if k != "MFA_ENCRYPTION_KEY"}
  100. with patch.dict("os.environ", env, clear=True):
  101. result = enc_mod.mfa_encrypt("plaintext-secret")
  102. assert result == "plaintext-secret"
  103. assert enc_mod.mfa_decrypt("plaintext-secret") == "plaintext-secret"
  104. finally:
  105. enc_mod._fernet_instance = original
  106. enc_mod._warn_shown = original_warn
  107. def test_decrypt_raises_runtime_error_without_key_for_encrypted_value(self):
  108. import backend.app.core.encryption as enc_mod
  109. original = enc_mod._fernet_instance
  110. original_warn = enc_mod._warn_shown
  111. try:
  112. enc_mod._fernet_instance = None
  113. enc_mod._warn_shown = False
  114. # A value with the fernet: prefix but no key configured
  115. env = {k: v for k, v in __import__("os").environ.items() if k != "MFA_ENCRYPTION_KEY"}
  116. with (
  117. patch.dict("os.environ", env, clear=True),
  118. pytest.raises(RuntimeError, match="MFA_ENCRYPTION_KEY must be set"),
  119. ):
  120. enc_mod.mfa_decrypt("fernet:gAAAAA-fake-ciphertext")
  121. finally:
  122. enc_mod._fernet_instance = original
  123. enc_mod._warn_shown = original_warn
  124. # ===========================================================================
  125. # Gap 2: JWT revocation — revoke_jti, is_jti_revoked, _is_token_fresh, /me
  126. # ===========================================================================
  127. class TestJWTRevocation:
  128. """JWT revocation and token freshness checks."""
  129. @pytest.mark.asyncio
  130. @pytest.mark.integration
  131. async def test_revoke_jti_and_is_jti_revoked(self, async_client: AsyncClient, db_session: AsyncSession):
  132. """revoke_jti stores the JTI; is_jti_revoked returns True afterwards."""
  133. from backend.app.core.auth import is_jti_revoked, revoke_jti
  134. test_jti = secrets.token_urlsafe(16)
  135. expires = datetime.now(timezone.utc) + timedelta(hours=1)
  136. assert not await is_jti_revoked(test_jti)
  137. await revoke_jti(test_jti, expires, username="testuser")
  138. assert await is_jti_revoked(test_jti)
  139. @pytest.mark.asyncio
  140. @pytest.mark.integration
  141. async def test_revoke_jti_idempotent(self, async_client: AsyncClient):
  142. """Double-revocation of the same JTI should not raise."""
  143. from backend.app.core.auth import is_jti_revoked, revoke_jti
  144. jti = secrets.token_urlsafe(16)
  145. expires = datetime.now(timezone.utc) + timedelta(hours=1)
  146. await revoke_jti(jti, expires)
  147. await revoke_jti(jti, expires) # must not raise
  148. assert await is_jti_revoked(jti)
  149. def test_is_token_fresh_rejects_none_iat(self):
  150. """_is_token_fresh returns False when iat is None (I1 hard cutoff)."""
  151. from backend.app.core.auth import _is_token_fresh
  152. user = MagicMock()
  153. user.password_changed_at = None
  154. assert _is_token_fresh(None, user) is False
  155. def test_is_token_fresh_rejects_token_before_password_change(self):
  156. """_is_token_fresh returns False when iat predates password_changed_at."""
  157. from backend.app.core.auth import _is_token_fresh
  158. now = datetime.now(timezone.utc)
  159. user = MagicMock()
  160. user.password_changed_at = now
  161. old_iat = (now - timedelta(hours=1)).timestamp()
  162. assert _is_token_fresh(old_iat, user) is False
  163. def test_is_token_fresh_accepts_token_after_password_change(self):
  164. """_is_token_fresh returns True when iat is after password_changed_at."""
  165. from backend.app.core.auth import _is_token_fresh
  166. now = datetime.now(timezone.utc)
  167. user = MagicMock()
  168. user.password_changed_at = now - timedelta(hours=1)
  169. recent_iat = now.timestamp()
  170. assert _is_token_fresh(recent_iat, user) is True
  171. def test_is_token_fresh_returns_true_when_no_password_change(self):
  172. """_is_token_fresh returns True when password_changed_at is None (I2 migration not yet run)."""
  173. from backend.app.core.auth import _is_token_fresh
  174. user = MagicMock()
  175. user.password_changed_at = None
  176. assert _is_token_fresh(time.time(), user) is True
  177. @pytest.mark.asyncio
  178. @pytest.mark.integration
  179. async def test_me_endpoint_rejects_token_after_logout(self, async_client: AsyncClient):
  180. """After logout, the bearer token must be rejected by /me (B1 + revocation)."""
  181. token = await _setup_and_login(async_client, "sec_logout_me", "sec_logout_me1")
  182. # Token works before logout
  183. me_resp = await async_client.get(ME_URL, headers=_auth_header(token))
  184. assert me_resp.status_code == 200
  185. # Logout
  186. logout_resp = await async_client.post(LOGOUT_URL, headers=_auth_header(token))
  187. assert logout_resp.status_code == 200
  188. # Token must now be rejected
  189. me_after = await async_client.get(ME_URL, headers=_auth_header(token))
  190. assert me_after.status_code == 401
  191. # ===========================================================================
  192. # Gap 3: OIDC exchange token replay
  193. # ===========================================================================
  194. class TestOIDCExchangeReplay:
  195. """A single-use OIDC exchange token cannot be redeemed twice."""
  196. @pytest.mark.asyncio
  197. @pytest.mark.integration
  198. async def test_exchange_token_is_single_use(self, async_client: AsyncClient, db_session: AsyncSession):
  199. """The second call to /oidc/exchange with the same token returns 401."""
  200. exchange_token = secrets.token_urlsafe(32)
  201. db_session.add(
  202. AuthEphemeralToken(
  203. token=exchange_token,
  204. token_type="oidc_exchange",
  205. username="oidc_replay_user",
  206. expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
  207. )
  208. )
  209. await db_session.commit()
  210. # Seed the user so the exchange can resolve it
  211. from backend.app.core.auth import get_password_hash
  212. from backend.app.core.database import async_session, seed_default_groups
  213. async with async_session() as db:
  214. result = await db.execute(__import__("sqlalchemy").select(User).where(User.username == "oidc_replay_user"))
  215. if result.scalar_one_or_none() is None:
  216. db.add(
  217. User(
  218. username="oidc_replay_user",
  219. password_hash=get_password_hash("pw"),
  220. is_active=True,
  221. )
  222. )
  223. await db.commit()
  224. first = await async_client.post("/api/v1/auth/oidc/exchange", json={"oidc_token": exchange_token})
  225. assert first.status_code == 200
  226. second = await async_client.post("/api/v1/auth/oidc/exchange", json={"oidc_token": exchange_token})
  227. assert second.status_code == 401
  228. # ===========================================================================
  229. # Gap 4: OIDC email_verified claim handling
  230. # ===========================================================================
  231. class TestOIDCEmailVerified:
  232. """email_verified: False/absent must not link OIDC identity to an existing email."""
  233. @pytest.mark.asyncio
  234. @pytest.mark.integration
  235. async def test_unverified_email_does_not_link_to_existing_user(
  236. self, async_client: AsyncClient, db_session: AsyncSession
  237. ):
  238. """If email_verified is False, the OIDC callback must not auto-link by email."""
  239. private_pem, jwks_data = _make_test_rsa_key()
  240. issuer = "https://idp.evtest.example.com"
  241. client_id = "ev-client"
  242. nonce = secrets.token_urlsafe(16)
  243. now = int(time.time())
  244. id_token = pyjwt.encode(
  245. {
  246. "sub": "ev-sub-new",
  247. "iss": issuer,
  248. "aud": client_id,
  249. "nonce": nonce,
  250. "email": "existing@example.com",
  251. "email_verified": False, # <-- must be ignored
  252. "iat": now,
  253. "exp": now + 300,
  254. },
  255. private_pem,
  256. algorithm="RS256",
  257. headers={"kid": "test-kid-1"},
  258. )
  259. admin_token = await _setup_and_login(async_client, "ev_admin", "ev_admin1")
  260. # Create existing user with the same email (use strong password for validator)
  261. create_user_resp = await async_client.post(
  262. "/api/v1/users",
  263. json={"username": "existing_email_user", "password": "Str0ng!Pass", "email": "existing@example.com"},
  264. headers=_auth_header(admin_token),
  265. )
  266. assert create_user_resp.status_code in (200, 201), create_user_resp.json()
  267. # Create OIDC provider
  268. create_resp = await async_client.post(
  269. "/api/v1/auth/oidc/providers",
  270. json={
  271. "name": "EV-IdP",
  272. "issuer_url": issuer,
  273. "client_id": client_id,
  274. "client_secret": "secret",
  275. "scopes": "openid email",
  276. "is_enabled": True,
  277. "auto_create_users": True,
  278. },
  279. headers=_auth_header(admin_token),
  280. )
  281. assert create_resp.status_code == 201
  282. provider_id = create_resp.json()["id"]
  283. state = secrets.token_urlsafe(32)
  284. code_verifier = secrets.token_urlsafe(48)
  285. db_session.add(
  286. AuthEphemeralToken(
  287. token=state,
  288. token_type="oidc_state",
  289. provider_id=provider_id,
  290. nonce=nonce,
  291. code_verifier=code_verifier,
  292. expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
  293. )
  294. )
  295. await db_session.commit()
  296. discovery_doc = {
  297. "issuer": issuer,
  298. "authorization_endpoint": f"{issuer}/auth",
  299. "token_endpoint": f"{issuer}/token",
  300. "jwks_uri": f"{issuer}/.well-known/jwks.json",
  301. }
  302. class _MockResp:
  303. def __init__(self, data):
  304. self._data = data
  305. self.status_code = 200
  306. self.is_success = True
  307. self.text = str(data)
  308. def json(self):
  309. return self._data
  310. def raise_for_status(self):
  311. pass
  312. class _MockHttpxClientEV:
  313. def __init__(self, *args, **kwargs):
  314. pass
  315. async def __aenter__(self):
  316. return self
  317. async def __aexit__(self, *_):
  318. pass
  319. async def get(self, url, **kwargs):
  320. if "jwks" in url:
  321. return _MockResp(jwks_data)
  322. return _MockResp(discovery_doc)
  323. async def post(self, url, **kwargs):
  324. return _MockResp({"access_token": "mock", "token_type": "Bearer", "id_token": id_token})
  325. with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClientEV):
  326. await async_client.get(
  327. f"/api/v1/auth/oidc/callback?code=test-code&state={state}",
  328. follow_redirects=False,
  329. )
  330. # Callback must NOT link to the existing_email_user — a new user is created
  331. # instead (because the email claim was ignored due to email_verified=False).
  332. # Either a new user is provisioned (redirect with oidc_token) or the callback
  333. # fails. In either case, the existing user must not have an OIDC link.
  334. from sqlalchemy import select as sa_select
  335. from backend.app.models.oidc_provider import UserOIDCLink
  336. link_result = await db_session.execute(
  337. sa_select(UserOIDCLink)
  338. .join(User, UserOIDCLink.user_id == User.id)
  339. .where(User.email == "existing@example.com")
  340. )
  341. link = link_result.scalar_one_or_none()
  342. assert link is None, "Existing user must not be auto-linked when email_verified is False"
  343. # ===========================================================================
  344. # Gap 5: Email OTP max-attempts invalidation
  345. # ===========================================================================
  346. class TestEmailOTPMaxAttempts:
  347. """After MAX_ATTEMPTS wrong codes, the OTP is permanently invalidated."""
  348. @pytest.mark.asyncio
  349. @pytest.mark.integration
  350. async def test_email_otp_invalidated_after_max_attempts(self, async_client: AsyncClient, db_session: AsyncSession):
  351. from passlib.context import CryptContext
  352. from sqlalchemy import select as sa_select
  353. from backend.app.models.user_otp_code import UserOTPCode
  354. _pwd_ctx = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
  355. admin_token = await _setup_and_login(async_client, "otp_max_admin", "otp_max_admin1")
  356. # Enable email OTP for admin user
  357. result = await db_session.execute(sa_select(User).where(User.username == "otp_max_admin"))
  358. user = result.scalar_one()
  359. user.email = "otpmax@example.com"
  360. await db_session.commit()
  361. setup_code = "123456"
  362. from backend.app.models.auth_ephemeral import AuthEphemeralToken as AET
  363. setup_token = secrets.token_urlsafe(32)
  364. db_session.add(
  365. AET(
  366. token=setup_token,
  367. token_type="email_otp_setup",
  368. username="otp_max_admin",
  369. nonce=_pwd_ctx.hash(setup_code),
  370. expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
  371. )
  372. )
  373. await db_session.commit()
  374. await async_client.post(
  375. "/api/v1/auth/2fa/email/enable/confirm",
  376. json={"setup_token": setup_token, "code": setup_code},
  377. headers=_auth_header(admin_token),
  378. )
  379. # Login to get pre_auth_token
  380. login_resp = await async_client.post(
  381. LOGIN_URL, json={"username": "otp_max_admin", "password": "Otp_max_admin1"}
  382. )
  383. pre_auth_token = login_resp.json()["pre_auth_token"]
  384. # Insert an OTP record directly (bypassing SMTP)
  385. real_code = "654321"
  386. otp = UserOTPCode(
  387. user_id=user.id,
  388. code_hash=_pwd_ctx.hash(real_code),
  389. attempts=0,
  390. used=False,
  391. expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
  392. )
  393. db_session.add(otp)
  394. await db_session.commit()
  395. # Submit MAX_ATTEMPTS wrong codes
  396. from backend.app.api.routes.mfa import MAX_2FA_ATTEMPTS
  397. for _ in range(MAX_2FA_ATTEMPTS):
  398. r = await async_client.post(
  399. "/api/v1/auth/2fa/verify",
  400. json={"pre_auth_token": pre_auth_token, "code": "000000", "method": "email"},
  401. )
  402. # Each attempt must fail with 401
  403. assert r.status_code == 401
  404. # After max attempts, the correct code is also rejected (either OTP
  405. # invalidated → 401, or rate limit hit → 429). Either means locked out.
  406. final = await async_client.post(
  407. "/api/v1/auth/2fa/verify",
  408. json={"pre_auth_token": pre_auth_token, "code": real_code, "method": "email"},
  409. )
  410. assert final.status_code in (401, 429), f"Expected lockout, got {final.status_code}: {final.json()}"
  411. # ===========================================================================
  412. # Gap 6: OIDC callback SSRF protection — invalid authorization_endpoint scheme
  413. # ===========================================================================
  414. class TestOIDCSSRFProtection:
  415. """authorization_endpoint with non-http(s) scheme must be rejected."""
  416. @pytest.mark.asyncio
  417. @pytest.mark.integration
  418. async def test_invalid_authorization_endpoint_scheme_rejected(
  419. self, async_client: AsyncClient, db_session: AsyncSession
  420. ):
  421. issuer = "https://idp.ssrf.example.com"
  422. client_id = "ssrf-client"
  423. admin_token = await _setup_and_login(async_client, "ssrf_admin", "ssrf_admin1")
  424. create_resp = await async_client.post(
  425. "/api/v1/auth/oidc/providers",
  426. json={
  427. "name": "SSRF-IdP",
  428. "issuer_url": issuer,
  429. "client_id": client_id,
  430. "client_secret": "secret",
  431. "scopes": "openid",
  432. "is_enabled": True,
  433. "auto_create_users": False,
  434. },
  435. headers=_auth_header(admin_token),
  436. )
  437. assert create_resp.status_code == 201
  438. provider_id = create_resp.json()["id"]
  439. # Discovery doc returns a javascript: authorization_endpoint
  440. malicious_discovery = {
  441. "issuer": issuer,
  442. "authorization_endpoint": "javascript:alert(1)", # <-- malicious
  443. "token_endpoint": f"{issuer}/token",
  444. "jwks_uri": f"{issuer}/.well-known/jwks.json",
  445. }
  446. class _MockResp:
  447. def __init__(self, data):
  448. self._data = data
  449. self.status_code = 200
  450. self.is_success = True
  451. self.text = str(data)
  452. def json(self):
  453. return self._data
  454. def raise_for_status(self):
  455. pass
  456. class _MockHttpxClientSSRF:
  457. def __init__(self, *args, **kwargs):
  458. pass
  459. async def __aenter__(self):
  460. return self
  461. async def __aexit__(self, *_):
  462. pass
  463. async def get(self, url, **kwargs):
  464. return _MockResp(malicious_discovery)
  465. async def post(self, url, **kwargs):
  466. return _MockResp({})
  467. with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClientSSRF):
  468. # oidc_authorize uses a path parameter, not query param
  469. authorize_resp = await async_client.get(
  470. f"/api/v1/auth/oidc/authorize/{provider_id}",
  471. follow_redirects=False,
  472. )
  473. # Must be rejected with 502 — B2 guard rejects invalid authorization_endpoint scheme
  474. assert authorize_resp.status_code == 502, authorize_resp.json()
  475. detail = authorize_resp.json().get("detail", "").lower()
  476. assert "authorization_endpoint" in detail or "invalid" in detail
  477. # ===========================================================================
  478. # Gap 7: Login rate limiting
  479. # ===========================================================================
  480. class TestLoginRateLimiting:
  481. """10+ failed logins for the same username must return 429."""
  482. @pytest.mark.asyncio
  483. @pytest.mark.integration
  484. async def test_excessive_failed_logins_return_429(self, async_client: AsyncClient):
  485. from backend.app.api.routes.mfa import MAX_LOGIN_ATTEMPTS
  486. # Setup auth but do NOT log in
  487. await async_client.post(
  488. AUTH_SETUP_URL,
  489. json={"auth_enabled": True, "admin_username": "ratelimit_user", "admin_password": "Ratelimit_pw1"},
  490. )
  491. status_codes = []
  492. for _ in range(MAX_LOGIN_ATTEMPTS + 2):
  493. resp = await async_client.post(
  494. LOGIN_URL,
  495. json={"username": "ratelimit_user", "password": "wrong_password"},
  496. )
  497. status_codes.append(resp.status_code)
  498. # The last attempts must be 429 (Too Many Requests)
  499. assert status_codes[-1] == 429, f"Expected 429 after {MAX_LOGIN_ATTEMPTS} failures, got: {status_codes}"
  500. # ===========================================================================
  501. # Gap 8: challenge_id cookie binding
  502. # ===========================================================================
  503. class TestChallengeIdCookieBinding:
  504. """A pre-auth token stolen from session A cannot be used from session B."""
  505. @pytest.mark.asyncio
  506. @pytest.mark.integration
  507. async def test_pre_auth_token_rejected_without_matching_cookie(
  508. self, async_client: AsyncClient, db_session: AsyncSession
  509. ):
  510. import pyotp
  511. from passlib.context import CryptContext
  512. _pwd_ctx = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
  513. # Set up user with TOTP
  514. await _setup_and_login(async_client, "cookie_bind_user", "cookie_bind_pw1")
  515. secret = pyotp.random_base32()
  516. totp_obj = pyotp.TOTP(secret)
  517. from sqlalchemy import select as sa_select
  518. from backend.app.models.user_totp import UserTOTP
  519. result = await db_session.execute(sa_select(User).where(User.username == "cookie_bind_user"))
  520. user = result.scalar_one()
  521. db_session.add(UserTOTP(user_id=user.id, secret=secret, is_enabled=True))
  522. await db_session.commit()
  523. # Login from "session A" — gets a pre_auth_token and a 2fa_challenge cookie
  524. login_resp = await async_client.post(
  525. LOGIN_URL, json={"username": "cookie_bind_user", "password": "Cookie_bind_pw1"}
  526. )
  527. assert login_resp.status_code == 200
  528. assert login_resp.json()["requires_2fa"] is True
  529. pre_auth_token = login_resp.json()["pre_auth_token"]
  530. # The async_client jar now holds the 2fa_challenge cookie for session A
  531. # Simulate session B by creating a new client WITHOUT the cookie
  532. from httpx import ASGITransport, AsyncClient as FreshClient
  533. from backend.app.main import app
  534. async with FreshClient(transport=ASGITransport(app=app), base_url="http://test") as session_b:
  535. # Attempt to use session A's pre_auth_token from session B (no cookie)
  536. verify_resp = await session_b.post(
  537. "/api/v1/auth/2fa/verify",
  538. json={
  539. "pre_auth_token": pre_auth_token,
  540. "code": totp_obj.now(),
  541. "method": "totp",
  542. },
  543. )
  544. # Must be rejected — pre_auth_token is bound to session A's cookie
  545. assert verify_resp.status_code == 401, (
  546. f"Expected 401 for token replay from cookieless session, got {verify_resp.status_code}: "
  547. f"{verify_resp.json()}"
  548. )
  549. # ===========================================================================
  550. # C2: Security-header middleware
  551. # ===========================================================================
  552. class TestSecurityHeaders:
  553. """Every HTTP response must include standard security headers (C2)."""
  554. @pytest.mark.asyncio
  555. @pytest.mark.integration
  556. async def test_security_headers_present(self, async_client: AsyncClient):
  557. """GET /api/v1/auth/me (unauthenticated → 401) still carries security headers."""
  558. resp = await async_client.get(ME_URL)
  559. assert resp.status_code == 401 # sanity — no auth token
  560. assert resp.headers.get("x-content-type-options") == "nosniff"
  561. assert resp.headers.get("x-frame-options") == "SAMEORIGIN"
  562. assert resp.headers.get("referrer-policy") == "strict-origin-when-cross-origin"
  563. csp = resp.headers.get("content-security-policy", "")
  564. assert "default-src 'self'" in csp
  565. assert "script-src 'self'" in csp
  566. assert "frame-ancestors 'none'" in csp
  567. assert "object-src 'none'" in csp
  568. @pytest.mark.asyncio
  569. @pytest.mark.integration
  570. async def test_hsts_absent_for_http(self, async_client: AsyncClient):
  571. """HSTS must NOT be set over plain HTTP (test transport uses http)."""
  572. resp = await async_client.get(ME_URL)
  573. assert "strict-transport-security" not in resp.headers
  574. # ===========================================================================
  575. # I3: Rate-limit bucket interaction — IP spray vs. username spray
  576. # ===========================================================================
  577. class TestRateLimitBuckets:
  578. """IP-spray and username-spray must each trip the correct independent bucket."""
  579. @pytest.mark.asyncio
  580. @pytest.mark.integration
  581. async def test_ip_spray_trips_ip_bucket(self, async_client: AsyncClient):
  582. """20 failed logins from one IP across 20 different usernames trips the IP bucket.
  583. Each per-username bucket only has 1 failure (well below MAX_LOGIN_ATTEMPTS=10),
  584. so the username bucket is never the reason for the 429.
  585. """
  586. from unittest.mock import patch as _patch
  587. unique_ip = "10.99.1.1"
  588. # Ensure auth is enabled
  589. await async_client.post(
  590. AUTH_SETUP_URL,
  591. json={"auth_enabled": True, "admin_username": "spray_ip_admin", "admin_password": "SprayIp_admin1"},
  592. )
  593. status_codes: list[int] = []
  594. with _patch("backend.app.api.routes.auth._get_client_ip", return_value=unique_ip):
  595. for i in range(22):
  596. resp = await async_client.post(
  597. LOGIN_URL,
  598. json={"username": f"spray_ip_victim_{i}", "password": "wrong"},
  599. )
  600. status_codes.append(resp.status_code)
  601. # The first 20 attempts fail with 401; the 21st+ must be 429 (IP bucket full)
  602. assert status_codes[-1] == 429, f"Expected 429 after 20 IP-spray failures, got: {status_codes}"
  603. # No single username saw more than one attempt → username buckets not tripped
  604. non_429 = [c for c in status_codes[:-2] if c == 429]
  605. assert not non_429, f"Username bucket triggered early: {status_codes}"
  606. @pytest.mark.asyncio
  607. @pytest.mark.integration
  608. async def test_username_spray_trips_username_bucket(self, async_client: AsyncClient):
  609. """One username targeted from 10+ different IPs trips the username bucket.
  610. Each per-IP bucket only sees 1 failure, so no IP bucket is tripped.
  611. The username bucket (max 10) is what fires the 429.
  612. """
  613. from unittest.mock import patch as _patch
  614. from backend.app.api.routes.mfa import MAX_LOGIN_ATTEMPTS
  615. # Ensure auth is enabled
  616. await async_client.post(
  617. AUTH_SETUP_URL,
  618. json={
  619. "auth_enabled": True,
  620. "admin_username": "spray_uname_admin",
  621. "admin_password": "SprayUname_admin1",
  622. },
  623. )
  624. target_username = "spray_uname_victim"
  625. status_codes: list[int] = []
  626. for i in range(MAX_LOGIN_ATTEMPTS + 2):
  627. rotating_ip = f"10.99.2.{i + 1}"
  628. with _patch("backend.app.api.routes.auth._get_client_ip", return_value=rotating_ip):
  629. resp = await async_client.post(
  630. LOGIN_URL,
  631. json={"username": target_username, "password": "wrong"},
  632. )
  633. status_codes.append(resp.status_code)
  634. # After MAX_LOGIN_ATTEMPTS failures for same username the bucket fires
  635. assert status_codes[-1] == 429, (
  636. f"Expected 429 after {MAX_LOGIN_ATTEMPTS} username-spray failures, got: {status_codes}"
  637. )