| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116 |
- from __future__ import annotations
- from datetime import datetime
- from sqlalchemy import Boolean, CheckConstraint, DateTime, ForeignKey, Integer, String, Text, UniqueConstraint, func
- from sqlalchemy.orm import Mapped, mapped_column, relationship
- from backend.app.core.database import Base
- from backend.app.core.encryption import mfa_decrypt, mfa_encrypt
- class OIDCProvider(Base):
- """OpenID Connect provider configuration.
- Supports any standards-compliant OIDC provider such as PocketID,
- Authentik, Keycloak, Authelia, Google, etc.
- The issuer_url must point to the root issuer (e.g. ``https://id.example.com``).
- The OIDC discovery document is fetched from
- ``{issuer_url}/.well-known/openid-configuration`` at runtime.
- """
- __tablename__ = "oidc_providers"
- __table_args__ = (
- # DB-level enforcement of SEC-1: blocks only Fall B (email_claim='email' + require_ev=False).
- # Fall C (custom claim) is safe — no email_verified gate on that path.
- # Enforced on new installations; existing tables updated via _migrate_update_auto_link_constraint.
- CheckConstraint(
- "auto_link_existing_accounts = FALSE OR email_claim != 'email' OR require_email_verified = TRUE",
- name="ck_auto_link_requires_verified_email_claim",
- ),
- )
- id: Mapped[int] = mapped_column(primary_key=True)
- # Human-readable name shown on the login button (e.g. "PocketID", "Google")
- name: Mapped[str] = mapped_column(String(100), unique=True)
- # Full OIDC issuer URL (e.g. "https://id.example.com")
- issuer_url: Mapped[str] = mapped_column(String(500))
- client_id: Mapped[str] = mapped_column(String(255))
- # Encrypted at rest when MFA_ENCRYPTION_KEY is set.
- # Use .client_secret / .client_secret setter rather than _client_secret_enc directly.
- _client_secret_enc: Mapped[str] = mapped_column("client_secret", String(512))
- @property
- def client_secret(self) -> str:
- return mfa_decrypt(self._client_secret_enc)
- @client_secret.setter
- def client_secret(self, value: str) -> None:
- self._client_secret_enc = mfa_encrypt(value)
- # Space-separated scopes; must include "openid"
- scopes: Mapped[str] = mapped_column(String(500), default="openid email profile")
- is_enabled: Mapped[bool] = mapped_column(Boolean, default=True)
- # When True, a new local user is created automatically on first OIDC login
- auto_create_users: Mapped[bool] = mapped_column(Boolean, default=False)
- # When True, an existing local user whose email matches the OIDC claim is
- # automatically linked on first SSO login. Default is False (conservative):
- # operators must explicitly opt-in to prevent an attacker-controlled IdP from
- # silently hijacking local accounts via email matching (M-2 fix).
- auto_link_existing_accounts: Mapped[bool] = mapped_column(Boolean, default=False)
- # JWT claim name used as the email identity (default "email").
- # Set to "preferred_username" or "upn" for Azure Entra ID, which does not send
- # email_verified — using a custom claim skips the email_verified check entirely
- # and is the recommended Azure configuration.
- # Has no interaction with require_email_verified when set to a non-"email" value:
- # custom claims never perform an email_verified check regardless of that setting.
- email_claim: Mapped[str] = mapped_column(String(64), default="email")
- # When True (default), the "email" claim is only trusted when email_verified=True.
- # Set to False to accept the email even when email_verified is absent — required
- # for providers like Azure Entra ID that never send email_verified and where a
- # custom claim (email_claim != "email") is not preferred.
- # Has no effect when email_claim is not "email": the custom-claim path never
- # performs an email_verified check regardless of this setting.
- require_email_verified: Mapped[bool] = mapped_column(Boolean, default=True)
- # Optional icon URL (SVG/PNG) shown on the login button
- icon_url: Mapped[str | None] = mapped_column(Text, nullable=True, default=None)
- created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())
- updated_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now(), onupdate=func.now())
- # Relationship to linked user accounts
- user_links: Mapped[list[UserOIDCLink]] = relationship(
- "UserOIDCLink",
- back_populates="provider",
- cascade="all, delete-orphan",
- )
- def __repr__(self) -> str:
- return f"<OIDCProvider {self.name!r}>"
- class UserOIDCLink(Base):
- """Links a local Bambuddy user account to an identity at an OIDC provider."""
- __tablename__ = "user_oidc_links"
- __table_args__ = (
- # T2: Prevent duplicate OIDC identities and duplicate provider links.
- # (provider_id, provider_user_id) — one OIDC sub per provider maps to at most one local user.
- UniqueConstraint("provider_id", "provider_user_id", name="uq_oidc_link_provider_sub"),
- # (user_id, provider_id) — one local user can link to each provider at most once.
- UniqueConstraint("user_id", "provider_id", name="uq_oidc_link_user_provider"),
- )
- id: Mapped[int] = mapped_column(primary_key=True)
- user_id: Mapped[int] = mapped_column(Integer, ForeignKey("users.id", ondelete="CASCADE"), index=True)
- provider_id: Mapped[int] = mapped_column(Integer, ForeignKey("oidc_providers.id", ondelete="CASCADE"), index=True)
- # The "sub" claim from the OIDC ID token — stable identifier for the user
- provider_user_id: Mapped[str] = mapped_column(String(500))
- # Email returned by the provider (informational; may differ from local email)
- provider_email: Mapped[str | None] = mapped_column(String(255), nullable=True, default=None)
- created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())
- provider: Mapped[OIDCProvider] = relationship("OIDCProvider", back_populates="user_links")
- def __repr__(self) -> str:
- return f"<UserOIDCLink user_id={self.user_id} provider_id={self.provider_id}>"
|