ldap_service.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437
  1. """LDAP authentication service for BamBuddy (#794).
  2. Supports:
  3. - LDAP bind authentication (simple bind with user's credentials)
  4. - StartTLS, LDAPS, and plaintext connections
  5. - User search with configurable filter
  6. - Group membership resolution for role mapping
  7. """
  8. from __future__ import annotations
  9. import json
  10. import logging
  11. from dataclasses import dataclass
  12. from ldap3 import ALL, SUBTREE, Connection, Server, Tls
  13. logger = logging.getLogger(__name__)
  14. @dataclass
  15. class LDAPUserInfo:
  16. """User information retrieved from LDAP after successful authentication."""
  17. username: str
  18. email: str | None
  19. display_name: str | None
  20. groups: list[str] # List of group DNs the user belongs to
  21. @dataclass
  22. class LDAPSearchResult:
  23. """A directory user returned by the admin search endpoint (no auth performed)."""
  24. username: str
  25. email: str | None
  26. display_name: str | None
  27. dn: str
  28. @dataclass
  29. class LDAPConfig:
  30. """LDAP configuration parsed from settings."""
  31. server_url: str
  32. bind_dn: str
  33. bind_password: str
  34. search_base: str
  35. user_filter: str # e.g. "(sAMAccountName={username})"
  36. security: str # "none", "starttls", "ldaps"
  37. group_mapping: dict[str, str] # LDAP group DN -> BamBuddy group name
  38. auto_provision: bool
  39. ca_cert_path: str # Path to CA certificate file (empty = skip verification)
  40. default_group: str # Fallback BamBuddy group assigned when user has no mapped groups (empty = no fallback)
  41. def parse_ldap_config(settings: dict[str, str]) -> LDAPConfig | None:
  42. """Parse LDAP config from settings key-value pairs. Returns None if LDAP not enabled."""
  43. if settings.get("ldap_enabled", "false").lower() != "true":
  44. return None
  45. server_url = settings.get("ldap_server_url", "").strip()
  46. if not server_url:
  47. return None
  48. group_mapping_raw = settings.get("ldap_group_mapping", "")
  49. try:
  50. group_mapping = json.loads(group_mapping_raw) if group_mapping_raw else {}
  51. except json.JSONDecodeError:
  52. group_mapping = {}
  53. return LDAPConfig(
  54. server_url=server_url,
  55. bind_dn=settings.get("ldap_bind_dn", "").strip(),
  56. bind_password=settings.get("ldap_bind_password", ""),
  57. search_base=settings.get("ldap_search_base", "").strip(),
  58. user_filter=settings.get("ldap_user_filter", "(sAMAccountName={username})").strip(),
  59. security=settings.get("ldap_security", "starttls").strip(),
  60. group_mapping=group_mapping if isinstance(group_mapping, dict) else {},
  61. auto_provision=settings.get("ldap_auto_provision", "false").lower() == "true",
  62. ca_cert_path=settings.get("ldap_ca_cert_path", "").strip(),
  63. default_group=settings.get("ldap_default_group", "").strip(),
  64. )
  65. def _create_server(config: LDAPConfig) -> Server:
  66. """Create an ldap3 Server instance from config.
  67. Always uses TLS — either LDAPS (TLS from start) or StartTLS (upgrade after connect).
  68. Plaintext LDAP is not supported.
  69. """
  70. import ssl
  71. use_ssl = config.security == "ldaps" or config.server_url.startswith("ldaps://")
  72. if config.ca_cert_path:
  73. tls = Tls(validate=ssl.CERT_REQUIRED, ca_certs_file=config.ca_cert_path)
  74. else:
  75. tls = Tls(validate=ssl.CERT_NONE)
  76. return Server(config.server_url, use_ssl=use_ssl, tls=tls, get_info=ALL, connect_timeout=10)
  77. def _open_service_connection(config: LDAPConfig, server: Server, *, check_names: bool = True) -> Connection:
  78. """Open and bind a service-account LDAP connection. Raises on failure.
  79. `check_names` toggles ldap3's client-side attribute-name validation. The
  80. default keeps it on so typos in `user_filter` fail loudly. The fuzzy
  81. directory search disables it because its fixed OR filter spans both AD-only
  82. (sAMAccountName, displayName) and OpenLDAP-only attribute names — without
  83. this bypass ldap3 throws `LDAPAttributeError` before any request is sent
  84. on a directory whose schema doesn't define one of the names.
  85. """
  86. conn = Connection(
  87. server,
  88. user=config.bind_dn,
  89. password=config.bind_password,
  90. auto_bind=False,
  91. raise_exceptions=True,
  92. read_only=True,
  93. check_names=check_names,
  94. )
  95. conn.open()
  96. if config.security == "starttls" and not config.server_url.startswith("ldaps://"):
  97. conn.start_tls()
  98. conn.bind()
  99. return conn
  100. def _pick_canonical_username(entry, fallback: str) -> str:
  101. """Prefer sAMAccountName, then uid, then the supplied fallback."""
  102. if hasattr(entry, "sAMAccountName") and entry.sAMAccountName:
  103. return str(entry.sAMAccountName)
  104. if hasattr(entry, "uid") and entry.uid:
  105. return str(entry.uid)
  106. return fallback
  107. def _extract_user_info(
  108. service_conn: Connection, config: LDAPConfig, user_entry, fallback_username: str
  109. ) -> LDAPUserInfo:
  110. """Build an LDAPUserInfo from an already-fetched directory entry.
  111. Collects memberOf groups, POSIX memberUid groups, and the primary
  112. gidNumber group; dedups DNs case-insensitively. Uses the supplied
  113. service-bound connection to resolve POSIX groups.
  114. """
  115. email = str(user_entry.mail) if hasattr(user_entry, "mail") and user_entry.mail else None
  116. display_name = (
  117. str(user_entry.displayName) if hasattr(user_entry, "displayName") and user_entry.displayName else None
  118. )
  119. # Collect groups from memberOf attribute (Active Directory / groupOfNames)
  120. groups = [str(g) for g in user_entry.memberOf] if hasattr(user_entry, "memberOf") and user_entry.memberOf else []
  121. canonical_username = _pick_canonical_username(user_entry, fallback_username)
  122. # Also search for POSIX groups (memberUid-based) using the service account
  123. posix_filter = f"(&(objectClass=posixGroup)(memberUid={_ldap_escape(canonical_username)}))"
  124. service_conn.search(
  125. search_base=config.search_base,
  126. search_filter=posix_filter,
  127. search_scope=SUBTREE,
  128. attributes=["cn"],
  129. )
  130. for entry in service_conn.entries:
  131. groups.append(str(entry.entry_dn))
  132. # POSIX primary group: user's gidNumber matches a posixGroup's gidNumber.
  133. # Standard Unix semantics treat this as full group membership, so we need
  134. # to resolve it to a group DN alongside the memberUid results.
  135. if hasattr(user_entry, "gidNumber") and user_entry.gidNumber:
  136. primary_gid = str(user_entry.gidNumber)
  137. primary_filter = f"(&(objectClass=posixGroup)(gidNumber={_ldap_escape(primary_gid)}))"
  138. service_conn.search(
  139. search_base=config.search_base,
  140. search_filter=primary_filter,
  141. search_scope=SUBTREE,
  142. attributes=["cn"],
  143. )
  144. for entry in service_conn.entries:
  145. groups.append(str(entry.entry_dn))
  146. # Dedupe group DNs (user may be in a group via both memberUid and primary gidNumber).
  147. # Case-insensitive comparison — LDAP DNs are case-insensitive by spec.
  148. seen_lower: set[str] = set()
  149. deduped_groups: list[str] = []
  150. for g in groups:
  151. key = g.lower()
  152. if key not in seen_lower:
  153. seen_lower.add(key)
  154. deduped_groups.append(g)
  155. return LDAPUserInfo(
  156. username=canonical_username,
  157. email=email,
  158. display_name=display_name,
  159. groups=deduped_groups,
  160. )
  161. def authenticate_ldap_user(config: LDAPConfig, username: str, password: str) -> LDAPUserInfo | None:
  162. """Authenticate a user via LDAP bind.
  163. 1. Bind with service account to search for the user DN
  164. 2. Attempt bind with the user's DN and provided password
  165. 3. On success, retrieve user attributes and group memberships
  166. Returns LDAPUserInfo on success, None on failure.
  167. """
  168. if not password:
  169. return None
  170. server = _create_server(config)
  171. try:
  172. service_conn = _open_service_connection(config, server)
  173. except Exception as e:
  174. logger.warning("LDAP service account bind failed: %s", e)
  175. return None
  176. try:
  177. # Search for the user
  178. search_filter = config.user_filter.replace("{username}", _ldap_escape(username))
  179. service_conn.search(
  180. search_base=config.search_base,
  181. search_filter=search_filter,
  182. search_scope=SUBTREE,
  183. attributes=["*"],
  184. )
  185. if not service_conn.entries:
  186. logger.info("LDAP user not found: %s", username)
  187. return None
  188. user_entry = service_conn.entries[0]
  189. user_dn = str(user_entry.entry_dn)
  190. # Step 2: Bind as the user to verify password
  191. try:
  192. user_conn = Connection(
  193. server,
  194. user=user_dn,
  195. password=password,
  196. auto_bind=False,
  197. raise_exceptions=True,
  198. read_only=True,
  199. )
  200. user_conn.open()
  201. if config.security == "starttls" and not config.server_url.startswith("ldaps://"):
  202. user_conn.start_tls()
  203. user_conn.bind()
  204. user_conn.unbind()
  205. except Exception as e:
  206. logger.info("LDAP bind failed for user %s: %s", username, e)
  207. return None
  208. info = _extract_user_info(service_conn, config, user_entry, username)
  209. logger.info(
  210. "LDAP authentication successful for user: %s (DN: %s, groups: %d)",
  211. info.username,
  212. user_dn,
  213. len(info.groups),
  214. )
  215. return info
  216. finally:
  217. service_conn.unbind()
  218. def lookup_ldap_user(config: LDAPConfig, username: str) -> LDAPUserInfo | None:
  219. """Look up a directory user by exact username via the service-account bind.
  220. Performs no password verification — intended for the admin manual-provision
  221. flow, where the caller has already been authenticated as a BamBuddy admin
  222. and now needs the directory attributes (email, display name, group DNs)
  223. to create the user.
  224. Uses the same `user_filter` template that the login path uses, so anything
  225. that logs in successfully via auto-provision is also resolvable here.
  226. """
  227. server = _create_server(config)
  228. try:
  229. service_conn = _open_service_connection(config, server)
  230. except Exception as e:
  231. logger.warning("LDAP service account bind failed during lookup: %s", e)
  232. raise
  233. try:
  234. search_filter = config.user_filter.replace("{username}", _ldap_escape(username))
  235. service_conn.search(
  236. search_base=config.search_base,
  237. search_filter=search_filter,
  238. search_scope=SUBTREE,
  239. attributes=["*"],
  240. )
  241. if not service_conn.entries:
  242. logger.info("LDAP lookup: user not found: %s", username)
  243. return None
  244. return _extract_user_info(service_conn, config, service_conn.entries[0], username)
  245. finally:
  246. service_conn.unbind()
  247. def search_ldap_users(config: LDAPConfig, query: str, limit: int = 25) -> list[LDAPSearchResult]:
  248. """Fuzzy search the directory for users matching `query`.
  249. Uses a fixed OR filter across sAMAccountName, uid, mail, displayName, and
  250. cn — covering both Active Directory and OpenLDAP layouts. The query is
  251. RFC-4515 escaped so a typed `*` doesn't enumerate the whole directory.
  252. Returns up to `limit` results (default 25). Service-bind failures raise so
  253. the caller can surface a 503; "no matches" returns an empty list.
  254. Callers should enforce a minimum query length (≥2 chars) — short queries
  255. against a large directory are wasteful and effectively unbounded.
  256. """
  257. query = query.strip()
  258. if len(query) < 2:
  259. return []
  260. escaped = _ldap_escape(query)
  261. search_filter = (
  262. f"(|(sAMAccountName=*{escaped}*)(uid=*{escaped}*)(mail=*{escaped}*)(displayName=*{escaped}*)(cn=*{escaped}*))"
  263. )
  264. server = _create_server(config)
  265. try:
  266. # check_names=False so OpenLDAP directories (no sAMAccountName/displayName
  267. # in schema) don't reject the cross-schema OR filter — see helper docstring.
  268. service_conn = _open_service_connection(config, server, check_names=False)
  269. except Exception as e:
  270. logger.warning("LDAP service account bind failed during search: %s", e)
  271. raise
  272. try:
  273. # attributes=["*"] requests all user attributes. We can't enumerate the
  274. # AD/OpenLDAP-specific names (sAMAccountName, displayName) explicitly
  275. # because ldap3 validates the attribute list against the server schema
  276. # even with check_names=False — and OpenLDAP rejects the AD names. The
  277. # `*` wildcard is hardcoded in ldap3's ATTRIBUTES_EXCLUDED_FROM_CHECK so
  278. # it bypasses that validation, and the server returns whatever it has.
  279. service_conn.search(
  280. search_base=config.search_base,
  281. search_filter=search_filter,
  282. search_scope=SUBTREE,
  283. attributes=["*"],
  284. size_limit=limit,
  285. )
  286. results: list[LDAPSearchResult] = []
  287. for entry in service_conn.entries:
  288. username = _pick_canonical_username(entry, "")
  289. if not username and hasattr(entry, "cn") and entry.cn:
  290. # Last resort — some OpenLDAP layouts only have cn
  291. username = str(entry.cn)
  292. if not username:
  293. continue
  294. email = str(entry.mail) if hasattr(entry, "mail") and entry.mail else None
  295. display_name = str(entry.displayName) if hasattr(entry, "displayName") and entry.displayName else None
  296. results.append(
  297. LDAPSearchResult(
  298. username=username,
  299. email=email,
  300. display_name=display_name,
  301. dn=str(entry.entry_dn),
  302. )
  303. )
  304. logger.info("LDAP directory search for %r returned %d result(s)", query, len(results))
  305. return results
  306. finally:
  307. service_conn.unbind()
  308. def resolve_group_mapping(ldap_groups: list[str], group_mapping: dict[str, str]) -> list[str]:
  309. """Map LDAP group DNs to BamBuddy group names.
  310. Returns list of BamBuddy group names that the user should be added to.
  311. Comparison is case-insensitive on the LDAP group DN.
  312. """
  313. if not group_mapping:
  314. return []
  315. # Build case-insensitive lookup
  316. mapping_lower = {k.lower(): v for k, v in group_mapping.items()}
  317. result = []
  318. for ldap_group in ldap_groups:
  319. bambuddy_group = mapping_lower.get(ldap_group.lower())
  320. if bambuddy_group:
  321. result.append(bambuddy_group)
  322. return result
  323. def test_ldap_connection(config: LDAPConfig) -> tuple[bool, str]:
  324. """Test LDAP connection and service account bind.
  325. Returns (success, message).
  326. """
  327. try:
  328. server = _create_server(config)
  329. conn = Connection(
  330. server,
  331. user=config.bind_dn,
  332. password=config.bind_password,
  333. auto_bind=False,
  334. raise_exceptions=True,
  335. read_only=True,
  336. )
  337. conn.open()
  338. if config.security == "starttls" and not config.server_url.startswith("ldaps://"):
  339. conn.start_tls()
  340. conn.bind()
  341. # Try a search to verify search base
  342. conn.search(
  343. search_base=config.search_base,
  344. search_filter="(objectClass=*)",
  345. search_scope=SUBTREE,
  346. size_limit=1,
  347. )
  348. conn.unbind()
  349. return True, "LDAP connection successful"
  350. except Exception as e:
  351. return False, f"LDAP connection failed: {e}"
  352. def _ldap_escape(value: str) -> str:
  353. """Escape special characters in LDAP search filter values (RFC 4515)."""
  354. replacements = {
  355. "\\": "\\5c",
  356. "*": "\\2a",
  357. "(": "\\28",
  358. ")": "\\29",
  359. "\x00": "\\00",
  360. }
  361. for char, escaped in replacements.items():
  362. value = value.replace(char, escaped)
  363. return value