|
@@ -18,8 +18,11 @@ Tests the full request/response cycle for:
|
|
|
from __future__ import annotations
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
import secrets
|
|
import secrets
|
|
|
|
|
+import time
|
|
|
from datetime import datetime, timedelta, timezone
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
|
+from unittest.mock import patch
|
|
|
|
|
|
|
|
|
|
+import jwt as pyjwt
|
|
|
import pyotp
|
|
import pyotp
|
|
|
import pytest
|
|
import pytest
|
|
|
from httpx import AsyncClient
|
|
from httpx import AsyncClient
|
|
@@ -3263,3 +3266,777 @@ class TestOIDCCallbackCodeLength:
|
|
|
follow_redirects=False,
|
|
follow_redirects=False,
|
|
|
)
|
|
)
|
|
|
assert resp.status_code == 422, "2049-char state must be rejected by Pydantic"
|
|
assert resp.status_code == 422, "2049-char state must be rejected by Pydantic"
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
|
|
+# Helpers shared by TestOIDCEmailClaimResolution
|
|
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+async def _run_oidc_callback(
|
|
|
|
|
+ async_client: AsyncClient,
|
|
|
|
|
+ db_session: AsyncSession,
|
|
|
|
|
+ *,
|
|
|
|
|
+ provider_id: int,
|
|
|
|
|
+ claims: dict,
|
|
|
|
|
+ private_pem: bytes,
|
|
|
|
|
+ jwks_data: dict,
|
|
|
|
|
+ issuer: str,
|
|
|
|
|
+ client_id: str,
|
|
|
|
|
+) -> str:
|
|
|
|
|
+ """Run a full OIDC callback flow and return the redirect location."""
|
|
|
|
|
+ nonce = secrets.token_urlsafe(16)
|
|
|
|
|
+ now = int(time.time())
|
|
|
|
|
+ token_claims = {
|
|
|
|
|
+ "sub": claims.get("sub", f"sub-{secrets.token_hex(8)}"),
|
|
|
|
|
+ "iss": issuer,
|
|
|
|
|
+ "aud": client_id,
|
|
|
|
|
+ "nonce": nonce,
|
|
|
|
|
+ "iat": now,
|
|
|
|
|
+ "exp": now + 300,
|
|
|
|
|
+ **{k: v for k, v in claims.items() if k not in ("sub",)},
|
|
|
|
|
+ }
|
|
|
|
|
+ id_token = pyjwt.encode(token_claims, private_pem, algorithm="RS256", headers={"kid": "test-kid-1"})
|
|
|
|
|
+
|
|
|
|
|
+ state = secrets.token_urlsafe(32)
|
|
|
|
|
+ code_verifier = secrets.token_urlsafe(48)
|
|
|
|
|
+ db_session.add(
|
|
|
|
|
+ AuthEphemeralToken(
|
|
|
|
|
+ token=state,
|
|
|
|
|
+ token_type="oidc_state",
|
|
|
|
|
+ provider_id=provider_id,
|
|
|
|
|
+ nonce=nonce,
|
|
|
|
|
+ code_verifier=code_verifier,
|
|
|
|
|
+ expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
|
|
|
|
|
+ )
|
|
|
|
|
+ )
|
|
|
|
|
+ await db_session.commit()
|
|
|
|
|
+
|
|
|
|
|
+ discovery_doc = {
|
|
|
|
|
+ "issuer": issuer,
|
|
|
|
|
+ "authorization_endpoint": f"{issuer}/auth",
|
|
|
|
|
+ "token_endpoint": f"{issuer}/token",
|
|
|
|
|
+ "jwks_uri": f"{issuer}/.well-known/jwks.json",
|
|
|
|
|
+ }
|
|
|
|
|
+ token_response = {"access_token": "mock-access", "token_type": "Bearer", "id_token": id_token}
|
|
|
|
|
+
|
|
|
|
|
+ class _R:
|
|
|
|
|
+ def __init__(self, data):
|
|
|
|
|
+ self._data = data
|
|
|
|
|
+ self.status_code = 200
|
|
|
|
|
+ self.is_success = True
|
|
|
|
|
+ self.text = str(data)
|
|
|
|
|
+
|
|
|
|
|
+ def json(self):
|
|
|
|
|
+ return self._data
|
|
|
|
|
+
|
|
|
|
|
+ def raise_for_status(self):
|
|
|
|
|
+ pass
|
|
|
|
|
+
|
|
|
|
|
+ class _C:
|
|
|
|
|
+ def __init__(self, *a, **kw):
|
|
|
|
|
+ pass
|
|
|
|
|
+
|
|
|
|
|
+ async def __aenter__(self):
|
|
|
|
|
+ return self
|
|
|
|
|
+
|
|
|
|
|
+ async def __aexit__(self, *a):
|
|
|
|
|
+ pass
|
|
|
|
|
+
|
|
|
|
|
+ async def get(self, url, **kw):
|
|
|
|
|
+ return _R(jwks_data if "jwks" in url else discovery_doc)
|
|
|
|
|
+
|
|
|
|
|
+ async def post(self, url, **kw):
|
|
|
|
|
+ return _R(token_response)
|
|
|
|
|
+
|
|
|
|
|
+ with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _C):
|
|
|
|
|
+ resp = await async_client.get(
|
|
|
|
|
+ f"/api/v1/auth/oidc/callback?code=test-code&state={state}",
|
|
|
|
|
+ follow_redirects=False,
|
|
|
|
|
+ )
|
|
|
|
|
+ return resp.headers.get("location", "")
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class TestOIDCEmailClaimResolution:
|
|
|
|
|
+ """Three-case email resolution logic: Fall A / Fall B / Fall C."""
|
|
|
|
|
+
|
|
|
|
|
+ # ── shared helpers ────────────────────────────────────────────────────────
|
|
|
|
|
+
|
|
|
|
|
+ async def _create_provider(
|
|
|
|
|
+ self,
|
|
|
|
|
+ async_client: AsyncClient,
|
|
|
|
|
+ admin_token: str,
|
|
|
|
|
+ issuer: str,
|
|
|
|
|
+ client_id: str,
|
|
|
|
|
+ *,
|
|
|
|
|
+ email_claim: str = "email",
|
|
|
|
|
+ require_email_verified: bool = True,
|
|
|
|
|
+ auto_link_existing_accounts: bool = False,
|
|
|
|
|
+ suffix: str = "",
|
|
|
|
|
+ ) -> int:
|
|
|
|
|
+ resp = await async_client.post(
|
|
|
|
|
+ "/api/v1/auth/oidc/providers",
|
|
|
|
|
+ json={
|
|
|
|
|
+ "name": f"TestIdP-{suffix or secrets.token_hex(4)}",
|
|
|
|
|
+ "issuer_url": issuer,
|
|
|
|
|
+ "client_id": client_id,
|
|
|
|
|
+ "client_secret": "sec",
|
|
|
|
|
+ "scopes": "openid email profile",
|
|
|
|
|
+ "is_enabled": True,
|
|
|
|
|
+ "auto_create_users": True,
|
|
|
|
|
+ "auto_link_existing_accounts": auto_link_existing_accounts,
|
|
|
|
|
+ "email_claim": email_claim,
|
|
|
|
|
+ "require_email_verified": require_email_verified,
|
|
|
|
|
+ },
|
|
|
|
|
+ headers={"Authorization": f"Bearer {admin_token}"},
|
|
|
|
|
+ )
|
|
|
|
|
+ assert resp.status_code == 201, resp.text
|
|
|
|
|
+ return resp.json()["id"]
|
|
|
|
|
+
|
|
|
|
|
+ async def _get_oidc_link(self, db_session: AsyncSession, provider_id: int, sub: str):
|
|
|
|
|
+ from sqlalchemy import select
|
|
|
|
|
+
|
|
|
|
|
+ from backend.app.models.oidc_provider import UserOIDCLink
|
|
|
|
|
+
|
|
|
|
|
+ result = await db_session.execute(
|
|
|
|
|
+ select(UserOIDCLink)
|
|
|
|
|
+ .where(UserOIDCLink.provider_id == provider_id)
|
|
|
|
|
+ .where(UserOIDCLink.provider_user_id == sub)
|
|
|
|
|
+ )
|
|
|
|
|
+ return result.scalar_one_or_none()
|
|
|
|
|
+
|
|
|
|
|
+ # ── Parametrized matrix: Fall A / Fall B / Fall C ─────────────────────────
|
|
|
|
|
+
|
|
|
|
|
+ @pytest.mark.asyncio
|
|
|
|
|
+ @pytest.mark.integration
|
|
|
|
|
+ @pytest.mark.parametrize(
|
|
|
|
|
+ "email_claim,require_ev,claims,expected",
|
|
|
|
|
+ [
|
|
|
|
|
+ # Fall A: standard claim + require_ev=True (default)
|
|
|
|
|
+ ("email", True, {"email": "fa@example.com", "email_verified": True}, "fa@example.com"),
|
|
|
|
|
+ ("email", True, {"email": "fa@example.com", "email_verified": False}, None),
|
|
|
|
|
+ ("email", True, {"email": "fa@example.com"}, None), # Azure Entra with default config
|
|
|
|
|
+ # Fall A + SEC-2: malformed email claim rejected even when email_verified=True
|
|
|
|
|
+ ("email", True, {"email": "notanemail", "email_verified": True}, None),
|
|
|
|
|
+ # Fall B: standard claim + require_ev=False (Azure Entra permissive)
|
|
|
|
|
+ ("email", False, {"email": "fb@example.com", "email_verified": True}, "fb@example.com"),
|
|
|
|
|
+ ("email", False, {"email": "fb@example.com", "email_verified": False}, None),
|
|
|
|
|
+ ("email", False, {"email": "azure@company.com"}, "azure@company.com"), # ev absent → kept
|
|
|
|
|
+ # Fall B + SEC-2: malformed email claim rejected in permissive mode (ev absent)
|
|
|
|
|
+ ("email", False, {"email": "user@nodot"}, None),
|
|
|
|
|
+ # Fall B + SEC-2: shape check fires before email_verified=False drop
|
|
|
|
|
+ ("email", False, {"email": "notanemail", "email_verified": False}, None),
|
|
|
|
|
+ # Fall C: custom claim (preferred_username) — no email_verified check
|
|
|
|
|
+ ("preferred_username", True, {"preferred_username": "User@Company.COM"}, "user@company.com"),
|
|
|
|
|
+ ("preferred_username", True, {"preferred_username": " User@EXAMPLE.COM "}, "user@example.com"),
|
|
|
|
|
+ ("preferred_username", True, {"preferred_username": "justausername"}, None),
|
|
|
|
|
+ ("preferred_username", True, {"preferred_username": "@"}, None), # SEC-2: "@" only
|
|
|
|
|
+ ("preferred_username", True, {"preferred_username": "@domain.com"}, None), # SEC-2: empty local
|
|
|
|
|
+ ("preferred_username", True, {"preferred_username": "user@"}, None), # SEC-2: empty domain
|
|
|
|
|
+ ("preferred_username", True, {"preferred_username": "user@nodot"}, None), # SEC-2: no dot in domain
|
|
|
|
|
+ ("preferred_username", True, {}, None), # claim absent
|
|
|
|
|
+ # Fall C: email_verified=False present alongside custom claim — must NOT suppress the email
|
|
|
|
|
+ ("preferred_username", True, {"preferred_username": "user@co.com", "email_verified": False}, "user@co.com"),
|
|
|
|
|
+ ],
|
|
|
|
|
+ ids=[
|
|
|
|
|
+ "fall-a-ev-true",
|
|
|
|
|
+ "fall-a-ev-false",
|
|
|
|
|
+ "fall-a-ev-absent",
|
|
|
|
|
+ "fall-a-malformed-email",
|
|
|
|
|
+ "fall-b-ev-true",
|
|
|
|
|
+ "fall-b-ev-false",
|
|
|
|
|
+ "fall-b-ev-absent",
|
|
|
|
|
+ "fall-b-malformed-email",
|
|
|
|
|
+ "fall-b-malformed-email-ev-false",
|
|
|
|
|
+ "fall-c-valid-upn",
|
|
|
|
|
+ "fall-c-lowercase-strip",
|
|
|
|
|
+ "fall-c-no-at",
|
|
|
|
|
+ "fall-c-at-only",
|
|
|
|
|
+ "fall-c-empty-local",
|
|
|
|
|
+ "fall-c-empty-domain",
|
|
|
|
|
+ "fall-c-no-dot-in-domain",
|
|
|
|
|
+ "fall-c-claim-absent",
|
|
|
|
|
+ "fall-c-ev-false-ignored",
|
|
|
|
|
+ ],
|
|
|
|
|
+ )
|
|
|
|
|
+ async def test_email_resolution_matrix(
|
|
|
|
|
+ self,
|
|
|
|
|
+ async_client: AsyncClient,
|
|
|
|
|
+ db_session: AsyncSession,
|
|
|
|
|
+ email_claim: str,
|
|
|
|
|
+ require_ev: bool,
|
|
|
|
|
+ claims: dict,
|
|
|
|
|
+ expected: str | None,
|
|
|
|
|
+ ):
|
|
|
|
|
+ """C4: Verify link exists AND check provider_email — avoids false-passing on callback failure."""
|
|
|
|
|
+ issuer = "https://matrix.test"
|
|
|
|
|
+ client_id = "matrix-client"
|
|
|
|
|
+ admin_token = await _setup_and_login(async_client, "matrix_adm", "Matrix123!")
|
|
|
|
|
+ private_pem, jwks_data = _make_test_rsa_key()
|
|
|
|
|
+ provider_id = await self._create_provider(
|
|
|
|
|
+ async_client,
|
|
|
|
|
+ admin_token,
|
|
|
|
|
+ issuer,
|
|
|
|
|
+ client_id,
|
|
|
|
|
+ email_claim=email_claim,
|
|
|
|
|
+ require_email_verified=require_ev,
|
|
|
|
|
+ suffix="matrix",
|
|
|
|
|
+ )
|
|
|
|
|
+ sub = f"sub-matrix-{secrets.token_hex(6)}"
|
|
|
|
|
+ await _run_oidc_callback(
|
|
|
|
|
+ async_client,
|
|
|
|
|
+ db_session,
|
|
|
|
|
+ provider_id=provider_id,
|
|
|
|
|
+ claims={"sub": sub, **claims},
|
|
|
|
|
+ private_pem=private_pem,
|
|
|
|
|
+ jwks_data=jwks_data,
|
|
|
|
|
+ issuer=issuer,
|
|
|
|
|
+ client_id=client_id,
|
|
|
|
|
+ )
|
|
|
|
|
+ db_session.expire_all()
|
|
|
|
|
+ link = await self._get_oidc_link(db_session, provider_id, sub)
|
|
|
|
|
+ assert link is not None, "UserOIDCLink must be created even when email is dropped"
|
|
|
|
|
+ assert link.provider_email == expected
|
|
|
|
|
+
|
|
|
|
|
+ # ── Security: auto_link guards (CREATE endpoint) ──────────────────────────
|
|
|
|
|
+
|
|
|
|
|
+ @pytest.mark.asyncio
|
|
|
|
|
+ @pytest.mark.integration
|
|
|
|
|
+ async def test_auto_link_blocked_with_require_ev_false(self, async_client: AsyncClient):
|
|
|
|
|
+ """SEC-1: auto_link + require_email_verified=False must be rejected at schema level (422)."""
|
|
|
|
|
+ admin_token = await _setup_and_login(async_client, "sec1_adm", "Sec1Adm123!")
|
|
|
|
|
+ resp = await async_client.post(
|
|
|
|
|
+ "/api/v1/auth/oidc/providers",
|
|
|
|
|
+ json={
|
|
|
|
|
+ "name": "SEC1-Test",
|
|
|
|
|
+ "issuer_url": "https://sec1.test",
|
|
|
|
|
+ "client_id": "sec1-client",
|
|
|
|
|
+ "client_secret": "sec",
|
|
|
|
|
+ "scopes": "openid email profile",
|
|
|
|
|
+ "auto_link_existing_accounts": True,
|
|
|
|
|
+ "require_email_verified": False,
|
|
|
|
|
+ },
|
|
|
|
|
+ headers={"Authorization": f"Bearer {admin_token}"},
|
|
|
|
|
+ )
|
|
|
|
|
+ assert resp.status_code == 422
|
|
|
|
|
+
|
|
|
|
|
+ @pytest.mark.asyncio
|
|
|
|
|
+ @pytest.mark.integration
|
|
|
|
|
+ async def test_auto_link_blocked_with_custom_claim_create(self, async_client: AsyncClient):
|
|
|
|
|
+ """SEC-6: auto_link + email_claim!='email' must be rejected at schema level on CREATE (422)."""
|
|
|
|
|
+ admin_token = await _setup_and_login(async_client, "sec6c_adm", "Sec6CAdm123!")
|
|
|
|
|
+ resp = await async_client.post(
|
|
|
|
|
+ "/api/v1/auth/oidc/providers",
|
|
|
|
|
+ json={
|
|
|
|
|
+ "name": "SEC6-Create-Test",
|
|
|
|
|
+ "issuer_url": "https://sec6c.test",
|
|
|
|
|
+ "client_id": "sec6c-client",
|
|
|
|
|
+ "client_secret": "sec",
|
|
|
|
|
+ "scopes": "openid email profile",
|
|
|
|
|
+ "auto_link_existing_accounts": True,
|
|
|
|
|
+ "email_claim": "upn",
|
|
|
|
|
+ },
|
|
|
|
|
+ headers={"Authorization": f"Bearer {admin_token}"},
|
|
|
|
|
+ )
|
|
|
|
|
+ assert resp.status_code == 422
|
|
|
|
|
+
|
|
|
|
|
+ @pytest.mark.asyncio
|
|
|
|
|
+ @pytest.mark.integration
|
|
|
|
|
+ async def test_auto_link_blocked_with_custom_claim_update(self, async_client: AsyncClient):
|
|
|
|
|
+ """SEC-6: auto_link=True + email_claim='upn' in same UPDATE request → 422 (T4)."""
|
|
|
|
|
+ admin_token = await _setup_and_login(async_client, "sec6u_adm", "Sec6UAdm123!")
|
|
|
|
|
+ create_resp = await async_client.post(
|
|
|
|
|
+ "/api/v1/auth/oidc/providers",
|
|
|
|
|
+ json={
|
|
|
|
|
+ "name": "SEC6-Update-Test",
|
|
|
|
|
+ "issuer_url": "https://sec6u.test",
|
|
|
|
|
+ "client_id": "sec6u-client",
|
|
|
|
|
+ "client_secret": "sec",
|
|
|
|
|
+ "scopes": "openid email profile",
|
|
|
|
|
+ },
|
|
|
|
|
+ headers={"Authorization": f"Bearer {admin_token}"},
|
|
|
|
|
+ )
|
|
|
|
|
+ assert create_resp.status_code == 201
|
|
|
|
|
+ provider_id = create_resp.json()["id"]
|
|
|
|
|
+ resp = await async_client.put(
|
|
|
|
|
+ f"/api/v1/auth/oidc/providers/{provider_id}",
|
|
|
|
|
+ json={"auto_link_existing_accounts": True, "email_claim": "upn"},
|
|
|
|
|
+ headers={"Authorization": f"Bearer {admin_token}"},
|
|
|
|
|
+ )
|
|
|
|
|
+ assert resp.status_code == 422
|
|
|
|
|
+
|
|
|
|
|
+ # ── Combined-State-Guard (partial updates across two requests) ─────────────
|
|
|
|
|
+
|
|
|
|
|
+ @pytest.mark.asyncio
|
|
|
|
|
+ @pytest.mark.integration
|
|
|
|
|
+ async def test_partial_update_guard_require_ev(self, async_client: AsyncClient):
|
|
|
|
|
+ """SEC-1 Combined-State-Guard: require_ev=False then auto_link=True → 422 (T1 require_ev path)."""
|
|
|
|
|
+ admin_token = await _setup_and_login(async_client, "pg_rev_adm", "PgRev123!")
|
|
|
|
|
+ create_resp = await async_client.post(
|
|
|
|
|
+ "/api/v1/auth/oidc/providers",
|
|
|
|
|
+ json={
|
|
|
|
|
+ "name": "PG-RequireEV-Test",
|
|
|
|
|
+ "issuer_url": "https://pg-rev.test",
|
|
|
|
|
+ "client_id": "pg-rev-client",
|
|
|
|
|
+ "client_secret": "sec",
|
|
|
|
|
+ "scopes": "openid email profile",
|
|
|
|
|
+ },
|
|
|
|
|
+ headers={"Authorization": f"Bearer {admin_token}"},
|
|
|
|
|
+ )
|
|
|
|
|
+ assert create_resp.status_code == 201
|
|
|
|
|
+ provider_id = create_resp.json()["id"]
|
|
|
|
|
+
|
|
|
|
|
+ upd1 = await async_client.put(
|
|
|
|
|
+ f"/api/v1/auth/oidc/providers/{provider_id}",
|
|
|
|
|
+ json={"require_email_verified": False},
|
|
|
|
|
+ headers={"Authorization": f"Bearer {admin_token}"},
|
|
|
|
|
+ )
|
|
|
|
|
+ assert upd1.status_code == 200
|
|
|
|
|
+
|
|
|
|
|
+ upd2 = await async_client.put(
|
|
|
|
|
+ f"/api/v1/auth/oidc/providers/{provider_id}",
|
|
|
|
|
+ json={"auto_link_existing_accounts": True},
|
|
|
|
|
+ headers={"Authorization": f"Bearer {admin_token}"},
|
|
|
|
|
+ )
|
|
|
|
|
+ assert upd2.status_code == 422
|
|
|
|
|
+
|
|
|
|
|
+ @pytest.mark.asyncio
|
|
|
|
|
+ @pytest.mark.integration
|
|
|
|
|
+ async def test_partial_update_guard_email_claim(self, async_client: AsyncClient):
|
|
|
|
|
+ """SEC-6 Combined-State-Guard: email_claim='upn' then auto_link=True → 422 (T1 email_claim path)."""
|
|
|
|
|
+ admin_token = await _setup_and_login(async_client, "pg_ec_adm", "PgEc123!")
|
|
|
|
|
+ create_resp = await async_client.post(
|
|
|
|
|
+ "/api/v1/auth/oidc/providers",
|
|
|
|
|
+ json={
|
|
|
|
|
+ "name": "PG-EmailClaim-Test",
|
|
|
|
|
+ "issuer_url": "https://pg-ec.test",
|
|
|
|
|
+ "client_id": "pg-ec-client",
|
|
|
|
|
+ "client_secret": "sec",
|
|
|
|
|
+ "scopes": "openid email profile",
|
|
|
|
|
+ },
|
|
|
|
|
+ headers={"Authorization": f"Bearer {admin_token}"},
|
|
|
|
|
+ )
|
|
|
|
|
+ assert create_resp.status_code == 201
|
|
|
|
|
+ provider_id = create_resp.json()["id"]
|
|
|
|
|
+
|
|
|
|
|
+ upd1 = await async_client.put(
|
|
|
|
|
+ f"/api/v1/auth/oidc/providers/{provider_id}",
|
|
|
|
|
+ json={"email_claim": "upn"},
|
|
|
|
|
+ headers={"Authorization": f"Bearer {admin_token}"},
|
|
|
|
|
+ )
|
|
|
|
|
+ assert upd1.status_code == 200
|
|
|
|
|
+
|
|
|
|
|
+ upd2 = await async_client.put(
|
|
|
|
|
+ f"/api/v1/auth/oidc/providers/{provider_id}",
|
|
|
|
|
+ json={"auto_link_existing_accounts": True},
|
|
|
|
|
+ headers={"Authorization": f"Bearer {admin_token}"},
|
|
|
|
|
+ )
|
|
|
|
|
+ assert upd2.status_code == 422
|
|
|
|
|
+
|
|
|
|
|
+ @pytest.mark.asyncio
|
|
|
|
|
+ @pytest.mark.integration
|
|
|
|
|
+ async def test_partial_update_guard_inverse_order(self, async_client: AsyncClient):
|
|
|
|
|
+ """T2: auto_link=True first (valid), then require_ev=False → Combined-State-Guard fires (422)."""
|
|
|
|
|
+ admin_token = await _setup_and_login(async_client, "pg_inv_adm", "PgInv123!")
|
|
|
|
|
+ create_resp = await async_client.post(
|
|
|
|
|
+ "/api/v1/auth/oidc/providers",
|
|
|
|
|
+ json={
|
|
|
|
|
+ "name": "PG-Inverse-Test",
|
|
|
|
|
+ "issuer_url": "https://pg-inv.test",
|
|
|
|
|
+ "client_id": "pg-inv-client",
|
|
|
|
|
+ "client_secret": "sec",
|
|
|
|
|
+ "scopes": "openid email profile",
|
|
|
|
|
+ },
|
|
|
|
|
+ headers={"Authorization": f"Bearer {admin_token}"},
|
|
|
|
|
+ )
|
|
|
|
|
+ assert create_resp.status_code == 201
|
|
|
|
|
+ provider_id = create_resp.json()["id"]
|
|
|
|
|
+
|
|
|
|
|
+ # auto_link=True is safe when require_ev=True (default)
|
|
|
|
|
+ upd1 = await async_client.put(
|
|
|
|
|
+ f"/api/v1/auth/oidc/providers/{provider_id}",
|
|
|
|
|
+ json={"auto_link_existing_accounts": True},
|
|
|
|
|
+ headers={"Authorization": f"Bearer {admin_token}"},
|
|
|
|
|
+ )
|
|
|
|
|
+ assert upd1.status_code == 200
|
|
|
|
|
+
|
|
|
|
|
+ # Disabling require_ev with auto_link already on → unsafe combined state
|
|
|
|
|
+ upd2 = await async_client.put(
|
|
|
|
|
+ f"/api/v1/auth/oidc/providers/{provider_id}",
|
|
|
|
|
+ json={"require_email_verified": False},
|
|
|
|
|
+ headers={"Authorization": f"Bearer {admin_token}"},
|
|
|
|
|
+ )
|
|
|
|
|
+ assert upd2.status_code == 422
|
|
|
|
|
+
|
|
|
|
|
+ # ── Low5: Response fields verified ───────────────────────────────────────
|
|
|
|
|
+
|
|
|
|
|
+ @pytest.mark.asyncio
|
|
|
|
|
+ @pytest.mark.integration
|
|
|
|
|
+ async def test_create_response_includes_new_fields(self, async_client: AsyncClient):
|
|
|
|
|
+ """Low5: OIDCProviderResponse must include email_claim and require_email_verified."""
|
|
|
|
|
+ admin_token = await _setup_and_login(async_client, "resp_adm", "Resp123!")
|
|
|
|
|
+ resp = await async_client.post(
|
|
|
|
|
+ "/api/v1/auth/oidc/providers",
|
|
|
|
|
+ json={
|
|
|
|
|
+ "name": "ResponseFields-Test",
|
|
|
|
|
+ "issuer_url": "https://resp.test",
|
|
|
|
|
+ "client_id": "resp-client",
|
|
|
|
|
+ "client_secret": "sec",
|
|
|
|
|
+ "scopes": "openid email profile",
|
|
|
|
|
+ "email_claim": "preferred_username",
|
|
|
|
|
+ "require_email_verified": False,
|
|
|
|
|
+ },
|
|
|
|
|
+ headers={"Authorization": f"Bearer {admin_token}"},
|
|
|
|
|
+ )
|
|
|
|
|
+ assert resp.status_code == 201
|
|
|
|
|
+ data = resp.json()
|
|
|
|
|
+ assert data["email_claim"] == "preferred_username"
|
|
|
|
|
+ assert data["require_email_verified"] is False
|
|
|
|
|
+
|
|
|
|
|
+ @pytest.mark.asyncio
|
|
|
|
|
+ @pytest.mark.integration
|
|
|
|
|
+ async def test_update_response_reflects_new_fields(self, async_client: AsyncClient):
|
|
|
|
|
+ """Low5: PUT response must reflect updated email_claim and require_email_verified."""
|
|
|
|
|
+ admin_token = await _setup_and_login(async_client, "upd_resp_adm", "UpdResp123!")
|
|
|
|
|
+ create_resp = await async_client.post(
|
|
|
|
|
+ "/api/v1/auth/oidc/providers",
|
|
|
|
|
+ json={
|
|
|
|
|
+ "name": "UpdateResponse-Test",
|
|
|
|
|
+ "issuer_url": "https://upd-resp.test",
|
|
|
|
|
+ "client_id": "upd-resp-client",
|
|
|
|
|
+ "client_secret": "sec",
|
|
|
|
|
+ "scopes": "openid email profile",
|
|
|
|
|
+ },
|
|
|
|
|
+ headers={"Authorization": f"Bearer {admin_token}"},
|
|
|
|
|
+ )
|
|
|
|
|
+ assert create_resp.status_code == 201
|
|
|
|
|
+ provider_id = create_resp.json()["id"]
|
|
|
|
|
+
|
|
|
|
|
+ upd = await async_client.put(
|
|
|
|
|
+ f"/api/v1/auth/oidc/providers/{provider_id}",
|
|
|
|
|
+ json={"email_claim": "upn", "require_email_verified": True},
|
|
|
|
|
+ headers={"Authorization": f"Bearer {admin_token}"},
|
|
|
|
|
+ )
|
|
|
|
|
+ assert upd.status_code == 200
|
|
|
|
|
+ data = upd.json()
|
|
|
|
|
+ assert data["email_claim"] == "upn"
|
|
|
|
|
+ assert data["require_email_verified"] is True
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+# ===========================================================================
|
|
|
|
|
+# TestOIDCEmailClaimValidation — T2: email_claim field validator coverage
|
|
|
|
|
+# ===========================================================================
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class TestOIDCEmailClaimValidation:
|
|
|
|
|
+ """Schema-level validation for the email_claim field."""
|
|
|
|
|
+
|
|
|
|
|
+ @pytest.mark.asyncio
|
|
|
|
|
+ @pytest.mark.integration
|
|
|
|
|
+ async def test_invalid_claim_name_dot_rejected(self, async_client: AsyncClient):
|
|
|
|
|
+ """email_claim with a dot (log-injection risk) must be rejected."""
|
|
|
|
|
+ admin_token = await _setup_and_login(async_client, "ecv_adm1", "Ecv123!")
|
|
|
|
|
+ resp = await async_client.post(
|
|
|
|
|
+ "/api/v1/auth/oidc/providers",
|
|
|
|
|
+ json={
|
|
|
|
|
+ "name": "ECVTest1",
|
|
|
|
|
+ "issuer_url": "https://ecv1.test",
|
|
|
|
|
+ "client_id": "ecv1",
|
|
|
|
|
+ "client_secret": "sec",
|
|
|
|
|
+ "scopes": "openid email",
|
|
|
|
|
+ "email_claim": "email.address",
|
|
|
|
|
+ },
|
|
|
|
|
+ headers={"Authorization": f"Bearer {admin_token}"},
|
|
|
|
|
+ )
|
|
|
|
|
+ assert resp.status_code == 422
|
|
|
|
|
+
|
|
|
|
|
+ @pytest.mark.asyncio
|
|
|
|
|
+ @pytest.mark.integration
|
|
|
|
|
+ async def test_invalid_claim_name_starts_with_digit_rejected(self, async_client: AsyncClient):
|
|
|
|
|
+ admin_token = await _setup_and_login(async_client, "ecv_adm2", "Ecv123!")
|
|
|
|
|
+ resp = await async_client.post(
|
|
|
|
|
+ "/api/v1/auth/oidc/providers",
|
|
|
|
|
+ json={
|
|
|
|
|
+ "name": "ECVTest2",
|
|
|
|
|
+ "issuer_url": "https://ecv2.test",
|
|
|
|
|
+ "client_id": "ecv2",
|
|
|
|
|
+ "client_secret": "sec",
|
|
|
|
|
+ "scopes": "openid email",
|
|
|
|
|
+ "email_claim": "1invalid",
|
|
|
|
|
+ },
|
|
|
|
|
+ headers={"Authorization": f"Bearer {admin_token}"},
|
|
|
|
|
+ )
|
|
|
|
|
+ assert resp.status_code == 422
|
|
|
|
|
+
|
|
|
|
|
+ @pytest.mark.asyncio
|
|
|
|
|
+ @pytest.mark.integration
|
|
|
|
|
+ async def test_invalid_claim_name_newline_rejected(self, async_client: AsyncClient):
|
|
|
|
|
+ """T2 regex-bug guard: re.fullmatch must reject trailing newline."""
|
|
|
|
|
+ admin_token = await _setup_and_login(async_client, "ecv_adm3", "Ecv123!")
|
|
|
|
|
+ resp = await async_client.post(
|
|
|
|
|
+ "/api/v1/auth/oidc/providers",
|
|
|
|
|
+ json={
|
|
|
|
|
+ "name": "ECVTest3",
|
|
|
|
|
+ "issuer_url": "https://ecv3.test",
|
|
|
|
|
+ "client_id": "ecv3",
|
|
|
|
|
+ "client_secret": "sec",
|
|
|
|
|
+ "scopes": "openid email",
|
|
|
|
|
+ "email_claim": "email\n",
|
|
|
|
|
+ },
|
|
|
|
|
+ headers={"Authorization": f"Bearer {admin_token}"},
|
|
|
|
|
+ )
|
|
|
|
|
+ assert resp.status_code == 422
|
|
|
|
|
+
|
|
|
|
|
+ @pytest.mark.asyncio
|
|
|
|
|
+ @pytest.mark.integration
|
|
|
|
|
+ async def test_claim_name_65_chars_rejected(self, async_client: AsyncClient):
|
|
|
|
|
+ """email_claim longer than 64 characters must be rejected."""
|
|
|
|
|
+ admin_token = await _setup_and_login(async_client, "ecv_adm4", "Ecv123!")
|
|
|
|
|
+ resp = await async_client.post(
|
|
|
|
|
+ "/api/v1/auth/oidc/providers",
|
|
|
|
|
+ json={
|
|
|
|
|
+ "name": "ECVTest4",
|
|
|
|
|
+ "issuer_url": "https://ecv4.test",
|
|
|
|
|
+ "client_id": "ecv4",
|
|
|
|
|
+ "client_secret": "sec",
|
|
|
|
|
+ "scopes": "openid email",
|
|
|
|
|
+ "email_claim": "a" * 65,
|
|
|
|
|
+ },
|
|
|
|
|
+ headers={"Authorization": f"Bearer {admin_token}"},
|
|
|
|
|
+ )
|
|
|
|
|
+ assert resp.status_code == 422
|
|
|
|
|
+
|
|
|
|
|
+ @pytest.mark.asyncio
|
|
|
|
|
+ @pytest.mark.integration
|
|
|
|
|
+ async def test_valid_claim_name_accepted(self, async_client: AsyncClient):
|
|
|
|
|
+ """Valid claim names like preferred_username and upn must be accepted."""
|
|
|
|
|
+ admin_token = await _setup_and_login(async_client, "ecv_adm5", "Ecv123!")
|
|
|
|
|
+ for claim in ("preferred_username", "upn", "email", "emailAddress"):
|
|
|
|
|
+ resp = await async_client.post(
|
|
|
|
|
+ "/api/v1/auth/oidc/providers",
|
|
|
|
|
+ json={
|
|
|
|
|
+ "name": f"ECVTest-{claim[:8]}",
|
|
|
|
|
+ "issuer_url": f"https://ecv-{claim[:8]}.test",
|
|
|
|
|
+ "client_id": f"ecv-{claim[:8]}",
|
|
|
|
|
+ "client_secret": "sec",
|
|
|
|
|
+ "scopes": "openid email",
|
|
|
|
|
+ "email_claim": claim,
|
|
|
|
|
+ },
|
|
|
|
|
+ headers={"Authorization": f"Bearer {admin_token}"},
|
|
|
|
|
+ )
|
|
|
|
|
+ assert resp.status_code == 201, f"claim {claim!r} was rejected: {resp.text}"
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+# ===========================================================================
|
|
|
|
|
+# TestOIDCEmailResolutionExtra — T1 / T3 / T4 additional coverage
|
|
|
|
|
+# ===========================================================================
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+async def _create_provider_via_api(
|
|
|
|
|
+ async_client: AsyncClient,
|
|
|
|
|
+ admin_token: str,
|
|
|
|
|
+ issuer: str,
|
|
|
|
|
+ client_id: str,
|
|
|
|
|
+ *,
|
|
|
|
|
+ email_claim: str = "email",
|
|
|
|
|
+ require_email_verified: bool = True,
|
|
|
|
|
+ suffix: str = "",
|
|
|
|
|
+) -> int:
|
|
|
|
|
+ resp = await async_client.post(
|
|
|
|
|
+ "/api/v1/auth/oidc/providers",
|
|
|
|
|
+ json={
|
|
|
|
|
+ "name": f"TestIdP-extra-{suffix or secrets.token_hex(4)}",
|
|
|
|
|
+ "issuer_url": issuer,
|
|
|
|
|
+ "client_id": client_id,
|
|
|
|
|
+ "client_secret": "sec",
|
|
|
|
|
+ "scopes": "openid email profile",
|
|
|
|
|
+ "is_enabled": True,
|
|
|
|
|
+ "auto_create_users": True,
|
|
|
|
|
+ "email_claim": email_claim,
|
|
|
|
|
+ "require_email_verified": require_email_verified,
|
|
|
|
|
+ },
|
|
|
|
|
+ headers={"Authorization": f"Bearer {admin_token}"},
|
|
|
|
|
+ )
|
|
|
|
|
+ assert resp.status_code == 201, resp.text
|
|
|
|
|
+ return resp.json()["id"]
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class TestOIDCEmailResolutionExtra:
|
|
|
|
|
+ """T1: isinstance guard, T3: SEC-3 normalisation for Fall A/B, T4: inverse Combined-State-Guard."""
|
|
|
|
|
+
|
|
|
|
|
+ @pytest.mark.asyncio
|
|
|
|
|
+ @pytest.mark.integration
|
|
|
|
|
+ async def test_non_string_claim_value_drops_email(
|
|
|
|
|
+ self,
|
|
|
|
|
+ async_client: AsyncClient,
|
|
|
|
|
+ db_session: AsyncSession,
|
|
|
|
|
+ ):
|
|
|
|
|
+ """T1: A non-string email_claim value (list) must be silently dropped — no crash."""
|
|
|
|
|
+ private_pem, jwks_data = _make_test_rsa_key()
|
|
|
|
|
+ issuer = "https://nonstring-test.example"
|
|
|
|
|
+ client_id = "nonstring-client"
|
|
|
|
|
+
|
|
|
|
|
+ admin_token = await _setup_and_login(async_client, "nonstr_adm", "Nonstr123!")
|
|
|
|
|
+ provider_id = await _create_provider_via_api(
|
|
|
|
|
+ async_client,
|
|
|
|
|
+ admin_token,
|
|
|
|
|
+ issuer,
|
|
|
|
|
+ client_id,
|
|
|
|
|
+ email_claim="preferred_username",
|
|
|
|
|
+ require_email_verified=False,
|
|
|
|
|
+ suffix="nonstr",
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ from sqlalchemy import select
|
|
|
|
|
+
|
|
|
|
|
+ from backend.app.models.oidc_provider import UserOIDCLink
|
|
|
|
|
+
|
|
|
|
|
+ # IdP sends preferred_username as a list (non-string) — must not crash
|
|
|
|
|
+ location = await _run_oidc_callback(
|
|
|
|
|
+ async_client,
|
|
|
|
|
+ db_session,
|
|
|
|
|
+ provider_id=provider_id,
|
|
|
|
|
+ claims={"sub": "nonstr-sub-1", "preferred_username": ["user@example.com"]},
|
|
|
|
|
+ private_pem=private_pem,
|
|
|
|
|
+ jwks_data=jwks_data,
|
|
|
|
|
+ issuer=issuer,
|
|
|
|
|
+ client_id=client_id,
|
|
|
|
|
+ )
|
|
|
|
|
+ assert "internal_error" not in location, f"Unexpected error redirect: {location}"
|
|
|
|
|
+ link_result = await db_session.execute(
|
|
|
|
|
+ select(UserOIDCLink)
|
|
|
|
|
+ .where(UserOIDCLink.provider_id == provider_id)
|
|
|
|
|
+ .where(UserOIDCLink.provider_user_id == "nonstr-sub-1")
|
|
|
|
|
+ )
|
|
|
|
|
+ link = link_result.scalar_one_or_none()
|
|
|
|
|
+ assert link is not None
|
|
|
|
|
+ assert link.provider_email is None # list value dropped
|
|
|
|
|
+
|
|
|
|
|
+ @pytest.mark.asyncio
|
|
|
|
|
+ @pytest.mark.integration
|
|
|
|
|
+ async def test_fall_a_sec3_normalisation(
|
|
|
|
|
+ self,
|
|
|
|
|
+ async_client: AsyncClient,
|
|
|
|
|
+ db_session: AsyncSession,
|
|
|
|
|
+ ):
|
|
|
|
|
+ """T3: Fall A — uppercase + whitespace in email claim must be normalised to lowercase."""
|
|
|
|
|
+ private_pem, jwks_data = _make_test_rsa_key()
|
|
|
|
|
+ issuer = "https://sec3a-test.example"
|
|
|
|
|
+ client_id = "sec3a-client"
|
|
|
|
|
+
|
|
|
|
|
+ admin_token = await _setup_and_login(async_client, "sec3a_adm", "Sec3a123!")
|
|
|
|
|
+ provider_id = await _create_provider_via_api(
|
|
|
|
|
+ async_client,
|
|
|
|
|
+ admin_token,
|
|
|
|
|
+ issuer,
|
|
|
|
|
+ client_id,
|
|
|
|
|
+ email_claim="email",
|
|
|
|
|
+ require_email_verified=True,
|
|
|
|
|
+ suffix="sec3a",
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ from sqlalchemy import select
|
|
|
|
|
+
|
|
|
|
|
+ from backend.app.models.oidc_provider import UserOIDCLink
|
|
|
|
|
+
|
|
|
|
|
+ location = await _run_oidc_callback(
|
|
|
|
|
+ async_client,
|
|
|
|
|
+ db_session,
|
|
|
|
|
+ provider_id=provider_id,
|
|
|
|
|
+ claims={"sub": "sec3a-sub-1", "email": " USER@EXAMPLE.COM ", "email_verified": True},
|
|
|
|
|
+ private_pem=private_pem,
|
|
|
|
|
+ jwks_data=jwks_data,
|
|
|
|
|
+ issuer=issuer,
|
|
|
|
|
+ client_id=client_id,
|
|
|
|
|
+ )
|
|
|
|
|
+ assert "internal_error" not in location
|
|
|
|
|
+ link_result = await db_session.execute(
|
|
|
|
|
+ select(UserOIDCLink)
|
|
|
|
|
+ .where(UserOIDCLink.provider_id == provider_id)
|
|
|
|
|
+ .where(UserOIDCLink.provider_user_id == "sec3a-sub-1")
|
|
|
|
|
+ )
|
|
|
|
|
+ link = link_result.scalar_one_or_none()
|
|
|
|
|
+ assert link is not None
|
|
|
|
|
+ assert link.provider_email == "user@example.com"
|
|
|
|
|
+
|
|
|
|
|
+ @pytest.mark.asyncio
|
|
|
|
|
+ @pytest.mark.integration
|
|
|
|
|
+ async def test_fall_b_sec3_normalisation(
|
|
|
|
|
+ self,
|
|
|
|
|
+ async_client: AsyncClient,
|
|
|
|
|
+ db_session: AsyncSession,
|
|
|
|
|
+ ):
|
|
|
|
|
+ """T3: Fall B — uppercase + whitespace in email claim must be normalised to lowercase."""
|
|
|
|
|
+ private_pem, jwks_data = _make_test_rsa_key()
|
|
|
|
|
+ issuer = "https://sec3b-test.example"
|
|
|
|
|
+ client_id = "sec3b-client"
|
|
|
|
|
+
|
|
|
|
|
+ admin_token = await _setup_and_login(async_client, "sec3b_adm", "Sec3b123!")
|
|
|
|
|
+ provider_id = await _create_provider_via_api(
|
|
|
|
|
+ async_client,
|
|
|
|
|
+ admin_token,
|
|
|
|
|
+ issuer,
|
|
|
|
|
+ client_id,
|
|
|
|
|
+ email_claim="email",
|
|
|
|
|
+ require_email_verified=False,
|
|
|
|
|
+ suffix="sec3b",
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ from sqlalchemy import select
|
|
|
|
|
+
|
|
|
|
|
+ from backend.app.models.oidc_provider import UserOIDCLink
|
|
|
|
|
+
|
|
|
|
|
+ location = await _run_oidc_callback(
|
|
|
|
|
+ async_client,
|
|
|
|
|
+ db_session,
|
|
|
|
|
+ provider_id=provider_id,
|
|
|
|
|
+ claims={"sub": "sec3b-sub-1", "email": " USER@EXAMPLE.COM "},
|
|
|
|
|
+ private_pem=private_pem,
|
|
|
|
|
+ jwks_data=jwks_data,
|
|
|
|
|
+ issuer=issuer,
|
|
|
|
|
+ client_id=client_id,
|
|
|
|
|
+ )
|
|
|
|
|
+ assert "internal_error" not in location
|
|
|
|
|
+ link_result = await db_session.execute(
|
|
|
|
|
+ select(UserOIDCLink)
|
|
|
|
|
+ .where(UserOIDCLink.provider_id == provider_id)
|
|
|
|
|
+ .where(UserOIDCLink.provider_user_id == "sec3b-sub-1")
|
|
|
|
|
+ )
|
|
|
|
|
+ link = link_result.scalar_one_or_none()
|
|
|
|
|
+ assert link is not None
|
|
|
|
|
+ assert link.provider_email == "user@example.com"
|
|
|
|
|
+
|
|
|
|
|
+ @pytest.mark.asyncio
|
|
|
|
|
+ @pytest.mark.integration
|
|
|
|
|
+ async def test_combined_state_guard_email_claim_inverse_order(self, async_client: AsyncClient):
|
|
|
|
|
+ """T4: Combined-State-Guard — set auto_link=True first, then switch email_claim to custom."""
|
|
|
|
|
+ admin_token = await _setup_and_login(async_client, "inv_ec_adm", "InvEc123!")
|
|
|
|
|
+ create_resp = await async_client.post(
|
|
|
|
|
+ "/api/v1/auth/oidc/providers",
|
|
|
|
|
+ json={
|
|
|
|
|
+ "name": "InvEcTest",
|
|
|
|
|
+ "issuer_url": "https://inv-ec.test",
|
|
|
|
|
+ "client_id": "inv-ec",
|
|
|
|
|
+ "client_secret": "sec",
|
|
|
|
|
+ "scopes": "openid email",
|
|
|
|
|
+ },
|
|
|
|
|
+ headers={"Authorization": f"Bearer {admin_token}"},
|
|
|
|
|
+ )
|
|
|
|
|
+ assert create_resp.status_code == 201
|
|
|
|
|
+ provider_id = create_resp.json()["id"]
|
|
|
|
|
+
|
|
|
|
|
+ # First: enable auto_link (safe — email_claim="email", require_ev=True)
|
|
|
|
|
+ upd1 = await async_client.put(
|
|
|
|
|
+ f"/api/v1/auth/oidc/providers/{provider_id}",
|
|
|
|
|
+ json={"auto_link_existing_accounts": True},
|
|
|
|
|
+ headers={"Authorization": f"Bearer {admin_token}"},
|
|
|
|
|
+ )
|
|
|
|
|
+ assert upd1.status_code == 200
|
|
|
|
|
+
|
|
|
|
|
+ # Second: switch email_claim to custom while auto_link is on → unsafe combined state
|
|
|
|
|
+ upd2 = await async_client.put(
|
|
|
|
|
+ f"/api/v1/auth/oidc/providers/{provider_id}",
|
|
|
|
|
+ json={"email_claim": "preferred_username"},
|
|
|
|
|
+ headers={"Authorization": f"Bearer {admin_token}"},
|
|
|
|
|
+ )
|
|
|
|
|
+ assert upd2.status_code == 422
|