| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561 |
- import re
- from typing import Literal
- from pydantic import BaseModel, Field, field_validator, model_validator
- def _validate_password_complexity(v: str) -> str:
- """Enforce minimum password complexity (M-C).
- Requires at least one uppercase letter, one lowercase letter, one digit,
- and one special character in addition to the min_length=8 Field constraint.
- """
- if not re.search(r"[A-Z]", v):
- raise ValueError("Password must contain at least one uppercase letter")
- if not re.search(r"[a-z]", v):
- raise ValueError("Password must contain at least one lowercase letter")
- if not re.search(r"\d", v):
- raise ValueError("Password must contain at least one digit")
- if not re.search(r"[^A-Za-z0-9]", v):
- raise ValueError("Password must contain at least one special character")
- return v
- class GroupBrief(BaseModel):
- """Brief group info for embedding in user responses."""
- id: int
- name: str
- class Config:
- from_attributes = True
- class LoginRequest(BaseModel):
- username: str = Field(..., max_length=150)
- password: str = Field(..., max_length=256)
- class LoginResponse(BaseModel):
- access_token: str | None = None
- token_type: str = "bearer"
- user: "UserResponse | None" = None
- # Set when 2FA is required; the frontend must call /auth/2fa/verify
- requires_2fa: bool = False
- pre_auth_token: str | None = None
- two_fa_methods: list[str] = []
- class UserCreate(BaseModel):
- username: str = Field(..., max_length=150)
- password: str | None = Field(default=None, max_length=256) # M-NEW-4: cap before pbkdf2
- email: str | None = Field(default=None, max_length=254) # L-NEW-5: RFC 5321 max
- role: str = "user"
- group_ids: list[int] | None = None
- @field_validator("password")
- @classmethod
- def validate_password(cls, v: str | None) -> str | None:
- if v is not None:
- _validate_password_complexity(v)
- return v
- class UserUpdate(BaseModel):
- username: str | None = Field(default=None, max_length=150)
- password: str | None = Field(default=None, max_length=256) # M-NEW-4: cap before pbkdf2
- email: str | None = Field(default=None, max_length=254) # L-NEW-5: RFC 5321 max
- role: str | None = None
- is_active: bool | None = None
- group_ids: list[int] | None = None
- @field_validator("password")
- @classmethod
- def validate_password(cls, v: str | None) -> str | None:
- if v is not None:
- _validate_password_complexity(v)
- return v
- class UserResponse(BaseModel):
- id: int
- username: str
- email: str | None = None
- role: str # Deprecated, kept for backward compatibility
- is_active: bool
- is_admin: bool # Computed from role and group membership
- auth_source: str = "local" # "local" or "ldap"
- groups: list[GroupBrief] = []
- permissions: list[str] = [] # All permissions from groups
- created_at: str
- class Config:
- from_attributes = True
- class LDAPSearchResultResponse(BaseModel):
- """One match from GET /auth/ldap/search — surfaced in the admin UI."""
- username: str
- email: str | None = None
- display_name: str | None = None
- dn: str
- already_provisioned: bool = False # True if this username already exists as a BamBuddy user
- class LDAPProvisionRequest(BaseModel):
- """Body for POST /auth/ldap/provision. Username is re-resolved via the
- service-account bind, so the request only carries the directory username
- the admin picked from the search results."""
- username: str = Field(..., max_length=150)
- class ChangePasswordRequest(BaseModel):
- current_password: str = Field(..., max_length=256) # M-NEW-3: cap before pbkdf2
- new_password: str = Field(..., min_length=8, max_length=256)
- @field_validator("new_password")
- @classmethod
- def validate_new_password(cls, v: str) -> str:
- return _validate_password_complexity(v)
- class SetupRequest(BaseModel):
- auth_enabled: bool
- admin_username: str | None = Field(default=None, max_length=150)
- admin_password: str | None = Field(default=None, max_length=256)
- # Password complexity is NOT validated at the schema layer. When re-enabling auth
- # with an existing admin user (or when LDAP is the auth backend), the frontend
- # still sends whatever is in the password field but the route ignores it.
- # Enforcing complexity here would reject those legitimate flows. The route body
- # applies the check only when a brand-new local admin is actually being created.
- class SetupResponse(BaseModel):
- auth_enabled: bool
- admin_created: bool | None = None
- class ForgotPasswordRequest(BaseModel):
- email: str = Field(..., max_length=254) # L-NEW-1: RFC 5321 max; caps memory/CPU before lookup
- class ForgotPasswordConfirmRequest(BaseModel):
- token: str = Field(..., max_length=128)
- new_password: str = Field(..., min_length=8, max_length=256)
- @field_validator("new_password")
- @classmethod
- def validate_new_password(cls, v: str) -> str:
- return _validate_password_complexity(v)
- class ForgotPasswordResponse(BaseModel):
- message: str
- class ResetPasswordRequest(BaseModel):
- user_id: int
- class ResetPasswordResponse(BaseModel):
- message: str
- class SMTPSettings(BaseModel):
- smtp_host: str
- smtp_port: int
- smtp_username: str | None = None # Optional when auth is disabled
- smtp_password: str | None = None # Optional for read operations or when auth is disabled
- smtp_security: str = "starttls" # 'starttls', 'ssl', 'none'
- smtp_auth_enabled: bool = True
- smtp_from_email: str
- smtp_from_name: str = "BamBuddy"
- # Deprecated field for backward compatibility
- smtp_use_tls: bool | None = None
- class TestSMTPRequest(BaseModel):
- test_recipient: str
- class TestSMTPResponse(BaseModel):
- success: bool
- message: str
- # ---------------------------------------------------------------------------
- # 2FA / MFA schemas
- # ---------------------------------------------------------------------------
- class TwoFAStatusResponse(BaseModel):
- totp_enabled: bool
- email_otp_enabled: bool
- backup_codes_remaining: int
- class TOTPSetupResponse(BaseModel):
- """Returned when a user initiates TOTP setup. The frontend should display
- the QR code image (base64 PNG) and ask the user to scan it, then call
- /auth/2fa/totp/enable with a valid code to confirm."""
- secret: str # base32 secret (shown as fallback text)
- qr_code_b64: str # base64-encoded PNG of the QR code
- issuer: str
- class TOTPSetupRequest(BaseModel):
- """Optional body for POST /auth/2fa/totp/setup.
- Only required when re-initialising setup while an active TOTP record exists.
- Provide the current TOTP code (from the existing authenticator app) to
- confirm intent — mirrors the verification requirement in disable_totp.
- """
- code: str | None = Field(default=None, max_length=8) # L-NEW-2: bound before pyotp
- class TOTPEnableRequest(BaseModel):
- code: str # 6-digit TOTP code from the authenticator app
- @field_validator("code")
- @classmethod
- def validate_code(cls, v: str) -> str:
- v = v.strip()
- if not v.isdigit() or len(v) != 6:
- raise ValueError("TOTP code must be exactly 6 digits")
- return v
- class TOTPEnableResponse(BaseModel):
- message: str
- backup_codes: list[str] # plain-text codes shown once; user must save them
- class TOTPDisableRequest(BaseModel):
- """Requires a valid TOTP code OR a backup code to disable TOTP."""
- code: str = Field(..., max_length=128)
- class BackupCodesResponse(BaseModel):
- backup_codes: list[str]
- message: str
- class EmailOTPEnableRequest(BaseModel):
- """No body required — email is taken from the authenticated user's profile."""
- pass
- class TwoFAVerifyRequest(BaseModel):
- pre_auth_token: str = Field(..., max_length=128)
- # TOTP/email codes are 6 digits; backup codes are 8 uppercase alphanumeric chars.
- # max_length=8 prevents excessively long inputs from reaching pbkdf2/pyotp.
- code: str = Field(..., min_length=6, max_length=8)
- method: Literal["totp", "email", "backup"] = "totp"
- @field_validator("code")
- @classmethod
- def validate_code_format(cls, v: str) -> str:
- v = v.strip()
- if not re.match(r"^[A-Za-z0-9]{6,8}$", v):
- raise ValueError("Code must be 6–8 alphanumeric characters")
- return v.upper() # normalise backup codes to uppercase
- class TwoFAVerifyResponse(BaseModel):
- access_token: str
- token_type: str = "bearer"
- user: "UserResponse"
- class EmailOTPSendRequest(BaseModel):
- pre_auth_token: str = Field(..., max_length=128)
- class EmailOTPEnableConfirmRequest(BaseModel):
- """Body for the second step of email OTP enable: verify the proof-of-possession code."""
- setup_token: str = Field(..., max_length=128)
- # L-NEW-3: email OTP setup codes are always exactly 6 digits; reject anything else.
- code: str = Field(..., min_length=6, max_length=6)
- @field_validator("code")
- @classmethod
- def validate_code_digits(cls, v: str) -> str:
- v = v.strip()
- if not v.isdigit() or len(v) != 6:
- raise ValueError("Email OTP setup code must be exactly 6 digits")
- return v
- class EmailOTPDisableRequest(BaseModel):
- """Requires the account password to disable email OTP."""
- password: str = Field(..., max_length=256)
- class AdminDisable2FARequest(BaseModel):
- """Admin must supply their own password as re-auth before disabling 2FA for another user.
- OIDC/LDAP-only admins (no local password_hash) are exempt from this check.
- """
- admin_password: str | None = Field(default=None, max_length=256)
- # ---------------------------------------------------------------------------
- # OIDC schemas
- # ---------------------------------------------------------------------------
- AUTO_LINK_REQUIREMENTS_ERROR = (
- "auto_link_existing_accounts requires require_email_verified=True when email_claim='email'"
- )
- def _validate_email_claim_name(v: str) -> str:
- # Accepts only alphanumeric/underscore/hyphen claim names starting with a letter —
- # prevents log injection and limits the attack surface of operator-supplied claim names.
- if not re.fullmatch(r"[a-zA-Z][a-zA-Z0-9_\-]{0,63}", v):
- raise ValueError("Invalid claim name")
- return v
- def _validate_icon_url(v: str | None) -> str | None:
- """Reject non-HTTPS icon URLs and SSRF-unsafe hosts.
- Delegates to the runtime SSRF guard ``assert_safe_public_https_url``
- so the Pydantic layer enforces the same allowlist as the fetcher —
- no policy drift between schema validation and SSRF check. Without
- this delegation the validator covered only ``is_private | is_loopback
- | is_link_local`` while the runtime additionally rejected numeric-
- encoded IPs, cloud-metadata endpoints, multicast, unspecified, and
- IPv4-mapped IPv6.
- Lazy-imported because ``_oidc_helpers`` lives under ``api/routes/``
- and schemas avoid top-level imports from that layer (matches the
- existing pattern in ``_validate_issuer_url`` which lazy-imports
- ``ipaddress``).
- """
- if v is None:
- return v
- if not v.startswith("https://"):
- # Surface the same wording the runtime guard would use, but pre-
- # checked here so the user-facing error doesn't depend on the
- # runtime call path.
- raise ValueError("icon_url must start with https://")
- from backend.app.api.routes._oidc_helpers import assert_safe_public_https_url
- try:
- assert_safe_public_https_url(v)
- except ValueError as exc:
- raise ValueError(f"icon_url: {exc}") from exc
- return v
- def _validate_issuer_url(v: str | None) -> str | None:
- """Nit4: Reject non-HTTPS issuer URLs and private/loopback/link-local hosts.
- HTTP is no longer accepted — OIDC providers must be reachable over TLS.
- Private-network and loopback addresses are rejected to prevent SSRF attacks
- where an admin-supplied URL could reach internal services.
- """
- import ipaddress
- from urllib.parse import urlparse
- if v is None:
- return v
- if not v.startswith("https://"):
- raise ValueError("issuer_url must start with https://")
- host = urlparse(v).hostname or ""
- try:
- addr = ipaddress.ip_address(host)
- if addr.is_private or addr.is_loopback or addr.is_link_local:
- raise ValueError("issuer_url must not point to a private, loopback, or link-local address")
- except ValueError as exc:
- if "issuer_url" in str(exc):
- raise
- # hostname is a domain name, not a bare IP — that's fine
- return v
- def _validate_scopes(v: str | None) -> str | None:
- """Nit5: Require that the 'openid' scope is present.
- The OpenID Connect spec mandates the 'openid' scope; without it the
- response is plain OAuth2, not OIDC, and claims like sub/email are not
- guaranteed.
- """
- if v is None:
- return v
- scope_list = v.split()
- if "openid" not in scope_list:
- raise ValueError("scopes must include 'openid'")
- return v
- class OIDCProviderCreate(BaseModel):
- name: str = Field(..., max_length=100) # L-NEW-4
- issuer_url: str
- client_id: str = Field(..., max_length=256) # L-NEW-4
- client_secret: str = Field(..., max_length=512) # L-NEW-4: Fernet input bounded
- scopes: str = Field(default="openid email profile", max_length=256) # L-NEW-4
- is_enabled: bool = True
- auto_create_users: bool = False
- auto_link_existing_accounts: bool = False # M-2: conservative default, opt-in only
- email_claim: str = Field(default="email", max_length=64)
- require_email_verified: bool = True
- icon_url: str | None = None
- default_group_id: int | None = None
- @field_validator("issuer_url")
- @classmethod
- def validate_issuer_url(cls, v: str) -> str:
- result = _validate_issuer_url(v)
- if result is None:
- raise ValueError("issuer_url is required")
- return result
- @field_validator("scopes")
- @classmethod
- def validate_scopes(cls, v: str) -> str:
- result = _validate_scopes(v)
- if result is None:
- raise ValueError("scopes is required")
- return result
- @field_validator("email_claim")
- @classmethod
- def validate_email_claim(cls, v: str) -> str:
- return _validate_email_claim_name(v)
- @field_validator("icon_url")
- @classmethod
- def validate_icon_url(cls, v: str | None) -> str | None:
- return _validate_icon_url(v)
- # SEC-1: auto_link with email_claim='email' requires require_email_verified=True.
- # Fall B (require_email_verified=False + email_claim='email') accepts absent email_verified → account-takeover risk.
- # Fall C (custom claim != 'email') is safe: no email_verified gate on that path regardless of require_email_verified.
- @model_validator(mode="after")
- def check_auto_link_requires_verified(self) -> "OIDCProviderCreate":
- if self.auto_link_existing_accounts and self.email_claim == "email" and not self.require_email_verified:
- raise ValueError(AUTO_LINK_REQUIREMENTS_ERROR)
- return self
- class OIDCProviderUpdate(BaseModel):
- name: str | None = Field(default=None, max_length=100)
- issuer_url: str | None = None
- @field_validator("issuer_url")
- @classmethod
- def validate_issuer_url(cls, v: str | None) -> str | None:
- return _validate_issuer_url(v)
- client_id: str | None = Field(default=None, max_length=256)
- client_secret: str | None = Field(default=None, max_length=512)
- scopes: str | None = Field(default=None, max_length=256)
- is_enabled: bool | None = None
- auto_create_users: bool | None = None
- auto_link_existing_accounts: bool | None = None
- email_claim: str | None = Field(default=None, max_length=64)
- require_email_verified: bool | None = None
- icon_url: str | None = None
- default_group_id: int | None = None
- @field_validator("scopes")
- @classmethod
- def validate_scopes(cls, v: str | None) -> str | None:
- return _validate_scopes(v)
- @field_validator("email_claim")
- @classmethod
- def validate_email_claim(cls, v: str | None) -> str | None:
- if v is None:
- return None
- return _validate_email_claim_name(v)
- @field_validator("icon_url")
- @classmethod
- def validate_icon_url(cls, v: str | None) -> str | None:
- return _validate_icon_url(v)
- # SEC-1 (schema-level): blocks only when auto_link=True + email_claim='email' + require_email_verified=False
- # arrive in the same request. email_claim=None means the request leaves it unchanged (still 'email' by default),
- # so that is also treated as 'email'. Partial updates spanning two requests are caught by the
- # Combined-State-Guard in the route handler after the setattr loop.
- @model_validator(mode="after")
- def check_auto_link_requires_verified(self) -> "OIDCProviderUpdate":
- if (
- self.auto_link_existing_accounts is True
- and self.require_email_verified is False
- and (self.email_claim is None or self.email_claim == "email")
- ):
- raise ValueError(AUTO_LINK_REQUIREMENTS_ERROR)
- return self
- class OIDCProviderResponse(BaseModel):
- id: int
- name: str
- issuer_url: str
- client_id: str
- scopes: str
- is_enabled: bool
- auto_create_users: bool
- auto_link_existing_accounts: bool = False
- email_claim: str = "email"
- require_email_verified: bool = True
- icon_url: str | None = None
- default_group_id: int | None = None
- # Set explicitly in the route handler from `icon_content_type is not None`
- # rather than `@computed_field` (project policy) or `icon_data is not None`
- # (would trigger an async lazy-load on the deferred BLOB column).
- # Required (no default) so Pydantic fails loudly if any code path skips
- # `_build_provider_response` and tries `model_validate(provider)` directly.
- has_icon: bool
- class Config:
- from_attributes = True
- class OIDCAuthorizeResponse(BaseModel):
- auth_url: str
- class OIDCExchangeRequest(BaseModel):
- oidc_token: str = Field(..., max_length=128)
- class OIDCLinkResponse(BaseModel):
- id: int
- provider_id: int
- provider_name: str
- provider_email: str | None = None
- created_at: str
- class EncryptionRowCounts(BaseModel):
- oidc_providers: int
- user_totp: int
- class EncryptionStatusResponse(BaseModel):
- key_configured: bool
- key_source: Literal["env", "file", "generated", "none"]
- legacy_plaintext_rows: EncryptionRowCounts
- encrypted_rows: EncryptionRowCounts
- # B4: filled by the endpoint after a sample-decrypt of one encrypted row,
- # so a wrong-key state (where key_configured=True but rows decrypt to junk)
- # is detected, not just the no-key case.
- decryption_broken: bool = False
- # B2: number of rows skipped during the last legacy re-encryption migration.
- # Filled from backend.app.core.database.get_migration_error_count().
- migration_error_count: int = 0
|