test_ldap_provision.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. """Integration tests for the manual LDAP user provisioning routes (#1298).
  2. Reporter @Fuechslein noted that BamBuddy forced admins to leave auto-provision
  3. on because there was no UI path to create an LDAP user by hand. The new
  4. endpoints are GET /auth/ldap/search (admin types a partial name, picks a
  5. candidate) and POST /auth/ldap/provision (server re-resolves and creates the
  6. user).
  7. These tests cover:
  8. - Permission gating (only USERS_CREATE can search/provision)
  9. - LDAP-disabled and short-query rejections
  10. - An unreachable LDAP service surfaces as 503, not as an empty 200 response
  11. - Provision creates the user with auth_source=ldap, password_hash=None
  12. - Provision applies the same group mapping as the auto-provision login path
  13. - Duplicate-username protection (409 with explanation)
  14. """
  15. from unittest.mock import patch
  16. import pytest
  17. from httpx import AsyncClient
  18. from sqlalchemy.ext.asyncio import AsyncSession
  19. from backend.app.models.settings import Settings
  20. from backend.app.models.user import User
  21. from backend.app.services.ldap_service import LDAPSearchResult, LDAPUserInfo
  22. # ---------------------------------------------------------------------------
  23. # Fixtures
  24. # ---------------------------------------------------------------------------
  25. async def _seed_ldap_settings(db: AsyncSession, **overrides) -> None:
  26. """Write a minimal but valid LDAP config to the settings table."""
  27. defaults = {
  28. "ldap_enabled": "true",
  29. "ldap_server_url": "ldaps://ldap.test.example:636", # pragma: allowlist secret — test fixture
  30. "ldap_bind_dn": "cn=admin,dc=test,dc=com", # pragma: allowlist secret — test fixture
  31. "ldap_bind_password": "x", # pragma: allowlist secret — test fixture
  32. "ldap_search_base": "dc=test,dc=com",
  33. "ldap_user_filter": "(uid={username})",
  34. "ldap_security": "ldaps",
  35. "ldap_group_mapping": "{}",
  36. "ldap_auto_provision": "false",
  37. "ldap_ca_cert_path": "",
  38. "ldap_default_group": "",
  39. }
  40. defaults.update(overrides)
  41. for key, value in defaults.items():
  42. db.add(Settings(key=key, value=value))
  43. await db.commit()
  44. @pytest.fixture
  45. async def admin_token(async_client: AsyncClient) -> str:
  46. """Enable auth, create an admin, return a valid bearer token."""
  47. # pragma: allowlist secret — test fixture only, not a real credential
  48. test_password = "AdminPass1!" # noqa: S105
  49. await async_client.post(
  50. "/api/v1/auth/setup",
  51. json={
  52. "auth_enabled": True,
  53. "admin_username": "ldapadmin",
  54. "admin_password": test_password,
  55. },
  56. )
  57. login = await async_client.post(
  58. "/api/v1/auth/login",
  59. json={"username": "ldapadmin", "password": test_password},
  60. )
  61. return login.json()["access_token"]
  62. # ---------------------------------------------------------------------------
  63. # /auth/ldap/search
  64. # ---------------------------------------------------------------------------
  65. class TestLdapSearchRoute:
  66. @pytest.mark.asyncio
  67. @pytest.mark.integration
  68. async def test_requires_auth(self, async_client: AsyncClient, db_session: AsyncSession):
  69. """Anonymous access is rejected when auth is enabled."""
  70. await async_client.post(
  71. "/api/v1/auth/setup",
  72. json={
  73. "auth_enabled": True,
  74. "admin_username": "x",
  75. "admin_password": "AdminPass1!",
  76. }, # pragma: allowlist secret — test fixture
  77. )
  78. response = await async_client.get("/api/v1/auth/ldap/search?q=jdoe")
  79. assert response.status_code == 401
  80. @pytest.mark.asyncio
  81. @pytest.mark.integration
  82. async def test_rejects_short_query(self, async_client: AsyncClient, admin_token: str, db_session: AsyncSession):
  83. """Single-char queries would be effectively unbounded against a large directory."""
  84. await _seed_ldap_settings(db_session)
  85. response = await async_client.get(
  86. "/api/v1/auth/ldap/search?q=j",
  87. headers={"Authorization": f"Bearer {admin_token}"},
  88. )
  89. assert response.status_code == 400
  90. assert "at least 2 characters" in response.json()["detail"]
  91. @pytest.mark.asyncio
  92. @pytest.mark.integration
  93. async def test_rejects_when_ldap_disabled(
  94. self, async_client: AsyncClient, admin_token: str, db_session: AsyncSession
  95. ):
  96. """No LDAP config in settings → 400 with a clear message."""
  97. response = await async_client.get(
  98. "/api/v1/auth/ldap/search?q=jdoe",
  99. headers={"Authorization": f"Bearer {admin_token}"},
  100. )
  101. assert response.status_code == 400
  102. assert "LDAP is not enabled" in response.json()["detail"]
  103. @pytest.mark.asyncio
  104. @pytest.mark.integration
  105. async def test_surfaces_unreachable_as_503(
  106. self, async_client: AsyncClient, admin_token: str, db_session: AsyncSession
  107. ):
  108. """When the underlying search fails (network/auth), the admin gets 503 — not
  109. a silent empty list (which would look like 'no matches')."""
  110. await _seed_ldap_settings(db_session)
  111. with patch(
  112. "backend.app.services.ldap_service.search_ldap_users",
  113. side_effect=RuntimeError("simulated outage"),
  114. ):
  115. response = await async_client.get(
  116. "/api/v1/auth/ldap/search?q=jdoe",
  117. headers={"Authorization": f"Bearer {admin_token}"},
  118. )
  119. assert response.status_code == 503
  120. # Detail now includes the underlying exception class + message so the
  121. # admin can see why (e.g. "LDAP search failed: RuntimeError: simulated outage").
  122. detail = response.json()["detail"].lower()
  123. assert "ldap search failed" in detail
  124. assert "simulated outage" in detail
  125. @pytest.mark.asyncio
  126. @pytest.mark.integration
  127. async def test_returns_results_annotated_with_already_provisioned(
  128. self, async_client: AsyncClient, admin_token: str, db_session: AsyncSession
  129. ):
  130. """Results that match an existing local row must come back with the flag set."""
  131. await _seed_ldap_settings(db_session)
  132. # Seed an existing local user that shares a username with one LDAP result.
  133. db_session.add(User(username="existing", email="x@test.com", password_hash="$x$", role="user"))
  134. await db_session.commit()
  135. fake_results = [
  136. LDAPSearchResult(
  137. username="jdoe",
  138. email="jdoe@test.com",
  139. display_name="John Doe",
  140. dn="cn=John Doe,dc=test,dc=com",
  141. ),
  142. LDAPSearchResult(
  143. username="existing",
  144. email="existing@test.com",
  145. display_name="Already Provisioned",
  146. dn="cn=existing,dc=test,dc=com",
  147. ),
  148. ]
  149. with patch(
  150. "backend.app.services.ldap_service.search_ldap_users",
  151. return_value=fake_results,
  152. ):
  153. response = await async_client.get(
  154. "/api/v1/auth/ldap/search?q=jdoe",
  155. headers={"Authorization": f"Bearer {admin_token}"},
  156. )
  157. assert response.status_code == 200
  158. body = response.json()
  159. assert len(body) == 2
  160. by_user = {r["username"]: r for r in body}
  161. assert by_user["jdoe"]["already_provisioned"] is False
  162. assert by_user["existing"]["already_provisioned"] is True
  163. # ---------------------------------------------------------------------------
  164. # /auth/ldap/provision
  165. # ---------------------------------------------------------------------------
  166. class TestLdapProvisionRoute:
  167. @pytest.mark.asyncio
  168. @pytest.mark.integration
  169. async def test_requires_auth(self, async_client: AsyncClient):
  170. await async_client.post(
  171. "/api/v1/auth/setup",
  172. json={
  173. "auth_enabled": True,
  174. "admin_username": "x",
  175. "admin_password": "AdminPass1!",
  176. }, # pragma: allowlist secret — test fixture
  177. )
  178. response = await async_client.post(
  179. "/api/v1/auth/ldap/provision",
  180. json={"username": "jdoe"},
  181. )
  182. assert response.status_code == 401
  183. @pytest.mark.asyncio
  184. @pytest.mark.integration
  185. async def test_404_when_directory_lookup_misses(
  186. self, async_client: AsyncClient, admin_token: str, db_session: AsyncSession
  187. ):
  188. await _seed_ldap_settings(db_session)
  189. with patch("backend.app.services.ldap_service.lookup_ldap_user", return_value=None):
  190. response = await async_client.post(
  191. "/api/v1/auth/ldap/provision",
  192. json={"username": "nobody"},
  193. headers={"Authorization": f"Bearer {admin_token}"},
  194. )
  195. assert response.status_code == 404
  196. assert "not found in LDAP directory" in response.json()["detail"]
  197. @pytest.mark.asyncio
  198. @pytest.mark.integration
  199. async def test_409_when_local_user_exists(
  200. self, async_client: AsyncClient, admin_token: str, db_session: AsyncSession
  201. ):
  202. """A local user with the same username must block provision — the admin has
  203. to resolve the collision manually rather than silently coexisting."""
  204. await _seed_ldap_settings(db_session)
  205. db_session.add(User(username="jdoe", password_hash="$x$", role="user", auth_source="local"))
  206. await db_session.commit()
  207. fake_ldap = LDAPUserInfo(username="jdoe", email="jdoe@test.com", display_name=None, groups=[])
  208. with patch("backend.app.services.ldap_service.lookup_ldap_user", return_value=fake_ldap):
  209. response = await async_client.post(
  210. "/api/v1/auth/ldap/provision",
  211. json={"username": "jdoe"},
  212. headers={"Authorization": f"Bearer {admin_token}"},
  213. )
  214. assert response.status_code == 409
  215. assert "local user" in response.json()["detail"].lower()
  216. @pytest.mark.asyncio
  217. @pytest.mark.integration
  218. async def test_409_when_already_provisioned(
  219. self, async_client: AsyncClient, admin_token: str, db_session: AsyncSession
  220. ):
  221. """Re-provisioning an existing LDAP user must give a distinct error so the
  222. UI can suggest 'they exist already, just have them log in' rather than
  223. the more alarming 'local conflict' message."""
  224. await _seed_ldap_settings(db_session)
  225. db_session.add(User(username="alice", password_hash=None, role="user", auth_source="ldap"))
  226. await db_session.commit()
  227. fake_ldap = LDAPUserInfo(username="alice", email="alice@test.com", display_name=None, groups=[])
  228. with patch("backend.app.services.ldap_service.lookup_ldap_user", return_value=fake_ldap):
  229. response = await async_client.post(
  230. "/api/v1/auth/ldap/provision",
  231. json={"username": "alice"},
  232. headers={"Authorization": f"Bearer {admin_token}"},
  233. )
  234. assert response.status_code == 409
  235. assert "already provisioned" in response.json()["detail"].lower()
  236. @pytest.mark.asyncio
  237. @pytest.mark.integration
  238. async def test_503_when_directory_unreachable(
  239. self, async_client: AsyncClient, admin_token: str, db_session: AsyncSession
  240. ):
  241. await _seed_ldap_settings(db_session)
  242. with patch(
  243. "backend.app.services.ldap_service.lookup_ldap_user",
  244. side_effect=RuntimeError("simulated outage"),
  245. ):
  246. response = await async_client.post(
  247. "/api/v1/auth/ldap/provision",
  248. json={"username": "jdoe"},
  249. headers={"Authorization": f"Bearer {admin_token}"},
  250. )
  251. assert response.status_code == 503
  252. @pytest.mark.asyncio
  253. @pytest.mark.integration
  254. async def test_happy_path_creates_user_with_ldap_auth_source(
  255. self, async_client: AsyncClient, admin_token: str, db_session: AsyncSession
  256. ):
  257. """Verifies the full provision: response shape + DB state."""
  258. await _seed_ldap_settings(db_session)
  259. fake_ldap = LDAPUserInfo(
  260. username="newuser",
  261. email="newuser@test.com",
  262. display_name="New User",
  263. groups=[],
  264. )
  265. with patch("backend.app.services.ldap_service.lookup_ldap_user", return_value=fake_ldap):
  266. response = await async_client.post(
  267. "/api/v1/auth/ldap/provision",
  268. json={"username": "newuser"},
  269. headers={"Authorization": f"Bearer {admin_token}"},
  270. )
  271. assert response.status_code == 201
  272. body = response.json()
  273. assert body["username"] == "newuser"
  274. assert body["email"] == "newuser@test.com"
  275. assert body["auth_source"] == "ldap"
  276. # Verify DB state: password_hash MUST be None (LDAP has no local credential)
  277. from sqlalchemy import select
  278. row = (await db_session.execute(select(User).where(User.username == "newuser"))).scalar_one()
  279. assert row.auth_source == "ldap"
  280. assert row.password_hash is None
  281. @pytest.mark.asyncio
  282. @pytest.mark.integration
  283. async def test_happy_path_applies_group_mapping(
  284. self, async_client: AsyncClient, admin_token: str, db_session: AsyncSession
  285. ):
  286. """Provision must run the same group-mapping logic as the auto-provision
  287. login path — so an admin who provisions Alice gets the exact same group
  288. memberships as if Alice had logged in herself with auto-provision on."""
  289. await _seed_ldap_settings(
  290. db_session,
  291. ldap_group_mapping='{"cn=staff,ou=groups,dc=test,dc=com": "Operators"}',
  292. )
  293. # Operators group is auto-seeded by the test harness — no need to create it.
  294. fake_ldap = LDAPUserInfo(
  295. username="alice",
  296. email="alice@test.com",
  297. display_name="Alice",
  298. groups=["cn=staff,ou=groups,dc=test,dc=com"],
  299. )
  300. with patch("backend.app.services.ldap_service.lookup_ldap_user", return_value=fake_ldap):
  301. response = await async_client.post(
  302. "/api/v1/auth/ldap/provision",
  303. json={"username": "alice"},
  304. headers={"Authorization": f"Bearer {admin_token}"},
  305. )
  306. assert response.status_code == 201
  307. body = response.json()
  308. group_names = {g["name"] for g in body["groups"]}
  309. assert "Operators" in group_names