test_ldap_service.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427
"""Tests for LDAP authentication service (#794).

Tests the pure logic functions in ldap_service.py:
- Config parsing from settings dict
- LDAP filter escaping (RFC 4515)
- Group mapping resolution
- LDAPConfig/LDAPUserInfo dataclass construction

Network-dependent functions (authenticate_ldap_user, test_ldap_connection)
are not tested against a live LDAP server here. The group-discovery logic of
authenticate_ldap_user is covered with a mocked ldap3 Connection instead.
"""
from typing import ClassVar

import pytest

from backend.app.services.ldap_service import (
    LDAPConfig,
    LDAPUserInfo,
    _ldap_escape,
    authenticate_ldap_user,
    parse_ldap_config,
    resolve_group_mapping,
)
  19. class TestParseConfig:
  20. """Verify parse_ldap_config builds LDAPConfig from settings dict."""
  21. def test_returns_none_when_disabled(self):
  22. settings = {"ldap_enabled": "false", "ldap_server_url": "ldaps://example.com"}
  23. assert parse_ldap_config(settings) is None
  24. def test_returns_none_when_missing_enabled(self):
  25. settings = {"ldap_server_url": "ldaps://example.com"}
  26. assert parse_ldap_config(settings) is None
  27. def test_returns_none_when_no_server_url(self):
  28. settings = {"ldap_enabled": "true", "ldap_server_url": ""}
  29. assert parse_ldap_config(settings) is None
  30. def test_returns_none_when_server_url_whitespace(self):
  31. settings = {"ldap_enabled": "true", "ldap_server_url": " "}
  32. assert parse_ldap_config(settings) is None
  33. def test_parses_minimal_config(self):
  34. settings = {
  35. "ldap_enabled": "true",
  36. "ldap_server_url": "ldaps://ldap.example.com:636",
  37. }
  38. config = parse_ldap_config(settings)
  39. assert config is not None
  40. assert config.server_url == "ldaps://ldap.example.com:636"
  41. assert config.bind_dn == ""
  42. assert config.search_base == ""
  43. assert config.user_filter == "(sAMAccountName={username})"
  44. assert config.security == "starttls"
  45. assert config.group_mapping == {}
  46. assert config.auto_provision is False
  47. assert config.ca_cert_path == ""
  48. assert config.default_group == ""
  49. def test_parses_full_config(self):
  50. settings = {
  51. "ldap_enabled": "true",
  52. "ldap_server_url": "ldaps://ldap.example.com:636",
  53. "ldap_bind_dn": "cn=admin,dc=example,dc=com",
  54. "ldap_bind_password": "secret",
  55. "ldap_search_base": "ou=users,dc=example,dc=com",
  56. "ldap_user_filter": "(uid={username})",
  57. "ldap_security": "ldaps",
  58. "ldap_group_mapping": '{"cn=admins,dc=example,dc=com": "Administrators"}',
  59. "ldap_auto_provision": "true",
  60. "ldap_ca_cert_path": "/path/to/ca.pem",
  61. "ldap_default_group": "Viewers",
  62. }
  63. config = parse_ldap_config(settings)
  64. assert config is not None
  65. assert config.bind_dn == "cn=admin,dc=example,dc=com"
  66. assert config.bind_password == "secret"
  67. assert config.search_base == "ou=users,dc=example,dc=com"
  68. assert config.user_filter == "(uid={username})"
  69. assert config.security == "ldaps"
  70. assert config.group_mapping == {"cn=admins,dc=example,dc=com": "Administrators"}
  71. assert config.auto_provision is True
  72. assert config.ca_cert_path == "/path/to/ca.pem"
  73. assert config.default_group == "Viewers"
  74. def test_handles_invalid_group_mapping_json(self):
  75. settings = {
  76. "ldap_enabled": "true",
  77. "ldap_server_url": "ldaps://ldap.example.com",
  78. "ldap_group_mapping": "not valid json",
  79. }
  80. config = parse_ldap_config(settings)
  81. assert config is not None
  82. assert config.group_mapping == {}
  83. def test_handles_non_dict_group_mapping(self):
  84. settings = {
  85. "ldap_enabled": "true",
  86. "ldap_server_url": "ldaps://ldap.example.com",
  87. "ldap_group_mapping": '["not", "a", "dict"]',
  88. }
  89. config = parse_ldap_config(settings)
  90. assert config is not None
  91. assert config.group_mapping == {}
  92. def test_enabled_case_insensitive(self):
  93. settings = {"ldap_enabled": "True", "ldap_server_url": "ldaps://ldap.example.com"}
  94. assert parse_ldap_config(settings) is not None
  95. settings = {"ldap_enabled": "TRUE", "ldap_server_url": "ldaps://ldap.example.com"}
  96. assert parse_ldap_config(settings) is not None
  97. def test_strips_whitespace(self):
  98. settings = {
  99. "ldap_enabled": "true",
  100. "ldap_server_url": " ldaps://ldap.example.com ",
  101. "ldap_bind_dn": " cn=admin,dc=example,dc=com ",
  102. "ldap_search_base": " dc=example,dc=com ",
  103. "ldap_default_group": " Viewers ",
  104. }
  105. config = parse_ldap_config(settings)
  106. assert config.server_url == "ldaps://ldap.example.com"
  107. assert config.bind_dn == "cn=admin,dc=example,dc=com"
  108. assert config.search_base == "dc=example,dc=com"
  109. assert config.default_group == "Viewers"
  110. class TestLDAPEscape:
  111. """Verify RFC 4515 escaping for LDAP search filter values."""
  112. def test_plain_string(self):
  113. assert _ldap_escape("testuser") == "testuser"
  114. def test_escapes_backslash(self):
  115. assert _ldap_escape("test\\user") == "test\\5cuser"
  116. def test_escapes_asterisk(self):
  117. assert _ldap_escape("test*user") == "test\\2auser"
  118. def test_escapes_open_paren(self):
  119. assert _ldap_escape("test(user") == "test\\28user"
  120. def test_escapes_close_paren(self):
  121. assert _ldap_escape("test)user") == "test\\29user"
  122. def test_escapes_null(self):
  123. assert _ldap_escape("test\x00user") == "test\\00user"
  124. def test_escapes_multiple_chars(self):
  125. assert _ldap_escape("a*b(c)d\\e") == "a\\2ab\\28c\\29d\\5ce"
  126. def test_empty_string(self):
  127. assert _ldap_escape("") == ""
  128. class TestResolveGroupMapping:
  129. """Verify LDAP group DN to BamBuddy group name resolution."""
  130. def test_empty_mapping(self):
  131. assert resolve_group_mapping(["cn=admins,dc=example"], {}) == []
  132. def test_empty_groups(self):
  133. mapping = {"cn=admins,dc=example": "Administrators"}
  134. assert resolve_group_mapping([], mapping) == []
  135. def test_single_match(self):
  136. mapping = {"cn=admins,dc=example,dc=com": "Administrators"}
  137. groups = ["cn=admins,dc=example,dc=com"]
  138. assert resolve_group_mapping(groups, mapping) == ["Administrators"]
  139. def test_multiple_matches(self):
  140. mapping = {
  141. "cn=admins,dc=example,dc=com": "Administrators",
  142. "cn=ops,dc=example,dc=com": "Operators",
  143. }
  144. groups = ["cn=admins,dc=example,dc=com", "cn=ops,dc=example,dc=com"]
  145. result = resolve_group_mapping(groups, mapping)
  146. assert set(result) == {"Administrators", "Operators"}
  147. def test_no_match(self):
  148. mapping = {"cn=admins,dc=example,dc=com": "Administrators"}
  149. groups = ["cn=users,dc=example,dc=com"]
  150. assert resolve_group_mapping(groups, mapping) == []
  151. def test_case_insensitive_dn(self):
  152. mapping = {"CN=Admins,DC=Example,DC=Com": "Administrators"}
  153. groups = ["cn=admins,dc=example,dc=com"]
  154. assert resolve_group_mapping(groups, mapping) == ["Administrators"]
  155. def test_partial_match_not_matched(self):
  156. mapping = {"cn=admins,dc=example,dc=com": "Administrators"}
  157. groups = ["cn=admins,dc=other,dc=com"]
  158. assert resolve_group_mapping(groups, mapping) == []
  159. def test_extra_groups_ignored(self):
  160. mapping = {"cn=admins,dc=example,dc=com": "Administrators"}
  161. groups = ["cn=admins,dc=example,dc=com", "cn=users,dc=example,dc=com", "cn=devs,dc=example,dc=com"]
  162. assert resolve_group_mapping(groups, mapping) == ["Administrators"]
  163. class TestDataclasses:
  164. """Verify dataclass construction."""
  165. def test_ldap_user_info(self):
  166. info = LDAPUserInfo(
  167. username="testuser",
  168. email="test@example.com",
  169. display_name="Test User",
  170. groups=["cn=admins,dc=example,dc=com"],
  171. )
  172. assert info.username == "testuser"
  173. assert info.email == "test@example.com"
  174. assert info.display_name == "Test User"
  175. assert info.groups == ["cn=admins,dc=example,dc=com"]
  176. def test_ldap_user_info_none_fields(self):
  177. info = LDAPUserInfo(username="testuser", email=None, display_name=None, groups=[])
  178. assert info.email is None
  179. assert info.display_name is None
  180. assert info.groups == []
  181. def test_ldap_config(self):
  182. config = LDAPConfig(
  183. server_url="ldaps://ldap.example.com:636",
  184. bind_dn="cn=admin,dc=example,dc=com",
  185. bind_password="secret",
  186. search_base="dc=example,dc=com",
  187. user_filter="(uid={username})",
  188. security="ldaps",
  189. group_mapping={"cn=admins": "Administrators"},
  190. auto_provision=True,
  191. ca_cert_path="",
  192. default_group="Viewers",
  193. )
  194. assert config.server_url == "ldaps://ldap.example.com:636"
  195. assert config.auto_provision is True
  196. assert config.default_group == "Viewers"
# ---------------------------------------------------------------------------
# Mocked authenticate_ldap_user group-discovery tests
# ---------------------------------------------------------------------------
# These tests mock ldap3.Connection to exercise the group-discovery logic in
# authenticate_ldap_user without a live LDAP server. Added after a bug where
# POSIX primary-group membership (via gidNumber) was ignored — see CHANGELOG.
  203. class _MockAttr:
  204. """Minimal stand-in for ldap3 Attribute objects.
  205. Supports str(), bool(), .value, .values, and iteration — the operations
  206. used by ldap_service against user entry attributes.
  207. """
  208. def __init__(self, value):
  209. self._value = value
  210. @property
  211. def value(self):
  212. return self._value
  213. @property
  214. def values(self):
  215. return self._value if isinstance(self._value, list) else [self._value]
  216. def __str__(self):
  217. return str(self._value)
  218. def __bool__(self):
  219. return bool(self._value)
  220. def __iter__(self):
  221. if isinstance(self._value, list):
  222. return iter(self._value)
  223. return iter([self._value])
  224. class _MockEntry:
  225. """Minimal stand-in for ldap3 Entry. Only attributes passed at construction exist."""
  226. def __init__(self, dn, **attrs):
  227. self.entry_dn = dn
  228. for key, val in attrs.items():
  229. setattr(self, key, _MockAttr(val))
  230. class _MockConnection:
  231. """Mock ldap3 Connection that returns pre-configured entries based on filter substring match.
  232. Every Connection() instance shares a class-level fixture dict so the service-account
  233. connection and the user-bind connection both see the same fake directory.
  234. """
  235. _search_fixture: dict[str, list] = {}
  236. _instances: list["_MockConnection"] = []
  237. def __init__(self, *args, **kwargs):
  238. self.entries: list = []
  239. self.search_calls: list[str] = []
  240. _MockConnection._instances.append(self)
  241. def open(self):
  242. pass
  243. def start_tls(self):
  244. pass
  245. def bind(self):
  246. return True
  247. def unbind(self):
  248. pass
  249. def search(self, search_base=None, search_filter=None, search_scope=None, attributes=None):
  250. self.search_calls.append(search_filter or "")
  251. for needle, entries in _MockConnection._search_fixture.items():
  252. if needle in (search_filter or ""):
  253. self.entries = entries
  254. return True
  255. self.entries = []
  256. return True
  257. @pytest.fixture
  258. def mock_ldap(monkeypatch):
  259. """Patch Connection + _create_server in ldap_service so authenticate_ldap_user can run offline."""
  260. _MockConnection._search_fixture = {}
  261. _MockConnection._instances = []
  262. monkeypatch.setattr("backend.app.services.ldap_service.Connection", _MockConnection)
  263. monkeypatch.setattr("backend.app.services.ldap_service._create_server", lambda config: None)
  264. return _MockConnection
  265. def _base_config(**overrides):
  266. """Build a minimal LDAPConfig for mocked tests."""
  267. defaults = {
  268. "server_url": "ldaps://test.example.com:636",
  269. "bind_dn": "cn=admin,dc=test,dc=com",
  270. "bind_password": "x",
  271. "search_base": "dc=test,dc=com",
  272. "user_filter": "(uid={username})",
  273. "security": "ldaps",
  274. "group_mapping": {},
  275. "auto_provision": False,
  276. "ca_cert_path": "",
  277. "default_group": "",
  278. }
  279. defaults.update(overrides)
  280. return LDAPConfig(**defaults)
  281. class TestAuthenticateLdapUserGroups:
  282. """Group-discovery behaviour in authenticate_ldap_user.
  283. Covers the POSIX primary gidNumber lookup and case-insensitive dedupe added
  284. to fix a bug where users whose role came from their primary group were
  285. authenticated without the correct group membership.
  286. """
  287. def test_primary_gidnumber_group_found(self, mock_ldap):
  288. """Regression: POSIX primary group (gidNumber match) must be included in the result."""
  289. user_entry = _MockEntry("cn=mz,dc=test,dc=com", uid="mz", gidNumber=10002)
  290. operators_group = _MockEntry("cn=bambuddy-operators,ou=groups,dc=test,dc=com")
  291. mock_ldap._search_fixture = {
  292. "(uid=mz)": [user_entry],
  293. "memberUid=mz": [], # no supplementary memberships
  294. "gidNumber=10002": [operators_group],
  295. }
  296. info = authenticate_ldap_user(_base_config(), "mz", "password")
  297. assert info is not None
  298. assert info.groups == ["cn=bambuddy-operators,ou=groups,dc=test,dc=com"]
  299. def test_dedupes_group_found_via_both_memberuid_and_primary_gid(self, mock_ldap):
  300. """A user in the same group via BOTH memberUid and primary gidNumber should appear once."""
  301. user_entry = _MockEntry("cn=mz,dc=test,dc=com", uid="mz", gidNumber=10002)
  302. group_entry = _MockEntry("cn=bambuddy-operators,ou=groups,dc=test,dc=com")
  303. mock_ldap._search_fixture = {
  304. "(uid=mz)": [user_entry],
  305. "memberUid=mz": [group_entry], # supplementary membership
  306. "gidNumber=10002": [group_entry], # primary group — same DN
  307. }
  308. info = authenticate_ldap_user(_base_config(), "mz", "password")
  309. assert info.groups == ["cn=bambuddy-operators,ou=groups,dc=test,dc=com"]
  310. def test_case_insensitive_dedupe(self, mock_ldap):
  311. """DNs differing only in case should collapse to a single entry (LDAP DNs are case-insensitive)."""
  312. user_entry = _MockEntry("cn=mz,dc=test,dc=com", uid="mz", gidNumber=10002)
  313. upper_dn = _MockEntry("CN=Bambuddy-Operators,OU=Groups,DC=Test,DC=Com")
  314. lower_dn = _MockEntry("cn=bambuddy-operators,ou=groups,dc=test,dc=com")
  315. mock_ldap._search_fixture = {
  316. "(uid=mz)": [user_entry],
  317. "memberUid=mz": [upper_dn],
  318. "gidNumber=10002": [lower_dn],
  319. }
  320. info = authenticate_ldap_user(_base_config(), "mz", "password")
  321. assert len(info.groups) == 1
  322. # The first-seen casing (memberUid result) is kept.
  323. assert info.groups[0] == "CN=Bambuddy-Operators,OU=Groups,DC=Test,DC=Com"
  324. def test_no_gidnumber_skips_primary_search(self, mock_ldap):
  325. """User entries without a gidNumber attribute should not crash and should not issue the primary-gid query."""
  326. user_entry = _MockEntry("cn=tester,dc=test,dc=com", uid="tester") # no gidNumber
  327. viewers_group = _MockEntry("cn=bambuddy-viewers,ou=groups,dc=test,dc=com")
  328. mock_ldap._search_fixture = {
  329. "(uid=tester)": [user_entry],
  330. "memberUid=tester": [viewers_group],
  331. }
  332. info = authenticate_ldap_user(_base_config(), "tester", "password")
  333. assert info is not None
  334. assert info.groups == ["cn=bambuddy-viewers,ou=groups,dc=test,dc=com"]
  335. # Ensure the primary-gidNumber search was never issued — verifying the guard works.
  336. service_conn = _MockConnection._instances[0]
  337. gidnumber_searches = [call for call in service_conn.search_calls if "gidNumber=" in call]
  338. assert gidnumber_searches == []