auth.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561
  1. import re
  2. from typing import Literal
  3. from pydantic import BaseModel, Field, field_validator, model_validator
  4. def _validate_password_complexity(v: str) -> str:
  5. """Enforce minimum password complexity (M-C).
  6. Requires at least one uppercase letter, one lowercase letter, one digit,
  7. and one special character in addition to the min_length=8 Field constraint.
  8. """
  9. if not re.search(r"[A-Z]", v):
  10. raise ValueError("Password must contain at least one uppercase letter")
  11. if not re.search(r"[a-z]", v):
  12. raise ValueError("Password must contain at least one lowercase letter")
  13. if not re.search(r"\d", v):
  14. raise ValueError("Password must contain at least one digit")
  15. if not re.search(r"[^A-Za-z0-9]", v):
  16. raise ValueError("Password must contain at least one special character")
  17. return v
  18. class GroupBrief(BaseModel):
  19. """Brief group info for embedding in user responses."""
  20. id: int
  21. name: str
  22. class Config:
  23. from_attributes = True
  24. class LoginRequest(BaseModel):
  25. username: str = Field(..., max_length=150)
  26. password: str = Field(..., max_length=256)
  27. class LoginResponse(BaseModel):
  28. access_token: str | None = None
  29. token_type: str = "bearer"
  30. user: "UserResponse | None" = None
  31. # Set when 2FA is required; the frontend must call /auth/2fa/verify
  32. requires_2fa: bool = False
  33. pre_auth_token: str | None = None
  34. two_fa_methods: list[str] = []
  35. class UserCreate(BaseModel):
  36. username: str = Field(..., max_length=150)
  37. password: str | None = Field(default=None, max_length=256) # M-NEW-4: cap before pbkdf2
  38. email: str | None = Field(default=None, max_length=254) # L-NEW-5: RFC 5321 max
  39. role: str = "user"
  40. group_ids: list[int] | None = None
  41. @field_validator("password")
  42. @classmethod
  43. def validate_password(cls, v: str | None) -> str | None:
  44. if v is not None:
  45. _validate_password_complexity(v)
  46. return v
  47. class UserUpdate(BaseModel):
  48. username: str | None = Field(default=None, max_length=150)
  49. password: str | None = Field(default=None, max_length=256) # M-NEW-4: cap before pbkdf2
  50. email: str | None = Field(default=None, max_length=254) # L-NEW-5: RFC 5321 max
  51. role: str | None = None
  52. is_active: bool | None = None
  53. group_ids: list[int] | None = None
  54. @field_validator("password")
  55. @classmethod
  56. def validate_password(cls, v: str | None) -> str | None:
  57. if v is not None:
  58. _validate_password_complexity(v)
  59. return v
  60. class UserResponse(BaseModel):
  61. id: int
  62. username: str
  63. email: str | None = None
  64. role: str # Deprecated, kept for backward compatibility
  65. is_active: bool
  66. is_admin: bool # Computed from role and group membership
  67. auth_source: str = "local" # "local" or "ldap"
  68. groups: list[GroupBrief] = []
  69. permissions: list[str] = [] # All permissions from groups
  70. created_at: str
  71. class Config:
  72. from_attributes = True
  73. class LDAPSearchResultResponse(BaseModel):
  74. """One match from GET /auth/ldap/search — surfaced in the admin UI."""
  75. username: str
  76. email: str | None = None
  77. display_name: str | None = None
  78. dn: str
  79. already_provisioned: bool = False # True if this username already exists as a BamBuddy user
  80. class LDAPProvisionRequest(BaseModel):
  81. """Body for POST /auth/ldap/provision. Username is re-resolved via the
  82. service-account bind, so the request only carries the directory username
  83. the admin picked from the search results."""
  84. username: str = Field(..., max_length=150)
  85. class ChangePasswordRequest(BaseModel):
  86. current_password: str = Field(..., max_length=256) # M-NEW-3: cap before pbkdf2
  87. new_password: str = Field(..., min_length=8, max_length=256)
  88. @field_validator("new_password")
  89. @classmethod
  90. def validate_new_password(cls, v: str) -> str:
  91. return _validate_password_complexity(v)
  92. class SetupRequest(BaseModel):
  93. auth_enabled: bool
  94. admin_username: str | None = Field(default=None, max_length=150)
  95. admin_password: str | None = Field(default=None, max_length=256)
  96. # Password complexity is NOT validated at the schema layer. When re-enabling auth
  97. # with an existing admin user (or when LDAP is the auth backend), the frontend
  98. # still sends whatever is in the password field but the route ignores it.
  99. # Enforcing complexity here would reject those legitimate flows. The route body
  100. # applies the check only when a brand-new local admin is actually being created.
  101. class SetupResponse(BaseModel):
  102. auth_enabled: bool
  103. admin_created: bool | None = None
  104. class ForgotPasswordRequest(BaseModel):
  105. email: str = Field(..., max_length=254) # L-NEW-1: RFC 5321 max; caps memory/CPU before lookup
  106. class ForgotPasswordConfirmRequest(BaseModel):
  107. token: str = Field(..., max_length=128)
  108. new_password: str = Field(..., min_length=8, max_length=256)
  109. @field_validator("new_password")
  110. @classmethod
  111. def validate_new_password(cls, v: str) -> str:
  112. return _validate_password_complexity(v)
  113. class ForgotPasswordResponse(BaseModel):
  114. message: str
  115. class ResetPasswordRequest(BaseModel):
  116. user_id: int
  117. class ResetPasswordResponse(BaseModel):
  118. message: str
  119. class SMTPSettings(BaseModel):
  120. smtp_host: str
  121. smtp_port: int
  122. smtp_username: str | None = None # Optional when auth is disabled
  123. smtp_password: str | None = None # Optional for read operations or when auth is disabled
  124. smtp_security: str = "starttls" # 'starttls', 'ssl', 'none'
  125. smtp_auth_enabled: bool = True
  126. smtp_from_email: str
  127. smtp_from_name: str = "BamBuddy"
  128. # Deprecated field for backward compatibility
  129. smtp_use_tls: bool | None = None
  130. class TestSMTPRequest(BaseModel):
  131. test_recipient: str
  132. class TestSMTPResponse(BaseModel):
  133. success: bool
  134. message: str
  135. # ---------------------------------------------------------------------------
  136. # 2FA / MFA schemas
  137. # ---------------------------------------------------------------------------
  138. class TwoFAStatusResponse(BaseModel):
  139. totp_enabled: bool
  140. email_otp_enabled: bool
  141. backup_codes_remaining: int
  142. class TOTPSetupResponse(BaseModel):
  143. """Returned when a user initiates TOTP setup. The frontend should display
  144. the QR code image (base64 PNG) and ask the user to scan it, then call
  145. /auth/2fa/totp/enable with a valid code to confirm."""
  146. secret: str # base32 secret (shown as fallback text)
  147. qr_code_b64: str # base64-encoded PNG of the QR code
  148. issuer: str
  149. class TOTPSetupRequest(BaseModel):
  150. """Optional body for POST /auth/2fa/totp/setup.
  151. Only required when re-initialising setup while an active TOTP record exists.
  152. Provide the current TOTP code (from the existing authenticator app) to
  153. confirm intent — mirrors the verification requirement in disable_totp.
  154. """
  155. code: str | None = Field(default=None, max_length=8) # L-NEW-2: bound before pyotp
  156. class TOTPEnableRequest(BaseModel):
  157. code: str # 6-digit TOTP code from the authenticator app
  158. @field_validator("code")
  159. @classmethod
  160. def validate_code(cls, v: str) -> str:
  161. v = v.strip()
  162. if not v.isdigit() or len(v) != 6:
  163. raise ValueError("TOTP code must be exactly 6 digits")
  164. return v
  165. class TOTPEnableResponse(BaseModel):
  166. message: str
  167. backup_codes: list[str] # plain-text codes shown once; user must save them
  168. class TOTPDisableRequest(BaseModel):
  169. """Requires a valid TOTP code OR a backup code to disable TOTP."""
  170. code: str = Field(..., max_length=128)
  171. class BackupCodesResponse(BaseModel):
  172. backup_codes: list[str]
  173. message: str
  174. class EmailOTPEnableRequest(BaseModel):
  175. """No body required — email is taken from the authenticated user's profile."""
  176. pass
  177. class TwoFAVerifyRequest(BaseModel):
  178. pre_auth_token: str = Field(..., max_length=128)
  179. # TOTP/email codes are 6 digits; backup codes are 8 uppercase alphanumeric chars.
  180. # max_length=8 prevents excessively long inputs from reaching pbkdf2/pyotp.
  181. code: str = Field(..., min_length=6, max_length=8)
  182. method: Literal["totp", "email", "backup"] = "totp"
  183. @field_validator("code")
  184. @classmethod
  185. def validate_code_format(cls, v: str) -> str:
  186. v = v.strip()
  187. if not re.match(r"^[A-Za-z0-9]{6,8}$", v):
  188. raise ValueError("Code must be 6–8 alphanumeric characters")
  189. return v.upper() # normalise backup codes to uppercase
  190. class TwoFAVerifyResponse(BaseModel):
  191. access_token: str
  192. token_type: str = "bearer"
  193. user: "UserResponse"
  194. class EmailOTPSendRequest(BaseModel):
  195. pre_auth_token: str = Field(..., max_length=128)
  196. class EmailOTPEnableConfirmRequest(BaseModel):
  197. """Body for the second step of email OTP enable: verify the proof-of-possession code."""
  198. setup_token: str = Field(..., max_length=128)
  199. # L-NEW-3: email OTP setup codes are always exactly 6 digits; reject anything else.
  200. code: str = Field(..., min_length=6, max_length=6)
  201. @field_validator("code")
  202. @classmethod
  203. def validate_code_digits(cls, v: str) -> str:
  204. v = v.strip()
  205. if not v.isdigit() or len(v) != 6:
  206. raise ValueError("Email OTP setup code must be exactly 6 digits")
  207. return v
  208. class EmailOTPDisableRequest(BaseModel):
  209. """Requires the account password to disable email OTP."""
  210. password: str = Field(..., max_length=256)
  211. class AdminDisable2FARequest(BaseModel):
  212. """Admin must supply their own password as re-auth before disabling 2FA for another user.
  213. OIDC/LDAP-only admins (no local password_hash) are exempt from this check.
  214. """
  215. admin_password: str | None = Field(default=None, max_length=256)
  216. # ---------------------------------------------------------------------------
  217. # OIDC schemas
  218. # ---------------------------------------------------------------------------
  219. AUTO_LINK_REQUIREMENTS_ERROR = (
  220. "auto_link_existing_accounts requires require_email_verified=True when email_claim='email'"
  221. )
  222. def _validate_email_claim_name(v: str) -> str:
  223. # Accepts only alphanumeric/underscore/hyphen claim names starting with a letter —
  224. # prevents log injection and limits the attack surface of operator-supplied claim names.
  225. if not re.fullmatch(r"[a-zA-Z][a-zA-Z0-9_\-]{0,63}", v):
  226. raise ValueError("Invalid claim name")
  227. return v
  228. def _validate_icon_url(v: str | None) -> str | None:
  229. """Reject non-HTTPS icon URLs and SSRF-unsafe hosts.
  230. Delegates to the runtime SSRF guard ``assert_safe_public_https_url``
  231. so the Pydantic layer enforces the same allowlist as the fetcher —
  232. no policy drift between schema validation and SSRF check. Without
  233. this delegation the validator covered only ``is_private | is_loopback
  234. | is_link_local`` while the runtime additionally rejected numeric-
  235. encoded IPs, cloud-metadata endpoints, multicast, unspecified, and
  236. IPv4-mapped IPv6.
  237. Lazy-imported because ``_oidc_helpers`` lives under ``api/routes/``
  238. and schemas avoid top-level imports from that layer (matches the
  239. existing pattern in ``_validate_issuer_url`` which lazy-imports
  240. ``ipaddress``).
  241. """
  242. if v is None:
  243. return v
  244. if not v.startswith("https://"):
  245. # Surface the same wording the runtime guard would use, but pre-
  246. # checked here so the user-facing error doesn't depend on the
  247. # runtime call path.
  248. raise ValueError("icon_url must start with https://")
  249. from backend.app.api.routes._oidc_helpers import assert_safe_public_https_url
  250. try:
  251. assert_safe_public_https_url(v)
  252. except ValueError as exc:
  253. raise ValueError(f"icon_url: {exc}") from exc
  254. return v
  255. def _validate_issuer_url(v: str | None) -> str | None:
  256. """Nit4: Reject non-HTTPS issuer URLs and private/loopback/link-local hosts.
  257. HTTP is no longer accepted — OIDC providers must be reachable over TLS.
  258. Private-network and loopback addresses are rejected to prevent SSRF attacks
  259. where an admin-supplied URL could reach internal services.
  260. """
  261. import ipaddress
  262. from urllib.parse import urlparse
  263. if v is None:
  264. return v
  265. if not v.startswith("https://"):
  266. raise ValueError("issuer_url must start with https://")
  267. host = urlparse(v).hostname or ""
  268. try:
  269. addr = ipaddress.ip_address(host)
  270. if addr.is_private or addr.is_loopback or addr.is_link_local:
  271. raise ValueError("issuer_url must not point to a private, loopback, or link-local address")
  272. except ValueError as exc:
  273. if "issuer_url" in str(exc):
  274. raise
  275. # hostname is a domain name, not a bare IP — that's fine
  276. return v
  277. def _validate_scopes(v: str | None) -> str | None:
  278. """Nit5: Require that the 'openid' scope is present.
  279. The OpenID Connect spec mandates the 'openid' scope; without it the
  280. response is plain OAuth2, not OIDC, and claims like sub/email are not
  281. guaranteed.
  282. """
  283. if v is None:
  284. return v
  285. scope_list = v.split()
  286. if "openid" not in scope_list:
  287. raise ValueError("scopes must include 'openid'")
  288. return v
  289. class OIDCProviderCreate(BaseModel):
  290. name: str = Field(..., max_length=100) # L-NEW-4
  291. issuer_url: str
  292. client_id: str = Field(..., max_length=256) # L-NEW-4
  293. client_secret: str = Field(..., max_length=512) # L-NEW-4: Fernet input bounded
  294. scopes: str = Field(default="openid email profile", max_length=256) # L-NEW-4
  295. is_enabled: bool = True
  296. auto_create_users: bool = False
  297. auto_link_existing_accounts: bool = False # M-2: conservative default, opt-in only
  298. email_claim: str = Field(default="email", max_length=64)
  299. require_email_verified: bool = True
  300. icon_url: str | None = None
  301. default_group_id: int | None = None
  302. @field_validator("issuer_url")
  303. @classmethod
  304. def validate_issuer_url(cls, v: str) -> str:
  305. result = _validate_issuer_url(v)
  306. if result is None:
  307. raise ValueError("issuer_url is required")
  308. return result
  309. @field_validator("scopes")
  310. @classmethod
  311. def validate_scopes(cls, v: str) -> str:
  312. result = _validate_scopes(v)
  313. if result is None:
  314. raise ValueError("scopes is required")
  315. return result
  316. @field_validator("email_claim")
  317. @classmethod
  318. def validate_email_claim(cls, v: str) -> str:
  319. return _validate_email_claim_name(v)
  320. @field_validator("icon_url")
  321. @classmethod
  322. def validate_icon_url(cls, v: str | None) -> str | None:
  323. return _validate_icon_url(v)
  324. # SEC-1: auto_link with email_claim='email' requires require_email_verified=True.
  325. # Fall B (require_email_verified=False + email_claim='email') accepts absent email_verified → account-takeover risk.
  326. # Fall C (custom claim != 'email') is safe: no email_verified gate on that path regardless of require_email_verified.
  327. @model_validator(mode="after")
  328. def check_auto_link_requires_verified(self) -> "OIDCProviderCreate":
  329. if self.auto_link_existing_accounts and self.email_claim == "email" and not self.require_email_verified:
  330. raise ValueError(AUTO_LINK_REQUIREMENTS_ERROR)
  331. return self
  332. class OIDCProviderUpdate(BaseModel):
  333. name: str | None = Field(default=None, max_length=100)
  334. issuer_url: str | None = None
  335. @field_validator("issuer_url")
  336. @classmethod
  337. def validate_issuer_url(cls, v: str | None) -> str | None:
  338. return _validate_issuer_url(v)
  339. client_id: str | None = Field(default=None, max_length=256)
  340. client_secret: str | None = Field(default=None, max_length=512)
  341. scopes: str | None = Field(default=None, max_length=256)
  342. is_enabled: bool | None = None
  343. auto_create_users: bool | None = None
  344. auto_link_existing_accounts: bool | None = None
  345. email_claim: str | None = Field(default=None, max_length=64)
  346. require_email_verified: bool | None = None
  347. icon_url: str | None = None
  348. default_group_id: int | None = None
  349. @field_validator("scopes")
  350. @classmethod
  351. def validate_scopes(cls, v: str | None) -> str | None:
  352. return _validate_scopes(v)
  353. @field_validator("email_claim")
  354. @classmethod
  355. def validate_email_claim(cls, v: str | None) -> str | None:
  356. if v is None:
  357. return None
  358. return _validate_email_claim_name(v)
  359. @field_validator("icon_url")
  360. @classmethod
  361. def validate_icon_url(cls, v: str | None) -> str | None:
  362. return _validate_icon_url(v)
  363. # SEC-1 (schema-level): blocks only when auto_link=True + email_claim='email' + require_email_verified=False
  364. # arrive in the same request. email_claim=None means the request leaves it unchanged (still 'email' by default),
  365. # so that is also treated as 'email'. Partial updates spanning two requests are caught by the
  366. # Combined-State-Guard in the route handler after the setattr loop.
  367. @model_validator(mode="after")
  368. def check_auto_link_requires_verified(self) -> "OIDCProviderUpdate":
  369. if (
  370. self.auto_link_existing_accounts is True
  371. and self.require_email_verified is False
  372. and (self.email_claim is None or self.email_claim == "email")
  373. ):
  374. raise ValueError(AUTO_LINK_REQUIREMENTS_ERROR)
  375. return self
  376. class OIDCProviderResponse(BaseModel):
  377. id: int
  378. name: str
  379. issuer_url: str
  380. client_id: str
  381. scopes: str
  382. is_enabled: bool
  383. auto_create_users: bool
  384. auto_link_existing_accounts: bool = False
  385. email_claim: str = "email"
  386. require_email_verified: bool = True
  387. icon_url: str | None = None
  388. default_group_id: int | None = None
  389. # Set explicitly in the route handler from `icon_content_type is not None`
  390. # rather than `@computed_field` (project policy) or `icon_data is not None`
  391. # (would trigger an async lazy-load on the deferred BLOB column).
  392. # Required (no default) so Pydantic fails loudly if any code path skips
  393. # `_build_provider_response` and tries `model_validate(provider)` directly.
  394. has_icon: bool
  395. class Config:
  396. from_attributes = True
  397. class OIDCAuthorizeResponse(BaseModel):
  398. auth_url: str
  399. class OIDCExchangeRequest(BaseModel):
  400. oidc_token: str = Field(..., max_length=128)
  401. class OIDCLinkResponse(BaseModel):
  402. id: int
  403. provider_id: int
  404. provider_name: str
  405. provider_email: str | None = None
  406. created_at: str
  407. class EncryptionRowCounts(BaseModel):
  408. oidc_providers: int
  409. user_totp: int
  410. class EncryptionStatusResponse(BaseModel):
  411. key_configured: bool
  412. key_source: Literal["env", "file", "generated", "none"]
  413. legacy_plaintext_rows: EncryptionRowCounts
  414. encrypted_rows: EncryptionRowCounts
  415. # B4: filled by the endpoint after a sample-decrypt of one encrypted row,
  416. # so a wrong-key state (where key_configured=True but rows decrypt to junk)
  417. # is detected, not just the no-key case.
  418. decryption_broken: bool = False
  419. # B2: number of rows skipped during the last legacy re-encryption migration.
  420. # Filled from backend.app.core.database.get_migration_error_count().
  421. migration_error_count: int = 0