auth.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543
  1. import re
  2. from typing import Literal
  3. from pydantic import BaseModel, Field, field_validator, model_validator
  4. def _validate_password_complexity(v: str) -> str:
  5. """Enforce minimum password complexity (M-C).
  6. Requires at least one uppercase letter, one lowercase letter, one digit,
  7. and one special character in addition to the min_length=8 Field constraint.
  8. """
  9. if not re.search(r"[A-Z]", v):
  10. raise ValueError("Password must contain at least one uppercase letter")
  11. if not re.search(r"[a-z]", v):
  12. raise ValueError("Password must contain at least one lowercase letter")
  13. if not re.search(r"\d", v):
  14. raise ValueError("Password must contain at least one digit")
  15. if not re.search(r"[^A-Za-z0-9]", v):
  16. raise ValueError("Password must contain at least one special character")
  17. return v
  18. class GroupBrief(BaseModel):
  19. """Brief group info for embedding in user responses."""
  20. id: int
  21. name: str
  22. class Config:
  23. from_attributes = True
  24. class LoginRequest(BaseModel):
  25. username: str = Field(..., max_length=150)
  26. password: str = Field(..., max_length=256)
  27. class LoginResponse(BaseModel):
  28. access_token: str | None = None
  29. token_type: str = "bearer"
  30. user: "UserResponse | None" = None
  31. # Set when 2FA is required; the frontend must call /auth/2fa/verify
  32. requires_2fa: bool = False
  33. pre_auth_token: str | None = None
  34. two_fa_methods: list[str] = []
  35. class UserCreate(BaseModel):
  36. username: str = Field(..., max_length=150)
  37. password: str | None = Field(default=None, max_length=256) # M-NEW-4: cap before pbkdf2
  38. email: str | None = Field(default=None, max_length=254) # L-NEW-5: RFC 5321 max
  39. role: str = "user"
  40. group_ids: list[int] | None = None
  41. @field_validator("password")
  42. @classmethod
  43. def validate_password(cls, v: str | None) -> str | None:
  44. if v is not None:
  45. _validate_password_complexity(v)
  46. return v
  47. class UserUpdate(BaseModel):
  48. username: str | None = Field(default=None, max_length=150)
  49. password: str | None = Field(default=None, max_length=256) # M-NEW-4: cap before pbkdf2
  50. email: str | None = Field(default=None, max_length=254) # L-NEW-5: RFC 5321 max
  51. role: str | None = None
  52. is_active: bool | None = None
  53. group_ids: list[int] | None = None
  54. @field_validator("password")
  55. @classmethod
  56. def validate_password(cls, v: str | None) -> str | None:
  57. if v is not None:
  58. _validate_password_complexity(v)
  59. return v
  60. class UserResponse(BaseModel):
  61. id: int
  62. username: str
  63. email: str | None = None
  64. role: str # Deprecated, kept for backward compatibility
  65. is_active: bool
  66. is_admin: bool # Computed from role and group membership
  67. auth_source: str = "local" # "local" or "ldap"
  68. groups: list[GroupBrief] = []
  69. permissions: list[str] = [] # All permissions from groups
  70. created_at: str
  71. class Config:
  72. from_attributes = True
  73. class ChangePasswordRequest(BaseModel):
  74. current_password: str = Field(..., max_length=256) # M-NEW-3: cap before pbkdf2
  75. new_password: str = Field(..., min_length=8, max_length=256)
  76. @field_validator("new_password")
  77. @classmethod
  78. def validate_new_password(cls, v: str) -> str:
  79. return _validate_password_complexity(v)
  80. class SetupRequest(BaseModel):
  81. auth_enabled: bool
  82. admin_username: str | None = Field(default=None, max_length=150)
  83. admin_password: str | None = Field(default=None, max_length=256)
  84. # Password complexity is NOT validated at the schema layer. When re-enabling auth
  85. # with an existing admin user (or when LDAP is the auth backend), the frontend
  86. # still sends whatever is in the password field but the route ignores it.
  87. # Enforcing complexity here would reject those legitimate flows. The route body
  88. # applies the check only when a brand-new local admin is actually being created.
  89. class SetupResponse(BaseModel):
  90. auth_enabled: bool
  91. admin_created: bool | None = None
  92. class ForgotPasswordRequest(BaseModel):
  93. email: str = Field(..., max_length=254) # L-NEW-1: RFC 5321 max; caps memory/CPU before lookup
  94. class ForgotPasswordConfirmRequest(BaseModel):
  95. token: str = Field(..., max_length=128)
  96. new_password: str = Field(..., min_length=8, max_length=256)
  97. @field_validator("new_password")
  98. @classmethod
  99. def validate_new_password(cls, v: str) -> str:
  100. return _validate_password_complexity(v)
  101. class ForgotPasswordResponse(BaseModel):
  102. message: str
  103. class ResetPasswordRequest(BaseModel):
  104. user_id: int
  105. class ResetPasswordResponse(BaseModel):
  106. message: str
  107. class SMTPSettings(BaseModel):
  108. smtp_host: str
  109. smtp_port: int
  110. smtp_username: str | None = None # Optional when auth is disabled
  111. smtp_password: str | None = None # Optional for read operations or when auth is disabled
  112. smtp_security: str = "starttls" # 'starttls', 'ssl', 'none'
  113. smtp_auth_enabled: bool = True
  114. smtp_from_email: str
  115. smtp_from_name: str = "BamBuddy"
  116. # Deprecated field for backward compatibility
  117. smtp_use_tls: bool | None = None
  118. class TestSMTPRequest(BaseModel):
  119. test_recipient: str
  120. class TestSMTPResponse(BaseModel):
  121. success: bool
  122. message: str
  123. # ---------------------------------------------------------------------------
  124. # 2FA / MFA schemas
  125. # ---------------------------------------------------------------------------
  126. class TwoFAStatusResponse(BaseModel):
  127. totp_enabled: bool
  128. email_otp_enabled: bool
  129. backup_codes_remaining: int
  130. class TOTPSetupResponse(BaseModel):
  131. """Returned when a user initiates TOTP setup. The frontend should display
  132. the QR code image (base64 PNG) and ask the user to scan it, then call
  133. /auth/2fa/totp/enable with a valid code to confirm."""
  134. secret: str # base32 secret (shown as fallback text)
  135. qr_code_b64: str # base64-encoded PNG of the QR code
  136. issuer: str
  137. class TOTPSetupRequest(BaseModel):
  138. """Optional body for POST /auth/2fa/totp/setup.
  139. Only required when re-initialising setup while an active TOTP record exists.
  140. Provide the current TOTP code (from the existing authenticator app) to
  141. confirm intent — mirrors the verification requirement in disable_totp.
  142. """
  143. code: str | None = Field(default=None, max_length=8) # L-NEW-2: bound before pyotp
  144. class TOTPEnableRequest(BaseModel):
  145. code: str # 6-digit TOTP code from the authenticator app
  146. @field_validator("code")
  147. @classmethod
  148. def validate_code(cls, v: str) -> str:
  149. v = v.strip()
  150. if not v.isdigit() or len(v) != 6:
  151. raise ValueError("TOTP code must be exactly 6 digits")
  152. return v
  153. class TOTPEnableResponse(BaseModel):
  154. message: str
  155. backup_codes: list[str] # plain-text codes shown once; user must save them
  156. class TOTPDisableRequest(BaseModel):
  157. """Requires a valid TOTP code OR a backup code to disable TOTP."""
  158. code: str = Field(..., max_length=128)
  159. class BackupCodesResponse(BaseModel):
  160. backup_codes: list[str]
  161. message: str
  162. class EmailOTPEnableRequest(BaseModel):
  163. """No body required — email is taken from the authenticated user's profile."""
  164. pass
  165. class TwoFAVerifyRequest(BaseModel):
  166. pre_auth_token: str = Field(..., max_length=128)
  167. # TOTP/email codes are 6 digits; backup codes are 8 uppercase alphanumeric chars.
  168. # max_length=8 prevents excessively long inputs from reaching pbkdf2/pyotp.
  169. code: str = Field(..., min_length=6, max_length=8)
  170. method: Literal["totp", "email", "backup"] = "totp"
  171. @field_validator("code")
  172. @classmethod
  173. def validate_code_format(cls, v: str) -> str:
  174. v = v.strip()
  175. if not re.match(r"^[A-Za-z0-9]{6,8}$", v):
  176. raise ValueError("Code must be 6–8 alphanumeric characters")
  177. return v.upper() # normalise backup codes to uppercase
  178. class TwoFAVerifyResponse(BaseModel):
  179. access_token: str
  180. token_type: str = "bearer"
  181. user: "UserResponse"
  182. class EmailOTPSendRequest(BaseModel):
  183. pre_auth_token: str = Field(..., max_length=128)
  184. class EmailOTPEnableConfirmRequest(BaseModel):
  185. """Body for the second step of email OTP enable: verify the proof-of-possession code."""
  186. setup_token: str = Field(..., max_length=128)
  187. # L-NEW-3: email OTP setup codes are always exactly 6 digits; reject anything else.
  188. code: str = Field(..., min_length=6, max_length=6)
  189. @field_validator("code")
  190. @classmethod
  191. def validate_code_digits(cls, v: str) -> str:
  192. v = v.strip()
  193. if not v.isdigit() or len(v) != 6:
  194. raise ValueError("Email OTP setup code must be exactly 6 digits")
  195. return v
  196. class EmailOTPDisableRequest(BaseModel):
  197. """Requires the account password to disable email OTP."""
  198. password: str = Field(..., max_length=256)
  199. class AdminDisable2FARequest(BaseModel):
  200. """Admin must supply their own password as re-auth before disabling 2FA for another user.
  201. OIDC/LDAP-only admins (no local password_hash) are exempt from this check.
  202. """
  203. admin_password: str | None = Field(default=None, max_length=256)
  204. # ---------------------------------------------------------------------------
  205. # OIDC schemas
  206. # ---------------------------------------------------------------------------
  207. AUTO_LINK_REQUIREMENTS_ERROR = (
  208. "auto_link_existing_accounts requires require_email_verified=True when email_claim='email'"
  209. )
  210. def _validate_email_claim_name(v: str) -> str:
  211. # Accepts only alphanumeric/underscore/hyphen claim names starting with a letter —
  212. # prevents log injection and limits the attack surface of operator-supplied claim names.
  213. if not re.fullmatch(r"[a-zA-Z][a-zA-Z0-9_\-]{0,63}", v):
  214. raise ValueError("Invalid claim name")
  215. return v
  216. def _validate_icon_url(v: str | None) -> str | None:
  217. """Reject non-HTTPS icon URLs and SSRF-unsafe hosts.
  218. Delegates to the runtime SSRF guard ``assert_safe_public_https_url``
  219. so the Pydantic layer enforces the same allowlist as the fetcher —
  220. no policy drift between schema validation and SSRF check. Without
  221. this delegation the validator covered only ``is_private | is_loopback
  222. | is_link_local`` while the runtime additionally rejected numeric-
  223. encoded IPs, cloud-metadata endpoints, multicast, unspecified, and
  224. IPv4-mapped IPv6.
  225. Lazy-imported because ``_oidc_helpers`` lives under ``api/routes/``
  226. and schemas avoid top-level imports from that layer (matches the
  227. existing pattern in ``_validate_issuer_url`` which lazy-imports
  228. ``ipaddress``).
  229. """
  230. if v is None:
  231. return v
  232. if not v.startswith("https://"):
  233. # Surface the same wording the runtime guard would use, but pre-
  234. # checked here so the user-facing error doesn't depend on the
  235. # runtime call path.
  236. raise ValueError("icon_url must start with https://")
  237. from backend.app.api.routes._oidc_helpers import assert_safe_public_https_url
  238. try:
  239. assert_safe_public_https_url(v)
  240. except ValueError as exc:
  241. raise ValueError(f"icon_url: {exc}") from exc
  242. return v
  243. def _validate_issuer_url(v: str | None) -> str | None:
  244. """Nit4: Reject non-HTTPS issuer URLs and private/loopback/link-local hosts.
  245. HTTP is no longer accepted — OIDC providers must be reachable over TLS.
  246. Private-network and loopback addresses are rejected to prevent SSRF attacks
  247. where an admin-supplied URL could reach internal services.
  248. """
  249. import ipaddress
  250. from urllib.parse import urlparse
  251. if v is None:
  252. return v
  253. if not v.startswith("https://"):
  254. raise ValueError("issuer_url must start with https://")
  255. host = urlparse(v).hostname or ""
  256. try:
  257. addr = ipaddress.ip_address(host)
  258. if addr.is_private or addr.is_loopback or addr.is_link_local:
  259. raise ValueError("issuer_url must not point to a private, loopback, or link-local address")
  260. except ValueError as exc:
  261. if "issuer_url" in str(exc):
  262. raise
  263. # hostname is a domain name, not a bare IP — that's fine
  264. return v
  265. def _validate_scopes(v: str | None) -> str | None:
  266. """Nit5: Require that the 'openid' scope is present.
  267. The OpenID Connect spec mandates the 'openid' scope; without it the
  268. response is plain OAuth2, not OIDC, and claims like sub/email are not
  269. guaranteed.
  270. """
  271. if v is None:
  272. return v
  273. scope_list = v.split()
  274. if "openid" not in scope_list:
  275. raise ValueError("scopes must include 'openid'")
  276. return v
  277. class OIDCProviderCreate(BaseModel):
  278. name: str = Field(..., max_length=100) # L-NEW-4
  279. issuer_url: str
  280. client_id: str = Field(..., max_length=256) # L-NEW-4
  281. client_secret: str = Field(..., max_length=512) # L-NEW-4: Fernet input bounded
  282. scopes: str = Field(default="openid email profile", max_length=256) # L-NEW-4
  283. is_enabled: bool = True
  284. auto_create_users: bool = False
  285. auto_link_existing_accounts: bool = False # M-2: conservative default, opt-in only
  286. email_claim: str = Field(default="email", max_length=64)
  287. require_email_verified: bool = True
  288. icon_url: str | None = None
  289. default_group_id: int | None = None
  290. @field_validator("issuer_url")
  291. @classmethod
  292. def validate_issuer_url(cls, v: str) -> str:
  293. result = _validate_issuer_url(v)
  294. if result is None:
  295. raise ValueError("issuer_url is required")
  296. return result
  297. @field_validator("scopes")
  298. @classmethod
  299. def validate_scopes(cls, v: str) -> str:
  300. result = _validate_scopes(v)
  301. if result is None:
  302. raise ValueError("scopes is required")
  303. return result
  304. @field_validator("email_claim")
  305. @classmethod
  306. def validate_email_claim(cls, v: str) -> str:
  307. return _validate_email_claim_name(v)
  308. @field_validator("icon_url")
  309. @classmethod
  310. def validate_icon_url(cls, v: str | None) -> str | None:
  311. return _validate_icon_url(v)
  312. # SEC-1: auto_link with email_claim='email' requires require_email_verified=True.
  313. # Fall B (require_email_verified=False + email_claim='email') accepts absent email_verified → account-takeover risk.
  314. # Fall C (custom claim != 'email') is safe: no email_verified gate on that path regardless of require_email_verified.
  315. @model_validator(mode="after")
  316. def check_auto_link_requires_verified(self) -> "OIDCProviderCreate":
  317. if self.auto_link_existing_accounts and self.email_claim == "email" and not self.require_email_verified:
  318. raise ValueError(AUTO_LINK_REQUIREMENTS_ERROR)
  319. return self
  320. class OIDCProviderUpdate(BaseModel):
  321. name: str | None = Field(default=None, max_length=100)
  322. issuer_url: str | None = None
  323. @field_validator("issuer_url")
  324. @classmethod
  325. def validate_issuer_url(cls, v: str | None) -> str | None:
  326. return _validate_issuer_url(v)
  327. client_id: str | None = Field(default=None, max_length=256)
  328. client_secret: str | None = Field(default=None, max_length=512)
  329. scopes: str | None = Field(default=None, max_length=256)
  330. is_enabled: bool | None = None
  331. auto_create_users: bool | None = None
  332. auto_link_existing_accounts: bool | None = None
  333. email_claim: str | None = Field(default=None, max_length=64)
  334. require_email_verified: bool | None = None
  335. icon_url: str | None = None
  336. default_group_id: int | None = None
  337. @field_validator("scopes")
  338. @classmethod
  339. def validate_scopes(cls, v: str | None) -> str | None:
  340. return _validate_scopes(v)
  341. @field_validator("email_claim")
  342. @classmethod
  343. def validate_email_claim(cls, v: str | None) -> str | None:
  344. if v is None:
  345. return None
  346. return _validate_email_claim_name(v)
  347. @field_validator("icon_url")
  348. @classmethod
  349. def validate_icon_url(cls, v: str | None) -> str | None:
  350. return _validate_icon_url(v)
  351. # SEC-1 (schema-level): blocks only when auto_link=True + email_claim='email' + require_email_verified=False
  352. # arrive in the same request. email_claim=None means the request leaves it unchanged (still 'email' by default),
  353. # so that is also treated as 'email'. Partial updates spanning two requests are caught by the
  354. # Combined-State-Guard in the route handler after the setattr loop.
  355. @model_validator(mode="after")
  356. def check_auto_link_requires_verified(self) -> "OIDCProviderUpdate":
  357. if (
  358. self.auto_link_existing_accounts is True
  359. and self.require_email_verified is False
  360. and (self.email_claim is None or self.email_claim == "email")
  361. ):
  362. raise ValueError(AUTO_LINK_REQUIREMENTS_ERROR)
  363. return self
  364. class OIDCProviderResponse(BaseModel):
  365. id: int
  366. name: str
  367. issuer_url: str
  368. client_id: str
  369. scopes: str
  370. is_enabled: bool
  371. auto_create_users: bool
  372. auto_link_existing_accounts: bool = False
  373. email_claim: str = "email"
  374. require_email_verified: bool = True
  375. icon_url: str | None = None
  376. default_group_id: int | None = None
  377. # Set explicitly in the route handler from `icon_content_type is not None`
  378. # rather than `@computed_field` (project policy) or `icon_data is not None`
  379. # (would trigger an async lazy-load on the deferred BLOB column).
  380. # Required (no default) so Pydantic fails loudly if any code path skips
  381. # `_build_provider_response` and tries `model_validate(provider)` directly.
  382. has_icon: bool
  383. class Config:
  384. from_attributes = True
  385. class OIDCAuthorizeResponse(BaseModel):
  386. auth_url: str
  387. class OIDCExchangeRequest(BaseModel):
  388. oidc_token: str = Field(..., max_length=128)
  389. class OIDCLinkResponse(BaseModel):
  390. id: int
  391. provider_id: int
  392. provider_name: str
  393. provider_email: str | None = None
  394. created_at: str
  395. class EncryptionRowCounts(BaseModel):
  396. oidc_providers: int
  397. user_totp: int
  398. class EncryptionStatusResponse(BaseModel):
  399. key_configured: bool
  400. key_source: Literal["env", "file", "generated", "none"]
  401. legacy_plaintext_rows: EncryptionRowCounts
  402. encrypted_rows: EncryptionRowCounts
  403. # B4: filled by the endpoint after a sample-decrypt of one encrypted row,
  404. # so a wrong-key state (where key_configured=True but rows decrypt to junk)
  405. # is detected, not just the no-key case.
  406. decryption_broken: bool = False
  407. # B2: number of rows skipped during the last legacy re-encryption migration.
  408. # Filled from backend.app.core.database.get_migration_error_count().
  409. migration_error_count: int = 0