oidc_provider.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. from __future__ import annotations
  2. from datetime import datetime
  3. from sqlalchemy import Boolean, DateTime, ForeignKey, Integer, String, Text, UniqueConstraint, func
  4. from sqlalchemy.orm import Mapped, mapped_column, relationship
  5. from backend.app.core.database import Base
  6. from backend.app.core.encryption import mfa_decrypt, mfa_encrypt
  7. class OIDCProvider(Base):
  8. """OpenID Connect provider configuration.
  9. Supports any standards-compliant OIDC provider such as PocketID,
  10. Authentik, Keycloak, Authelia, Google, etc.
  11. The issuer_url must point to the root issuer (e.g. ``https://id.example.com``).
  12. The OIDC discovery document is fetched from
  13. ``{issuer_url}/.well-known/openid-configuration`` at runtime.
  14. """
  15. __tablename__ = "oidc_providers"
  16. id: Mapped[int] = mapped_column(primary_key=True)
  17. # Human-readable name shown on the login button (e.g. "PocketID", "Google")
  18. name: Mapped[str] = mapped_column(String(100), unique=True)
  19. # Full OIDC issuer URL (e.g. "https://id.example.com")
  20. issuer_url: Mapped[str] = mapped_column(String(500))
  21. client_id: Mapped[str] = mapped_column(String(255))
  22. # Encrypted at rest when MFA_ENCRYPTION_KEY is set.
  23. # Use .client_secret / .client_secret setter rather than _client_secret_enc directly.
  24. _client_secret_enc: Mapped[str] = mapped_column("client_secret", String(512))
  25. @property
  26. def client_secret(self) -> str:
  27. return mfa_decrypt(self._client_secret_enc)
  28. @client_secret.setter
  29. def client_secret(self, value: str) -> None:
  30. self._client_secret_enc = mfa_encrypt(value)
  31. # Space-separated scopes; must include "openid"
  32. scopes: Mapped[str] = mapped_column(String(500), default="openid email profile")
  33. is_enabled: Mapped[bool] = mapped_column(Boolean, default=True)
  34. # When True, a new local user is created automatically on first OIDC login
  35. auto_create_users: Mapped[bool] = mapped_column(Boolean, default=False)
  36. # When True, an existing local user whose email matches the OIDC claim is
  37. # automatically linked on first SSO login. Default is False (conservative):
  38. # operators must explicitly opt-in to prevent an attacker-controlled IdP from
  39. # silently hijacking local accounts via email matching (M-2 fix).
  40. auto_link_existing_accounts: Mapped[bool] = mapped_column(Boolean, default=False)
  41. # Optional icon URL (SVG/PNG) shown on the login button
  42. icon_url: Mapped[str | None] = mapped_column(Text, nullable=True, default=None)
  43. created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())
  44. updated_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now(), onupdate=func.now())
  45. # Relationship to linked user accounts
  46. user_links: Mapped[list[UserOIDCLink]] = relationship(
  47. "UserOIDCLink",
  48. back_populates="provider",
  49. cascade="all, delete-orphan",
  50. )
  51. def __repr__(self) -> str:
  52. return f"<OIDCProvider {self.name!r}>"
  53. class UserOIDCLink(Base):
  54. """Links a local Bambuddy user account to an identity at an OIDC provider."""
  55. __tablename__ = "user_oidc_links"
  56. __table_args__ = (
  57. # T2: Prevent duplicate OIDC identities and duplicate provider links.
  58. # (provider_id, provider_user_id) — one OIDC sub per provider maps to at most one local user.
  59. UniqueConstraint("provider_id", "provider_user_id", name="uq_oidc_link_provider_sub"),
  60. # (user_id, provider_id) — one local user can link to each provider at most once.
  61. UniqueConstraint("user_id", "provider_id", name="uq_oidc_link_user_provider"),
  62. )
  63. id: Mapped[int] = mapped_column(primary_key=True)
  64. user_id: Mapped[int] = mapped_column(Integer, ForeignKey("users.id", ondelete="CASCADE"), index=True)
  65. provider_id: Mapped[int] = mapped_column(Integer, ForeignKey("oidc_providers.id", ondelete="CASCADE"), index=True)
  66. # The "sub" claim from the OIDC ID token — stable identifier for the user
  67. provider_user_id: Mapped[str] = mapped_column(String(500))
  68. # Email returned by the provider (informational; may differ from local email)
  69. provider_email: Mapped[str | None] = mapped_column(String(255), nullable=True, default=None)
  70. created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())
  71. provider: Mapped[OIDCProvider] = relationship("OIDCProvider", back_populates="user_links")
  72. def __repr__(self) -> str:
  73. return f"<UserOIDCLink user_id={self.user_id} provider_id={self.provider_id}>"