test_ldap_provision.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359
  1. """Integration tests for the manual LDAP user provisioning routes (#1298).
  2. Reporter @Fuechslein noted that BamBuddy forced admins to leave auto-provision
  3. on because there was no UI path to create an LDAP user by hand. The new
  4. endpoints are GET /auth/ldap/search (admin types a partial name, picks a
  5. candidate) and POST /auth/ldap/provision (server re-resolves and creates the
  6. user).
  7. These tests cover:
  8. - Permission gating (only USERS_CREATE can search/provision)
  9. - LDAP-disabled and short-query rejections
  10. - Service-unreachable surfaces as 503, not 200 empty
  11. - Provision creates the user with auth_source=ldap, password_hash=None
  12. - Provision applies the same group mapping as the auto-provision login path
  13. - Duplicate-username protection (409 with explanation)
  14. """
  15. from unittest.mock import patch
  16. import pytest
  17. from httpx import AsyncClient
  18. from sqlalchemy.ext.asyncio import AsyncSession
  19. from backend.app.models.settings import Settings
  20. from backend.app.models.user import User
  21. from backend.app.services.ldap_service import LDAPSearchResult, LDAPUserInfo
  22. # ---------------------------------------------------------------------------
  23. # Fixtures
  24. # ---------------------------------------------------------------------------
  25. async def _seed_ldap_settings(db: AsyncSession, **overrides) -> None:
  26. """Write a minimal but valid LDAP config to the settings table."""
  27. defaults = {
  28. "ldap_enabled": "true",
  29. "ldap_server_url": "ldaps://ldap.test.example:636",
  30. "ldap_bind_dn": "cn=admin,dc=test,dc=com",
  31. "ldap_bind_password": "x", # pragma: allowlist secret — test fixture
  32. "ldap_search_base": "dc=test,dc=com",
  33. "ldap_user_filter": "(uid={username})",
  34. "ldap_security": "ldaps",
  35. "ldap_group_mapping": "{}",
  36. "ldap_auto_provision": "false",
  37. "ldap_ca_cert_path": "",
  38. "ldap_default_group": "",
  39. }
  40. defaults.update(overrides)
  41. for key, value in defaults.items():
  42. db.add(Settings(key=key, value=value))
  43. await db.commit()
  44. @pytest.fixture
  45. async def admin_token(async_client: AsyncClient) -> str:
  46. """Enable auth, create an admin, return a valid bearer token."""
  47. await async_client.post(
  48. "/api/v1/auth/setup",
  49. json={
  50. "auth_enabled": True,
  51. "admin_username": "ldapadmin",
  52. "admin_password": "AdminPass1!",
  53. },
  54. )
  55. login = await async_client.post(
  56. "/api/v1/auth/login",
  57. json={"username": "ldapadmin", "password": "AdminPass1!"},
  58. )
  59. return login.json()["access_token"]
  60. # ---------------------------------------------------------------------------
  61. # /auth/ldap/search
  62. # ---------------------------------------------------------------------------
  63. class TestLdapSearchRoute:
  64. @pytest.mark.asyncio
  65. @pytest.mark.integration
  66. async def test_requires_auth(self, async_client: AsyncClient, db_session: AsyncSession):
  67. """Anonymous access is rejected when auth is enabled."""
  68. await async_client.post(
  69. "/api/v1/auth/setup",
  70. json={"auth_enabled": True, "admin_username": "x", "admin_password": "AdminPass1!"},
  71. )
  72. response = await async_client.get("/api/v1/auth/ldap/search?q=jdoe")
  73. assert response.status_code == 401
  74. @pytest.mark.asyncio
  75. @pytest.mark.integration
  76. async def test_rejects_short_query(self, async_client: AsyncClient, admin_token: str, db_session: AsyncSession):
  77. """Single-char queries would be effectively unbounded against a large directory."""
  78. await _seed_ldap_settings(db_session)
  79. response = await async_client.get(
  80. "/api/v1/auth/ldap/search?q=j",
  81. headers={"Authorization": f"Bearer {admin_token}"},
  82. )
  83. assert response.status_code == 400
  84. assert "at least 2 characters" in response.json()["detail"]
  85. @pytest.mark.asyncio
  86. @pytest.mark.integration
  87. async def test_rejects_when_ldap_disabled(
  88. self, async_client: AsyncClient, admin_token: str, db_session: AsyncSession
  89. ):
  90. """No LDAP config in settings → 400 with a clear message."""
  91. response = await async_client.get(
  92. "/api/v1/auth/ldap/search?q=jdoe",
  93. headers={"Authorization": f"Bearer {admin_token}"},
  94. )
  95. assert response.status_code == 400
  96. assert "LDAP is not enabled" in response.json()["detail"]
  97. @pytest.mark.asyncio
  98. @pytest.mark.integration
  99. async def test_surfaces_unreachable_as_503(
  100. self, async_client: AsyncClient, admin_token: str, db_session: AsyncSession
  101. ):
  102. """When the underlying search fails (network/auth), the admin gets 503 — not
  103. a silent empty list (which would look like 'no matches')."""
  104. await _seed_ldap_settings(db_session)
  105. with patch(
  106. "backend.app.services.ldap_service.search_ldap_users",
  107. side_effect=RuntimeError("simulated outage"),
  108. ):
  109. response = await async_client.get(
  110. "/api/v1/auth/ldap/search?q=jdoe",
  111. headers={"Authorization": f"Bearer {admin_token}"},
  112. )
  113. assert response.status_code == 503
  114. # Detail now includes the underlying exception class + message so the
  115. # admin can see why (e.g. "LDAP search failed: RuntimeError: simulated outage").
  116. detail = response.json()["detail"].lower()
  117. assert "ldap search failed" in detail
  118. assert "simulated outage" in detail
  119. @pytest.mark.asyncio
  120. @pytest.mark.integration
  121. async def test_returns_results_annotated_with_already_provisioned(
  122. self, async_client: AsyncClient, admin_token: str, db_session: AsyncSession
  123. ):
  124. """Results that match an existing local row must come back with the flag set."""
  125. await _seed_ldap_settings(db_session)
  126. # Seed an existing local user that shares a username with one LDAP result.
  127. db_session.add(User(username="existing", email="x@test.com", password_hash="$x$", role="user"))
  128. await db_session.commit()
  129. fake_results = [
  130. LDAPSearchResult(
  131. username="jdoe",
  132. email="jdoe@test.com",
  133. display_name="John Doe",
  134. dn="cn=John Doe,dc=test,dc=com",
  135. ),
  136. LDAPSearchResult(
  137. username="existing",
  138. email="existing@test.com",
  139. display_name="Already Provisioned",
  140. dn="cn=existing,dc=test,dc=com",
  141. ),
  142. ]
  143. with patch(
  144. "backend.app.services.ldap_service.search_ldap_users",
  145. return_value=fake_results,
  146. ):
  147. response = await async_client.get(
  148. "/api/v1/auth/ldap/search?q=jdoe",
  149. headers={"Authorization": f"Bearer {admin_token}"},
  150. )
  151. assert response.status_code == 200
  152. body = response.json()
  153. assert len(body) == 2
  154. by_user = {r["username"]: r for r in body}
  155. assert by_user["jdoe"]["already_provisioned"] is False
  156. assert by_user["existing"]["already_provisioned"] is True
  157. # ---------------------------------------------------------------------------
  158. # /auth/ldap/provision
  159. # ---------------------------------------------------------------------------
  160. class TestLdapProvisionRoute:
  161. @pytest.mark.asyncio
  162. @pytest.mark.integration
  163. async def test_requires_auth(self, async_client: AsyncClient):
  164. await async_client.post(
  165. "/api/v1/auth/setup",
  166. json={"auth_enabled": True, "admin_username": "x", "admin_password": "AdminPass1!"},
  167. )
  168. response = await async_client.post(
  169. "/api/v1/auth/ldap/provision",
  170. json={"username": "jdoe"},
  171. )
  172. assert response.status_code == 401
  173. @pytest.mark.asyncio
  174. @pytest.mark.integration
  175. async def test_404_when_directory_lookup_misses(
  176. self, async_client: AsyncClient, admin_token: str, db_session: AsyncSession
  177. ):
  178. await _seed_ldap_settings(db_session)
  179. with patch("backend.app.services.ldap_service.lookup_ldap_user", return_value=None):
  180. response = await async_client.post(
  181. "/api/v1/auth/ldap/provision",
  182. json={"username": "nobody"},
  183. headers={"Authorization": f"Bearer {admin_token}"},
  184. )
  185. assert response.status_code == 404
  186. assert "not found in LDAP directory" in response.json()["detail"]
  187. @pytest.mark.asyncio
  188. @pytest.mark.integration
  189. async def test_409_when_local_user_exists(
  190. self, async_client: AsyncClient, admin_token: str, db_session: AsyncSession
  191. ):
  192. """A local user with the same username must block provision — the admin has
  193. to resolve the collision manually rather than silently coexisting."""
  194. await _seed_ldap_settings(db_session)
  195. db_session.add(User(username="jdoe", password_hash="$x$", role="user", auth_source="local"))
  196. await db_session.commit()
  197. fake_ldap = LDAPUserInfo(username="jdoe", email="jdoe@test.com", display_name=None, groups=[])
  198. with patch("backend.app.services.ldap_service.lookup_ldap_user", return_value=fake_ldap):
  199. response = await async_client.post(
  200. "/api/v1/auth/ldap/provision",
  201. json={"username": "jdoe"},
  202. headers={"Authorization": f"Bearer {admin_token}"},
  203. )
  204. assert response.status_code == 409
  205. assert "local user" in response.json()["detail"].lower()
  206. @pytest.mark.asyncio
  207. @pytest.mark.integration
  208. async def test_409_when_already_provisioned(
  209. self, async_client: AsyncClient, admin_token: str, db_session: AsyncSession
  210. ):
  211. """Re-provisioning an existing LDAP user must give a distinct error so the
  212. UI can suggest 'they exist already, just have them log in' rather than
  213. the more alarming 'local conflict' message."""
  214. await _seed_ldap_settings(db_session)
  215. db_session.add(User(username="alice", password_hash=None, role="user", auth_source="ldap"))
  216. await db_session.commit()
  217. fake_ldap = LDAPUserInfo(username="alice", email="alice@test.com", display_name=None, groups=[])
  218. with patch("backend.app.services.ldap_service.lookup_ldap_user", return_value=fake_ldap):
  219. response = await async_client.post(
  220. "/api/v1/auth/ldap/provision",
  221. json={"username": "alice"},
  222. headers={"Authorization": f"Bearer {admin_token}"},
  223. )
  224. assert response.status_code == 409
  225. assert "already provisioned" in response.json()["detail"].lower()
  226. @pytest.mark.asyncio
  227. @pytest.mark.integration
  228. async def test_503_when_directory_unreachable(
  229. self, async_client: AsyncClient, admin_token: str, db_session: AsyncSession
  230. ):
  231. await _seed_ldap_settings(db_session)
  232. with patch(
  233. "backend.app.services.ldap_service.lookup_ldap_user",
  234. side_effect=RuntimeError("simulated outage"),
  235. ):
  236. response = await async_client.post(
  237. "/api/v1/auth/ldap/provision",
  238. json={"username": "jdoe"},
  239. headers={"Authorization": f"Bearer {admin_token}"},
  240. )
  241. assert response.status_code == 503
  242. @pytest.mark.asyncio
  243. @pytest.mark.integration
  244. async def test_happy_path_creates_user_with_ldap_auth_source(
  245. self, async_client: AsyncClient, admin_token: str, db_session: AsyncSession
  246. ):
  247. """Verifies the full provision: response shape + DB state."""
  248. await _seed_ldap_settings(db_session)
  249. fake_ldap = LDAPUserInfo(
  250. username="newuser",
  251. email="newuser@test.com",
  252. display_name="New User",
  253. groups=[],
  254. )
  255. with patch("backend.app.services.ldap_service.lookup_ldap_user", return_value=fake_ldap):
  256. response = await async_client.post(
  257. "/api/v1/auth/ldap/provision",
  258. json={"username": "newuser"},
  259. headers={"Authorization": f"Bearer {admin_token}"},
  260. )
  261. assert response.status_code == 201
  262. body = response.json()
  263. assert body["username"] == "newuser"
  264. assert body["email"] == "newuser@test.com"
  265. assert body["auth_source"] == "ldap"
  266. # Verify DB state: password_hash MUST be None (LDAP has no local credential)
  267. from sqlalchemy import select
  268. row = (await db_session.execute(select(User).where(User.username == "newuser"))).scalar_one()
  269. assert row.auth_source == "ldap"
  270. assert row.password_hash is None
  271. @pytest.mark.asyncio
  272. @pytest.mark.integration
  273. async def test_happy_path_applies_group_mapping(
  274. self, async_client: AsyncClient, admin_token: str, db_session: AsyncSession
  275. ):
  276. """Provision must run the same group-mapping logic as the auto-provision
  277. login path — so an admin who provisions Alice gets the exact same group
  278. memberships as if Alice had logged in herself with auto-provision on."""
  279. await _seed_ldap_settings(
  280. db_session,
  281. ldap_group_mapping='{"cn=staff,ou=groups,dc=test,dc=com": "Operators"}',
  282. )
  283. # Operators group is auto-seeded by the test harness — no need to create it.
  284. fake_ldap = LDAPUserInfo(
  285. username="alice",
  286. email="alice@test.com",
  287. display_name="Alice",
  288. groups=["cn=staff,ou=groups,dc=test,dc=com"],
  289. )
  290. with patch("backend.app.services.ldap_service.lookup_ldap_user", return_value=fake_ldap):
  291. response = await async_client.post(
  292. "/api/v1/auth/ldap/provision",
  293. json={"username": "alice"},
  294. headers={"Authorization": f"Bearer {admin_token}"},
  295. )
  296. assert response.status_code == 201
  297. body = response.json()
  298. group_names = {g["name"] for g in body["groups"]}
  299. assert "Operators" in group_names