auth.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436
import re
from typing import Literal

from pydantic import BaseModel, ConfigDict, Field, field_validator
  4. def _validate_password_complexity(v: str) -> str:
  5. """Enforce minimum password complexity (M-C).
  6. Requires at least one uppercase letter, one lowercase letter, one digit,
  7. and one special character in addition to the min_length=8 Field constraint.
  8. """
  9. if not re.search(r"[A-Z]", v):
  10. raise ValueError("Password must contain at least one uppercase letter")
  11. if not re.search(r"[a-z]", v):
  12. raise ValueError("Password must contain at least one lowercase letter")
  13. if not re.search(r"\d", v):
  14. raise ValueError("Password must contain at least one digit")
  15. if not re.search(r"[^A-Za-z0-9]", v):
  16. raise ValueError("Password must contain at least one special character")
  17. return v
  18. class GroupBrief(BaseModel):
  19. """Brief group info for embedding in user responses."""
  20. id: int
  21. name: str
  22. class Config:
  23. from_attributes = True
class LoginRequest(BaseModel):
    """Username/password credentials; both fields are required and length-capped."""

    username: str = Field(..., max_length=150)  # at most 150 characters
    password: str = Field(..., max_length=256)  # at most 256 characters
class LoginResponse(BaseModel):
    """Login result: a token plus user, or a 2FA challenge.

    Every field has a default, so the same model can express either shape.
    """

    access_token: str | None = None
    token_type: str = "bearer"
    # Forward reference: UserResponse is declared further down in this module.
    user: "UserResponse | None" = None
    # Set when 2FA is required; the frontend must call /auth/2fa/verify
    requires_2fa: bool = False
    pre_auth_token: str | None = None
    two_fa_methods: list[str] = []
  35. class UserCreate(BaseModel):
  36. username: str = Field(..., max_length=150)
  37. password: str | None = Field(default=None, max_length=256) # M-NEW-4: cap before pbkdf2
  38. email: str | None = Field(default=None, max_length=254) # L-NEW-5: RFC 5321 max
  39. role: str = "user"
  40. group_ids: list[int] | None = None
  41. @field_validator("password")
  42. @classmethod
  43. def validate_password(cls, v: str | None) -> str | None:
  44. if v is not None:
  45. _validate_password_complexity(v)
  46. return v
  47. class UserUpdate(BaseModel):
  48. username: str | None = Field(default=None, max_length=150)
  49. password: str | None = Field(default=None, max_length=256) # M-NEW-4: cap before pbkdf2
  50. email: str | None = Field(default=None, max_length=254) # L-NEW-5: RFC 5321 max
  51. role: str | None = None
  52. is_active: bool | None = None
  53. group_ids: list[int] | None = None
  54. @field_validator("password")
  55. @classmethod
  56. def validate_password(cls, v: str | None) -> str | None:
  57. if v is not None:
  58. _validate_password_complexity(v)
  59. return v
  60. class UserResponse(BaseModel):
  61. id: int
  62. username: str
  63. email: str | None = None
  64. role: str # Deprecated, kept for backward compatibility
  65. is_active: bool
  66. is_admin: bool # Computed from role and group membership
  67. auth_source: str = "local" # "local" or "ldap"
  68. groups: list[GroupBrief] = []
  69. permissions: list[str] = [] # All permissions from groups
  70. created_at: str
  71. class Config:
  72. from_attributes = True
  73. class ChangePasswordRequest(BaseModel):
  74. current_password: str = Field(..., max_length=256) # M-NEW-3: cap before pbkdf2
  75. new_password: str = Field(..., min_length=8, max_length=256)
  76. @field_validator("new_password")
  77. @classmethod
  78. def validate_new_password(cls, v: str) -> str:
  79. return _validate_password_complexity(v)
class SetupRequest(BaseModel):
    """Setup payload: the auth on/off switch plus optional admin credentials."""

    auth_enabled: bool
    admin_username: str | None = Field(default=None, max_length=150)
    admin_password: str | None = Field(default=None, max_length=256)
    # Password complexity is NOT validated at the schema layer. When re-enabling auth
    # with an existing admin user (or when LDAP is the auth backend), the frontend
    # still sends whatever is in the password field but the route ignores it.
    # Enforcing complexity here would reject those legitimate flows. The route body
    # applies the check only when a brand-new local admin is actually being created.
class SetupResponse(BaseModel):
    """Setup result: the auth flag and, optionally, whether an admin was created."""

    auth_enabled: bool
    admin_created: bool | None = None  # optional; defaults to None
class ForgotPasswordRequest(BaseModel):
    """Forgot-password payload carrying only the email address."""

    email: str = Field(..., max_length=254)  # L-NEW-1: RFC 5321 max; caps memory/CPU before lookup
  94. class ForgotPasswordConfirmRequest(BaseModel):
  95. token: str = Field(..., max_length=128)
  96. new_password: str = Field(..., min_length=8, max_length=256)
  97. @field_validator("new_password")
  98. @classmethod
  99. def validate_new_password(cls, v: str) -> str:
  100. return _validate_password_complexity(v)
class ForgotPasswordResponse(BaseModel):
    """Response body consisting of a single message string."""

    message: str
class ResetPasswordRequest(BaseModel):
    """Password-reset payload identifying the target user by integer id."""

    user_id: int
class ResetPasswordResponse(BaseModel):
    """Response body consisting of a single message string."""

    message: str
class SMTPSettings(BaseModel):
    """SMTP server, credential, security-mode and sender settings.

    Host, port and from-address are required; the remaining fields have defaults.
    """

    smtp_host: str
    smtp_port: int
    smtp_username: str | None = None  # Optional when auth is disabled
    smtp_password: str | None = None  # Optional for read operations or when auth is disabled
    smtp_security: str = "starttls"  # 'starttls', 'ssl', 'none'
    smtp_auth_enabled: bool = True
    smtp_from_email: str
    smtp_from_name: str = "BamBuddy"
    # Deprecated field for backward compatibility
    smtp_use_tls: bool | None = None
class TestSMTPRequest(BaseModel):
    """SMTP test payload naming the recipient address."""

    test_recipient: str
class TestSMTPResponse(BaseModel):
    """SMTP test result: a success flag and a message."""

    success: bool
    message: str
  123. # ---------------------------------------------------------------------------
  124. # 2FA / MFA schemas
  125. # ---------------------------------------------------------------------------
class TwoFAStatusResponse(BaseModel):
    """2FA status: TOTP and email-OTP flags plus the remaining backup-code count."""

    totp_enabled: bool
    email_otp_enabled: bool
    backup_codes_remaining: int
class TOTPSetupResponse(BaseModel):
    """Returned when a user initiates TOTP setup.

    The frontend should display the QR code image (base64 PNG) and ask the
    user to scan it, then call /auth/2fa/totp/enable with a valid code to
    confirm.
    """

    secret: str  # base32 secret (shown as fallback text)
    qr_code_b64: str  # base64-encoded PNG of the QR code
    issuer: str
class TOTPSetupRequest(BaseModel):
    """Optional body for POST /auth/2fa/totp/setup.

    Only required when re-initialising setup while an active TOTP record exists.
    Provide the current TOTP code (from the existing authenticator app) to
    confirm intent — mirrors the verification requirement in disable_totp.
    """

    # Optional; when present it may be at most 8 characters long.
    code: str | None = Field(default=None, max_length=8)  # L-NEW-2: bound before pyotp
  144. class TOTPEnableRequest(BaseModel):
  145. code: str # 6-digit TOTP code from the authenticator app
  146. @field_validator("code")
  147. @classmethod
  148. def validate_code(cls, v: str) -> str:
  149. v = v.strip()
  150. if not v.isdigit() or len(v) != 6:
  151. raise ValueError("TOTP code must be exactly 6 digits")
  152. return v
class TOTPEnableResponse(BaseModel):
    """TOTP enable result: a message and the backup codes."""

    message: str
    backup_codes: list[str]  # plain-text codes shown once; user must save them
class TOTPDisableRequest(BaseModel):
    """Requires a valid TOTP code OR a backup code to disable TOTP."""

    # Required; capped at 128 characters. No format check is applied at this layer.
    code: str = Field(..., max_length=128)
class BackupCodesResponse(BaseModel):
    """Response carrying a list of backup codes and a message."""

    backup_codes: list[str]
    message: str
class EmailOTPEnableRequest(BaseModel):
    """No body required — email is taken from the authenticated user's profile."""

    # Intentionally empty: the model declares no fields.
    pass
  165. class TwoFAVerifyRequest(BaseModel):
  166. pre_auth_token: str = Field(..., max_length=128)
  167. # TOTP/email codes are 6 digits; backup codes are 8 uppercase alphanumeric chars.
  168. # max_length=8 prevents excessively long inputs from reaching pbkdf2/pyotp.
  169. code: str = Field(..., min_length=6, max_length=8)
  170. method: Literal["totp", "email", "backup"] = "totp"
  171. @field_validator("code")
  172. @classmethod
  173. def validate_code_format(cls, v: str) -> str:
  174. v = v.strip()
  175. if not re.match(r"^[A-Za-z0-9]{6,8}$", v):
  176. raise ValueError("Code must be 6–8 alphanumeric characters")
  177. return v.upper() # normalise backup codes to uppercase
class TwoFAVerifyResponse(BaseModel):
    """2FA verification result: an access token, its type, and the user record."""

    access_token: str
    token_type: str = "bearer"
    user: "UserResponse"  # string annotation referring to the UserResponse model
class EmailOTPSendRequest(BaseModel):
    """Email-OTP send payload carrying only the pre-auth token."""

    pre_auth_token: str = Field(..., max_length=128)  # required; at most 128 characters
  184. class EmailOTPEnableConfirmRequest(BaseModel):
  185. """Body for the second step of email OTP enable: verify the proof-of-possession code."""
  186. setup_token: str = Field(..., max_length=128)
  187. # L-NEW-3: email OTP setup codes are always exactly 6 digits; reject anything else.
  188. code: str = Field(..., min_length=6, max_length=6)
  189. @field_validator("code")
  190. @classmethod
  191. def validate_code_digits(cls, v: str) -> str:
  192. v = v.strip()
  193. if not v.isdigit() or len(v) != 6:
  194. raise ValueError("Email OTP setup code must be exactly 6 digits")
  195. return v
class EmailOTPDisableRequest(BaseModel):
    """Requires the account password to disable email OTP."""

    password: str = Field(..., max_length=256)  # required; at most 256 characters
class AdminDisable2FARequest(BaseModel):
    """Admin must supply their own password as re-auth before disabling 2FA for another user.

    OIDC/LDAP-only admins (no local password_hash) are exempt from this check.
    """

    # Optional at the schema level (defaults to None); at most 256 characters.
    admin_password: str | None = Field(default=None, max_length=256)
  204. # ---------------------------------------------------------------------------
  205. # OIDC schemas
  206. # ---------------------------------------------------------------------------
  207. def _validate_icon_url(v: str | None) -> str | None:
  208. """Reject non-HTTPS icon URLs to prevent SSRF / mixed-content issues."""
  209. if v is None:
  210. return v
  211. if not v.startswith("https://"):
  212. raise ValueError("icon_url must start with https://")
  213. return v
  214. def _validate_issuer_url(v: str | None) -> str | None:
  215. """Nit4: Reject non-HTTPS issuer URLs and private/loopback/link-local hosts.
  216. HTTP is no longer accepted — OIDC providers must be reachable over TLS.
  217. Private-network and loopback addresses are rejected to prevent SSRF attacks
  218. where an admin-supplied URL could reach internal services.
  219. """
  220. import ipaddress
  221. from urllib.parse import urlparse
  222. if v is None:
  223. return v
  224. if not v.startswith("https://"):
  225. raise ValueError("issuer_url must start with https://")
  226. host = urlparse(v).hostname or ""
  227. try:
  228. addr = ipaddress.ip_address(host)
  229. if addr.is_private or addr.is_loopback or addr.is_link_local:
  230. raise ValueError("issuer_url must not point to a private, loopback, or link-local address")
  231. except ValueError as exc:
  232. if "issuer_url" in str(exc):
  233. raise
  234. # hostname is a domain name, not a bare IP — that's fine
  235. return v
  236. def _validate_scopes(v: str | None) -> str | None:
  237. """Nit5: Require that the 'openid' scope is present.
  238. The OpenID Connect spec mandates the 'openid' scope; without it the
  239. response is plain OAuth2, not OIDC, and claims like sub/email are not
  240. guaranteed.
  241. """
  242. if v is None:
  243. return v
  244. scope_list = v.split()
  245. if "openid" not in scope_list:
  246. raise ValueError("scopes must include 'openid'")
  247. return v
  248. class OIDCProviderCreate(BaseModel):
  249. name: str = Field(..., max_length=100) # L-NEW-4
  250. issuer_url: str
  251. client_id: str = Field(..., max_length=256) # L-NEW-4
  252. client_secret: str = Field(..., max_length=512) # L-NEW-4: Fernet input bounded
  253. scopes: str = Field(default="openid email profile", max_length=256) # L-NEW-4
  254. is_enabled: bool = True
  255. auto_create_users: bool = False
  256. auto_link_existing_accounts: bool = False # M-2: conservative default, opt-in only
  257. icon_url: str | None = None
  258. @field_validator("issuer_url")
  259. @classmethod
  260. def validate_issuer_url(cls, v: str) -> str:
  261. result = _validate_issuer_url(v)
  262. assert result is not None
  263. return result
  264. @field_validator("scopes")
  265. @classmethod
  266. def validate_scopes(cls, v: str) -> str:
  267. result = _validate_scopes(v)
  268. assert result is not None
  269. return result
  270. @field_validator("icon_url")
  271. @classmethod
  272. def validate_icon_url(cls, v: str | None) -> str | None:
  273. return _validate_icon_url(v)
  274. class OIDCProviderUpdate(BaseModel):
  275. name: str | None = Field(default=None, max_length=100)
  276. issuer_url: str | None = None
  277. @field_validator("issuer_url")
  278. @classmethod
  279. def validate_issuer_url(cls, v: str | None) -> str | None:
  280. return _validate_issuer_url(v)
  281. client_id: str | None = Field(default=None, max_length=256)
  282. client_secret: str | None = Field(default=None, max_length=512)
  283. scopes: str | None = Field(default=None, max_length=256)
  284. is_enabled: bool | None = None
  285. auto_create_users: bool | None = None
  286. auto_link_existing_accounts: bool | None = None
  287. icon_url: str | None = None
  288. @field_validator("scopes")
  289. @classmethod
  290. def validate_scopes(cls, v: str | None) -> str | None:
  291. return _validate_scopes(v)
  292. @field_validator("icon_url")
  293. @classmethod
  294. def validate_icon_url(cls, v: str | None) -> str | None:
  295. return _validate_icon_url(v)
  296. class OIDCProviderResponse(BaseModel):
  297. id: int
  298. name: str
  299. issuer_url: str
  300. client_id: str
  301. scopes: str
  302. is_enabled: bool
  303. auto_create_users: bool
  304. auto_link_existing_accounts: bool = False
  305. icon_url: str | None = None
  306. class Config:
  307. from_attributes = True
class OIDCAuthorizeResponse(BaseModel):
    """Response carrying a single authorization URL string."""

    auth_url: str
class OIDCExchangeRequest(BaseModel):
    """OIDC exchange payload carrying only the token."""

    oidc_token: str = Field(..., max_length=128)  # required; at most 128 characters
class OIDCLinkResponse(BaseModel):
    """OIDC link record: its id, the provider's id and name, an optional email, and a timestamp."""

    id: int
    provider_id: int
    provider_name: str
    provider_email: str | None = None  # optional; defaults to None
    created_at: str  # typed as a plain string in this schema