ldap_service.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. """LDAP authentication service for BamBuddy (#794).
  2. Supports:
  3. - LDAP bind authentication (simple bind with user's credentials)
  4. - StartTLS, LDAPS, and plaintext connections
  5. - User search with configurable filter
  6. - Group membership resolution for role mapping
  7. """
  8. from __future__ import annotations
  9. import json
  10. import logging
  11. from dataclasses import dataclass
  12. from ldap3 import ALL, SUBTREE, Connection, Server, Tls
  13. logger = logging.getLogger(__name__)
  14. @dataclass
  15. class LDAPUserInfo:
  16. """User information retrieved from LDAP after successful authentication."""
  17. username: str
  18. email: str | None
  19. display_name: str | None
  20. groups: list[str] # List of group DNs the user belongs to
  21. @dataclass
  22. class LDAPConfig:
  23. """LDAP configuration parsed from settings."""
  24. server_url: str
  25. bind_dn: str
  26. bind_password: str
  27. search_base: str
  28. user_filter: str # e.g. "(sAMAccountName={username})"
  29. security: str # "none", "starttls", "ldaps"
  30. group_mapping: dict[str, str] # LDAP group DN -> BamBuddy group name
  31. auto_provision: bool
  32. ca_cert_path: str # Path to CA certificate file (empty = skip verification)
  33. def parse_ldap_config(settings: dict[str, str]) -> LDAPConfig | None:
  34. """Parse LDAP config from settings key-value pairs. Returns None if LDAP not enabled."""
  35. if settings.get("ldap_enabled", "false").lower() != "true":
  36. return None
  37. server_url = settings.get("ldap_server_url", "").strip()
  38. if not server_url:
  39. return None
  40. group_mapping_raw = settings.get("ldap_group_mapping", "")
  41. try:
  42. group_mapping = json.loads(group_mapping_raw) if group_mapping_raw else {}
  43. except json.JSONDecodeError:
  44. group_mapping = {}
  45. return LDAPConfig(
  46. server_url=server_url,
  47. bind_dn=settings.get("ldap_bind_dn", "").strip(),
  48. bind_password=settings.get("ldap_bind_password", ""),
  49. search_base=settings.get("ldap_search_base", "").strip(),
  50. user_filter=settings.get("ldap_user_filter", "(sAMAccountName={username})").strip(),
  51. security=settings.get("ldap_security", "starttls").strip(),
  52. group_mapping=group_mapping if isinstance(group_mapping, dict) else {},
  53. auto_provision=settings.get("ldap_auto_provision", "false").lower() == "true",
  54. ca_cert_path=settings.get("ldap_ca_cert_path", "").strip(),
  55. )
  56. def _create_server(config: LDAPConfig) -> Server:
  57. """Create an ldap3 Server instance from config.
  58. Always uses TLS — either LDAPS (TLS from start) or StartTLS (upgrade after connect).
  59. Plaintext LDAP is not supported.
  60. """
  61. import ssl
  62. use_ssl = config.security == "ldaps" or config.server_url.startswith("ldaps://")
  63. if config.ca_cert_path:
  64. tls = Tls(validate=ssl.CERT_REQUIRED, ca_certs_file=config.ca_cert_path)
  65. else:
  66. tls = Tls(validate=ssl.CERT_NONE)
  67. return Server(config.server_url, use_ssl=use_ssl, tls=tls, get_info=ALL, connect_timeout=10)
  68. def authenticate_ldap_user(config: LDAPConfig, username: str, password: str) -> LDAPUserInfo | None:
  69. """Authenticate a user via LDAP bind.
  70. 1. Bind with service account to search for the user DN
  71. 2. Attempt bind with the user's DN and provided password
  72. 3. On success, retrieve user attributes and group memberships
  73. Returns LDAPUserInfo on success, None on failure.
  74. """
  75. if not password:
  76. return None
  77. server = _create_server(config)
  78. # Step 1: Service account bind + user search
  79. try:
  80. service_conn = Connection(
  81. server,
  82. user=config.bind_dn,
  83. password=config.bind_password,
  84. auto_bind=False,
  85. raise_exceptions=True,
  86. read_only=True,
  87. )
  88. service_conn.open()
  89. if config.security == "starttls" and not config.server_url.startswith("ldaps://"):
  90. service_conn.start_tls()
  91. service_conn.bind()
  92. except Exception as e:
  93. logger.warning("LDAP service account bind failed: %s", e)
  94. return None
  95. try:
  96. # Search for the user
  97. search_filter = config.user_filter.replace("{username}", _ldap_escape(username))
  98. service_conn.search(
  99. search_base=config.search_base,
  100. search_filter=search_filter,
  101. search_scope=SUBTREE,
  102. attributes=["*"],
  103. )
  104. if not service_conn.entries:
  105. logger.info("LDAP user not found: %s", username)
  106. return None
  107. user_entry = service_conn.entries[0]
  108. user_dn = str(user_entry.entry_dn)
  109. # Step 2: Bind as the user to verify password
  110. try:
  111. user_conn = Connection(
  112. server,
  113. user=user_dn,
  114. password=password,
  115. auto_bind=False,
  116. raise_exceptions=True,
  117. read_only=True,
  118. )
  119. user_conn.open()
  120. if config.security == "starttls" and not config.server_url.startswith("ldaps://"):
  121. user_conn.start_tls()
  122. user_conn.bind()
  123. user_conn.unbind()
  124. except Exception as e:
  125. logger.info("LDAP bind failed for user %s: %s", username, e)
  126. return None
  127. # Step 3: Extract user info
  128. email = str(user_entry.mail) if hasattr(user_entry, "mail") and user_entry.mail else None
  129. display_name = (
  130. str(user_entry.displayName) if hasattr(user_entry, "displayName") and user_entry.displayName else None
  131. )
  132. # Collect groups from memberOf attribute (Active Directory / groupOfNames)
  133. groups = (
  134. [str(g) for g in user_entry.memberOf] if hasattr(user_entry, "memberOf") and user_entry.memberOf else []
  135. )
  136. # Also search for Posix groups (memberUid-based) using the service account
  137. canonical_username = username
  138. if hasattr(user_entry, "sAMAccountName") and user_entry.sAMAccountName:
  139. canonical_username = str(user_entry.sAMAccountName)
  140. elif hasattr(user_entry, "uid") and user_entry.uid:
  141. canonical_username = str(user_entry.uid)
  142. posix_filter = f"(&(objectClass=posixGroup)(memberUid={_ldap_escape(canonical_username)}))"
  143. service_conn.search(
  144. search_base=config.search_base,
  145. search_filter=posix_filter,
  146. search_scope=SUBTREE,
  147. attributes=["cn"],
  148. )
  149. for entry in service_conn.entries:
  150. groups.append(str(entry.entry_dn))
  151. logger.info(
  152. "LDAP authentication successful for user: %s (DN: %s, groups: %d)", canonical_username, user_dn, len(groups)
  153. )
  154. return LDAPUserInfo(
  155. username=canonical_username,
  156. email=email,
  157. display_name=display_name,
  158. groups=groups,
  159. )
  160. finally:
  161. service_conn.unbind()
  162. def resolve_group_mapping(ldap_groups: list[str], group_mapping: dict[str, str]) -> list[str]:
  163. """Map LDAP group DNs to BamBuddy group names.
  164. Returns list of BamBuddy group names that the user should be added to.
  165. Comparison is case-insensitive on the LDAP group DN.
  166. """
  167. if not group_mapping:
  168. return []
  169. # Build case-insensitive lookup
  170. mapping_lower = {k.lower(): v for k, v in group_mapping.items()}
  171. result = []
  172. for ldap_group in ldap_groups:
  173. bambuddy_group = mapping_lower.get(ldap_group.lower())
  174. if bambuddy_group:
  175. result.append(bambuddy_group)
  176. return result
  177. def test_ldap_connection(config: LDAPConfig) -> tuple[bool, str]:
  178. """Test LDAP connection and service account bind.
  179. Returns (success, message).
  180. """
  181. try:
  182. server = _create_server(config)
  183. conn = Connection(
  184. server,
  185. user=config.bind_dn,
  186. password=config.bind_password,
  187. auto_bind=False,
  188. raise_exceptions=True,
  189. read_only=True,
  190. )
  191. conn.open()
  192. if config.security == "starttls" and not config.server_url.startswith("ldaps://"):
  193. conn.start_tls()
  194. conn.bind()
  195. # Try a search to verify search base
  196. conn.search(
  197. search_base=config.search_base,
  198. search_filter="(objectClass=*)",
  199. search_scope=SUBTREE,
  200. size_limit=1,
  201. )
  202. conn.unbind()
  203. return True, "LDAP connection successful"
  204. except Exception as e:
  205. return False, f"LDAP connection failed: {e}"
  206. def _ldap_escape(value: str) -> str:
  207. """Escape special characters in LDAP search filter values (RFC 4515)."""
  208. replacements = {
  209. "\\": "\\5c",
  210. "*": "\\2a",
  211. "(": "\\28",
  212. ")": "\\29",
  213. "\x00": "\\00",
  214. }
  215. for char, escaped in replacements.items():
  216. value = value.replace(char, escaped)
  217. return value