# test_advanced_auth_api.py
  """Integration tests for Advanced Authentication API endpoints.

  Tests the full request/response cycle for SMTP configuration, advanced auth toggle,
  email-based login, forgot password, admin password reset, and user creation
  with advanced authentication enabled.
  """
  6. from unittest.mock import patch
  7. import pytest
  8. from httpx import AsyncClient
  # Canonical SMTP settings payload reused by the test classes in this module.
  SMTP_DATA: dict[str, object] = {
      "smtp_host": "smtp.test.com",
      "smtp_port": 587,
      "smtp_username": "test@test.com",
      "smtp_password": "testpass",
      "smtp_security": "starttls",
      "smtp_auth_enabled": True,
      "smtp_from_email": "noreply@test.com",
  }
  async def _setup_admin(
      async_client: AsyncClient, username: str = "admin", password: str = "AdminPass1!"
  ) -> str:
      """Enable authentication, create the admin account, and return its access token.

      Args:
          async_client: HTTP client used to talk to the API under test.
          username: Admin username to register and then log in with.
          password: Admin password to register and then log in with.

      Returns:
          The ``access_token`` value from the login response.

      Raises:
          AssertionError: If the login request does not answer 200.
      """
      # The setup response is not inspected here; a setup that did not take effect
      # shows up as a failed login on the next request.
      await async_client.post(
          "/api/v1/auth/setup",
          json={
              "auth_enabled": True,
              "admin_username": username,
              "admin_password": password,
          },
      )

      login = await async_client.post(
          "/api/v1/auth/login",
          json={"username": username, "password": password},
      )
      # Fail with the server's explanation instead of a bare KeyError on the lookup below.
      assert login.status_code == 200, f"admin login failed ({login.status_code}): {login.text}"
      return login.json()["access_token"]
  async def _setup_smtp_and_advanced_auth(async_client: AsyncClient, token: str):
      """Store the shared SMTP settings, then turn advanced auth on.

      ``send_email`` is not mocked here; callers must patch it themselves.
      """
      bearer = {"Authorization": f"Bearer {token}"}
      # SMTP settings are saved first, the enable request follows.
      await async_client.post("/api/v1/auth/smtp", json=SMTP_DATA, headers=bearer)
      await async_client.post("/api/v1/auth/advanced-auth/enable", headers=bearer)
  async def _create_regular_user(
      async_client: AsyncClient, token: str, username: str = "regular", password: str = "Regularpass1!"
  ) -> str:
      """Create a regular (non-admin) user through the API and return that user's access token.

      Args:
          async_client: HTTP client used to talk to the API under test.
          token: Bearer token sent with the user-creation request.
          username: Username for the new account.
          password: Password for the new account.

      Returns:
          The ``access_token`` value obtained by logging in as the new user.

      Raises:
          AssertionError: If user creation does not answer 201 or the login does not answer 200.
      """
      created = await async_client.post(
          "/api/v1/users/",
          headers={"Authorization": f"Bearer {token}"},
          json={"username": username, "password": password, "role": "user"},
      )
      # Surface a failed creation right here, with the response body, rather than as a
      # confusing login failure further down.
      assert created.status_code == 201, f"creating user {username!r} failed ({created.status_code}): {created.text}"

      login = await async_client.post(
          "/api/v1/auth/login",
          json={"username": username, "password": password},
      )
      assert login.status_code == 200, f"login as {username!r} failed ({login.status_code}): {login.text}"
      return login.json()["access_token"]
  class TestSMTPConfigAPI:
      """Covers the SMTP settings endpoints: save, read back, access control and the connection test."""

      @pytest.fixture
      async def admin_token(self, async_client: AsyncClient):
          """Set up auth with the ``smtpadmin`` account and hand back its token."""
          token = await _setup_admin(async_client, "smtpadmin", "AdminPass1!")
          return token

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_save_smtp_settings(self, async_client: AsyncClient, admin_token: str):
          """Saving valid settings via POST /auth/smtp answers 200 with a confirmation message."""
          auth = {"Authorization": f"Bearer {admin_token}"}
          response = await async_client.post("/api/v1/auth/smtp", headers=auth, json=SMTP_DATA)

          assert response.status_code == 200
          # Either wording is accepted in the confirmation text.
          assert any(word in response.json()["message"].lower() for word in ("saved", "success"))

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_get_smtp_settings_masks_password(self, async_client: AsyncClient, admin_token: str):
          """Reading the settings back returns the host but a ``None`` password."""
          auth = {"Authorization": f"Bearer {admin_token}"}
          # There has to be something stored before it can be read back.
          await async_client.post("/api/v1/auth/smtp", headers=auth, json=SMTP_DATA)

          response = await async_client.get("/api/v1/auth/smtp", headers=auth)

          assert response.status_code == 200
          body = response.json()
          assert body["smtp_host"] == "smtp.test.com"
          assert body["smtp_password"] is None

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_smtp_settings_requires_admin(self, async_client: AsyncClient, admin_token: str):
          """A regular user is refused with 403 on both the save and the read endpoint."""
          user_token = await _create_regular_user(async_client, admin_token, "smtpregular", "Pass12345!")
          auth = {"Authorization": f"Bearer {user_token}"}

          save_resp = await async_client.post("/api/v1/auth/smtp", headers=auth, json=SMTP_DATA)
          assert save_resp.status_code == 403

          read_resp = await async_client.get("/api/v1/auth/smtp", headers=auth)
          assert read_resp.status_code == 403

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_save_smtp_settings_no_auth(self, async_client: AsyncClient, admin_token: str):
          """Saving without any Authorization header answers 401."""
          # admin_token is requested only for its setup side effect; it is not sent.
          anonymous_resp = await async_client.post("/api/v1/auth/smtp", json=SMTP_DATA)
          assert anonymous_resp.status_code == 401

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_test_smtp_connection(self, async_client: AsyncClient, admin_token: str):
          """POST /auth/smtp/test reports success while send_email is mocked."""
          auth = {"Authorization": f"Bearer {admin_token}"}
          await async_client.post("/api/v1/auth/smtp", headers=auth, json=SMTP_DATA)

          # send_email is replaced by a mock for the duration of the request.
          with patch("backend.app.api.routes.auth.send_email"):
              response = await async_client.post(
                  "/api/v1/auth/smtp/test",
                  headers=auth,
                  json={"test_recipient": "recipient@test.com"},
              )

          assert response.status_code == 200
          assert response.json()["success"] is True
  class TestAdvancedAuthToggleAPI:
      """Covers switching advanced authentication on and off, plus the public status endpoint."""

      @pytest.fixture
      async def admin_token(self, async_client: AsyncClient):
          """Set up auth with the ``toggleadmin`` account and hand back its token."""
          token = await _setup_admin(async_client, "toggleadmin", "AdminPass1!")
          return token

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_enable_advanced_auth(self, async_client: AsyncClient, admin_token: str):
          """With SMTP configured, the enable endpoint answers 200 and reports the flag as on."""
          auth = {"Authorization": f"Bearer {admin_token}"}
          # SMTP goes in first; enabling without it is covered by the next test.
          await async_client.post("/api/v1/auth/smtp", headers=auth, json=SMTP_DATA)

          response = await async_client.post("/api/v1/auth/advanced-auth/enable", headers=auth)

          assert response.status_code == 200
          assert response.json()["advanced_auth_enabled"] is True

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_enable_advanced_auth_without_smtp(self, async_client: AsyncClient, admin_token: str):
          """Without stored SMTP settings, the enable endpoint answers 400 and mentions SMTP."""
          response = await async_client.post(
              "/api/v1/auth/advanced-auth/enable",
              headers={"Authorization": f"Bearer {admin_token}"},
          )

          assert response.status_code == 400
          assert "SMTP" in response.json()["detail"]

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_disable_advanced_auth(self, async_client: AsyncClient, admin_token: str):
          """After enabling, the disable endpoint answers 200 and reports the flag as off."""
          # Same two requests as before: save SMTP, then enable.
          await _setup_smtp_and_advanced_auth(async_client, admin_token)

          response = await async_client.post(
              "/api/v1/auth/advanced-auth/disable",
              headers={"Authorization": f"Bearer {admin_token}"},
          )

          assert response.status_code == 200
          assert response.json()["advanced_auth_enabled"] is False

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_advanced_auth_status_public(self, async_client: AsyncClient, admin_token: str):
          """The status endpoint answers 200 to a request that carries no token."""
          # admin_token is requested only for its setup side effect; it is not sent.
          response = await async_client.get("/api/v1/auth/advanced-auth/status")

          assert response.status_code == 200
          payload = response.json()
          for expected_key in ("advanced_auth_enabled", "smtp_configured"):
              assert expected_key in payload

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_enable_requires_admin(self, async_client: AsyncClient, admin_token: str):
          """A regular user is refused with 403 on both enable and disable."""
          user_token = await _create_regular_user(async_client, admin_token, "toggleregular", "Pass12345!")
          auth = {"Authorization": f"Bearer {user_token}"}

          enable_resp = await async_client.post("/api/v1/auth/advanced-auth/enable", headers=auth)
          assert enable_resp.status_code == 403

          disable_resp = await async_client.post("/api/v1/auth/advanced-auth/disable", headers=auth)
          assert disable_resp.status_code == 403
  class TestEmailLoginAPI:
      """Integration tests for email-based login."""

      @pytest.fixture
      async def admin_token(self, async_client: AsyncClient):
          """Set up auth with the ``emailadmin`` account and return its token."""
          return await _setup_admin(async_client, "emailadmin", "AdminPass1!")

      @staticmethod
      async def _create_user_with_known_password(
          async_client: AsyncClient, headers: dict[str, str], username: str, email: str, password: str
      ) -> None:
          """Create a user with an email but no password, then set a known password via admin update.

          The creation request carries no password, so the password is assigned afterwards
          with a PATCH to make a later login with known credentials possible.

          Args:
              async_client: HTTP client used to talk to the API under test.
              headers: Authorization headers of the admin performing both requests.
              username: Username for the new account.
              email: Email address for the new account.
              password: Password to assign through the update endpoint.

          Raises:
              AssertionError: If creation does not answer 201 or the password update
                  answers with an error status.
          """
          create_resp = await async_client.post(
              "/api/v1/users/",
              headers=headers,
              json={"username": username, "email": email, "role": "user"},
          )
          # Checked before the ["id"] lookup so a rejected creation reports the server's reason.
          assert create_resp.status_code == 201, create_resp.text
          user_id = create_resp.json()["id"]

          update_resp = await async_client.patch(
              f"/api/v1/users/{user_id}",
              headers=headers,
              json={"password": password},
          )
          # A failed update would otherwise only show up later as an unexplained 401 on login.
          assert update_resp.status_code < 400, update_resp.text

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_login_with_email(self, async_client: AsyncClient, admin_token: str):
          """Login with email address when advanced auth is enabled returns token."""
          headers = {"Authorization": f"Bearer {admin_token}"}

          with patch("backend.app.api.routes.users.send_email"):
              # Configure SMTP + advanced auth
              await _setup_smtp_and_advanced_auth(async_client, admin_token)
              await self._create_user_with_known_password(
                  async_client, headers, "emailuser", "emailuser@test.com", "Knownpassword1!"
              )

              # Login with email
              response = await async_client.post(
                  "/api/v1/auth/login",
                  json={"username": "emailuser@test.com", "password": "Knownpassword1!"},
              )

          assert response.status_code == 200
          assert "access_token" in response.json()

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_login_with_email_case_insensitive(self, async_client: AsyncClient, admin_token: str):
          """Login with uppercase email matches case-insensitively."""
          headers = {"Authorization": f"Bearer {admin_token}"}

          with patch("backend.app.api.routes.users.send_email"):
              await _setup_smtp_and_advanced_auth(async_client, admin_token)
              await self._create_user_with_known_password(
                  async_client, headers, "caseuser", "caseuser@test.com", "Casepassword1!"
              )

              # Same address as registered, but entirely in upper case.
              response = await async_client.post(
                  "/api/v1/auth/login",
                  json={"username": "CASEUSER@TEST.COM", "password": "Casepassword1!"},
              )

          assert response.status_code == 200
          assert "access_token" in response.json()

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_login_with_email_advanced_auth_disabled(self, async_client: AsyncClient, admin_token: str):
          """Email login fails when advanced auth is disabled."""
          headers = {"Authorization": f"Bearer {admin_token}"}

          # Create user with email but no advanced auth
          create_resp = await async_client.post(
              "/api/v1/users/",
              headers=headers,
              json={"username": "noemail", "password": "NoEmailPass1!", "email": "noemail@test.com", "role": "user"},
          )
          # Without this check a rejected creation would also yield the 401 below,
          # letting the test pass without exercising email login at all.
          assert create_resp.status_code == 201, create_resp.text

          # Try to login with email — should fail since advanced auth is off
          response = await async_client.post(
              "/api/v1/auth/login",
              json={"username": "noemail@test.com", "password": "NoEmailPass1!"},
          )
          assert response.status_code == 401

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_login_with_username_still_works(self, async_client: AsyncClient, admin_token: str):
          """Username-based login still works when advanced auth is enabled."""
          headers = {"Authorization": f"Bearer {admin_token}"}

          with patch("backend.app.api.routes.users.send_email"):
              await _setup_smtp_and_advanced_auth(async_client, admin_token)
              await self._create_user_with_known_password(
                  async_client, headers, "usernameuser", "usernameuser@test.com", "Usernamepass1!"
              )

              # Login with username (not email)
              response = await async_client.post(
                  "/api/v1/auth/login",
                  json={"username": "usernameuser", "password": "Usernamepass1!"},
              )

          assert response.status_code == 200
          assert "access_token" in response.json()
  class TestForgotPasswordAPI:
      """Integration tests for forgot-password flow."""

      @pytest.fixture
      async def admin_token(self, async_client: AsyncClient):
          """Set up auth with the ``forgotadmin`` account and return its token."""
          return await _setup_admin(async_client, "forgotadmin", "AdminPass1!")

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_forgot_password_sends_email(self, async_client: AsyncClient, admin_token: str):
          """POST /auth/forgot-password with valid email sends reset email."""
          headers = {"Authorization": f"Bearer {admin_token}"}

          with patch("backend.app.api.routes.users.send_email"):
              await _setup_smtp_and_advanced_auth(async_client, admin_token)
              # Create a user with email
              create_resp = await async_client.post(
                  "/api/v1/users/",
                  headers=headers,
                  json={"username": "forgotuser", "email": "forgot@test.com", "role": "user"},
              )
              assert create_resp.status_code == 201

          with patch("backend.app.api.routes.auth.send_email") as mock_send:
              response = await async_client.post(
                  "/api/v1/auth/forgot-password",
                  json={"email": "forgot@test.com"},
              )

          assert response.status_code == 200
          mock_send.assert_called_once()
          # Verify the email was sent to the right address
          assert mock_send.call_args[0][1] == "forgot@test.com"

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_forgot_password_unknown_email(self, async_client: AsyncClient, admin_token: str):
          """Unknown email still returns 200 (anti-enumeration) but send_email not called."""
          # Same two requests the other tests in this class issue: save SMTP, then enable.
          await _setup_smtp_and_advanced_auth(async_client, admin_token)

          with patch("backend.app.api.routes.auth.send_email") as mock_send:
              response = await async_client.post(
                  "/api/v1/auth/forgot-password",
                  json={"email": "unknown@test.com"},
              )

          assert response.status_code == 200
          mock_send.assert_not_called()

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_forgot_password_requires_advanced_auth(self, async_client: AsyncClient, admin_token: str):
          """Forgot password returns 400 when advanced auth is disabled."""
          response = await async_client.post(
              "/api/v1/auth/forgot-password",
              json={"email": "test@test.com"},
          )
          assert response.status_code == 400
          assert "not enabled" in response.json()["detail"].lower()

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_forgot_password_changes_password(self, async_client: AsyncClient, admin_token: str):
          """After forgot-password + confirm, old password stops working and new one works.

          H-6: The flow is now token-based: /forgot-password issues a reset link and
          /forgot-password/confirm consumes the token and sets the new password.
          """
          headers = {"Authorization": f"Bearer {admin_token}"}

          with patch("backend.app.api.routes.users.send_email"):
              await _setup_smtp_and_advanced_auth(async_client, admin_token)
              create_resp = await async_client.post(
                  "/api/v1/users/",
                  headers=headers,
                  json={"username": "resetme", "email": "resetme@test.com", "role": "user"},
              )
              # Checked before the ["id"] lookup so a rejected creation reports the server's reason.
              assert create_resp.status_code == 201, create_resp.text
              user_id = create_resp.json()["id"]
              await async_client.patch(
                  f"/api/v1/users/{user_id}",
                  headers=headers,
                  json={"password": "Originalpass1!"},
              )

          # Verify login works with original password
          login_resp = await async_client.post(
              "/api/v1/auth/login",
              json={"username": "resetme", "password": "Originalpass1!"},
          )
          assert login_resp.status_code == 200

          # Trigger forgot-password and capture the reset URL (contains the token)
          captured: dict[str, str] = {}

          async def _capture_link_email(db, username, reset_url):
              # Stand-in for the template builder: remember the URL, return a dummy 3-tuple.
              captured["reset_url"] = reset_url
              return ("subject", "body", "<body/>")

          with (
              patch(
                  "backend.app.api.routes.auth.create_password_reset_link_email_from_template",
                  side_effect=_capture_link_email,
              ),
              patch("backend.app.api.routes.auth.send_email"),
          ):
              resp = await async_client.post(
                  "/api/v1/auth/forgot-password",
                  json={"email": "resetme@test.com"},
              )
          assert resp.status_code == 200
          assert "reset_url" in captured, "Reset URL not captured — email function was not called"

          # Extract the token from the captured URL and confirm the reset
          reset_url = captured["reset_url"]
          assert "reset_token=" in reset_url, f"no reset_token parameter in {reset_url!r}"
          # Cut at the next "&" so any further query parameters do not end up inside the token.
          reset_token = reset_url.split("reset_token=", 1)[1].split("&", 1)[0]
          confirm_resp = await async_client.post(
              "/api/v1/auth/forgot-password/confirm",
              json={"token": reset_token, "new_password": "Newpass456!"},
          )
          assert confirm_resp.status_code == 200

          # Old password should no longer work
          login_resp = await async_client.post(
              "/api/v1/auth/login",
              json={"username": "resetme", "password": "Originalpass1!"},
          )
          assert login_resp.status_code == 401

          # New password must work
          login_resp = await async_client.post(
              "/api/v1/auth/login",
              json={"username": "resetme", "password": "Newpass456!"},
          )
          assert login_resp.status_code == 200
  class TestAdminResetPasswordAPI:
      """Integration tests for admin password reset endpoint."""

      @pytest.fixture
      async def admin_token(self, async_client: AsyncClient):
          """Set up auth with the ``resetadmin`` account and return its token."""
          return await _setup_admin(async_client, "resetadmin", "AdminPass1!")

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_reset_password_sends_email(self, async_client: AsyncClient, admin_token: str):
          """POST /auth/reset-password sends email to user."""
          headers = {"Authorization": f"Bearer {admin_token}"}

          with patch("backend.app.api.routes.users.send_email"):
              await _setup_smtp_and_advanced_auth(async_client, admin_token)
              create_resp = await async_client.post(
                  "/api/v1/users/",
                  headers=headers,
                  json={"username": "resetuser", "email": "resetuser@test.com", "role": "user"},
              )
              # Checked before the ["id"] lookup so a rejected creation reports the server's reason.
              assert create_resp.status_code == 201, create_resp.text
              user_id = create_resp.json()["id"]

          with patch("backend.app.api.routes.auth.send_email") as mock_send:
              response = await async_client.post(
                  "/api/v1/auth/reset-password",
                  headers=headers,
                  json={"user_id": user_id},
              )

          assert response.status_code == 200
          mock_send.assert_called_once()
          assert mock_send.call_args[0][1] == "resetuser@test.com"

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_reset_password_requires_admin(self, async_client: AsyncClient, admin_token: str):
          """Non-admin user gets 403 on reset-password."""
          # Create regular user before enabling advanced auth (no email required)
          user_token = await _create_regular_user(async_client, admin_token, "resetregular", "Pass12345!")

          with patch("backend.app.api.routes.users.send_email"):
              await _setup_smtp_and_advanced_auth(async_client, admin_token)

          response = await async_client.post(
              "/api/v1/auth/reset-password",
              headers={"Authorization": f"Bearer {user_token}"},
              json={"user_id": 1},
          )
          assert response.status_code == 403

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_reset_password_requires_advanced_auth(self, async_client: AsyncClient, admin_token: str):
          """Reset password returns 400 when advanced auth is disabled."""
          headers = {"Authorization": f"Bearer {admin_token}"}
          response = await async_client.post(
              "/api/v1/auth/reset-password",
              headers=headers,
              json={"user_id": 999},
          )
          assert response.status_code == 400
          assert "not enabled" in response.json()["detail"].lower()

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_reset_password_user_not_found(self, async_client: AsyncClient, admin_token: str):
          """Reset password with invalid user_id returns 404."""
          headers = {"Authorization": f"Bearer {admin_token}"}
          # Same two requests the other tests in this class issue: save SMTP, then enable.
          await _setup_smtp_and_advanced_auth(async_client, admin_token)

          response = await async_client.post(
              "/api/v1/auth/reset-password",
              headers=headers,
              json={"user_id": 99999},
          )
          assert response.status_code == 404

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_reset_password_user_no_email(self, async_client: AsyncClient, admin_token: str):
          """Reset password for user without email returns 400."""
          headers = {"Authorization": f"Bearer {admin_token}"}

          # Save SMTP and enable advanced auth
          await _setup_smtp_and_advanced_auth(async_client, admin_token)

          # Disable advanced auth temporarily to create a user without email
          await async_client.post("/api/v1/auth/advanced-auth/disable", headers=headers)
          create_resp = await async_client.post(
              "/api/v1/users/",
              headers=headers,
              json={"username": "noemailuser", "password": "Noemail12345!", "role": "user"},
          )
          # Checked before the ["id"] lookup so a rejected creation reports the server's reason.
          assert create_resp.status_code == 201, create_resp.text
          user_id = create_resp.json()["id"]

          # Re-enable advanced auth
          await async_client.post("/api/v1/auth/advanced-auth/enable", headers=headers)

          response = await async_client.post(
              "/api/v1/auth/reset-password",
              headers=headers,
              json={"user_id": user_id},
          )
          assert response.status_code == 400
          assert "email" in response.json()["detail"].lower()
  class TestUserCreationAdvancedAuth:
      """Covers POST /users/ while advanced authentication is switched on."""

      @pytest.fixture
      async def admin_token(self, async_client: AsyncClient):
          """Set up auth with the ``createadmin`` account and hand back its token."""
          token = await _setup_admin(async_client, "createadmin", "AdminPass1!")
          return token

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_create_user_advanced_auth_requires_email(self, async_client: AsyncClient, admin_token: str):
          """A creation request without an email is rejected with 400 and an email-related detail."""
          await _setup_smtp_and_advanced_auth(async_client, admin_token)

          response = await async_client.post(
              "/api/v1/users/",
              headers={"Authorization": f"Bearer {admin_token}"},
              json={"username": "noemailcreate", "role": "user"},
          )

          assert response.status_code == 400
          assert "email" in response.json()["detail"].lower()

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_create_user_advanced_auth_auto_password(self, async_client: AsyncClient, admin_token: str):
          """Creating a user with only an email answers 201 and triggers exactly one email to that address."""
          # Setup runs before the mock is installed, so only the creation request is observed.
          await _setup_smtp_and_advanced_auth(async_client, admin_token)

          with patch("backend.app.api.routes.users.send_email") as mock_send:
              response = await async_client.post(
                  "/api/v1/users/",
                  headers={"Authorization": f"Bearer {admin_token}"},
                  json={"username": "autopassuser", "email": "autopass@test.com", "role": "user"},
              )

          assert response.status_code == 201
          created = response.json()
          assert created["username"] == "autopassuser"
          assert created["email"] == "autopass@test.com"

          # One email went out, and its second positional argument is the new user's address.
          mock_send.assert_called_once()
          assert mock_send.call_args[0][1] == "autopass@test.com"

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_create_user_duplicate_email(self, async_client: AsyncClient, admin_token: str):
          """A second user with an already-used email is rejected with 400."""
          auth = {"Authorization": f"Bearer {admin_token}"}
          await _setup_smtp_and_advanced_auth(async_client, admin_token)

          with patch("backend.app.api.routes.users.send_email"):
              first = await async_client.post(
                  "/api/v1/users/",
                  headers=auth,
                  json={"username": "dupemail1", "email": "dupe@test.com", "role": "user"},
              )
              assert first.status_code == 201

              second = await async_client.post(
                  "/api/v1/users/",
                  headers=auth,
                  json={"username": "dupemail2", "email": "dupe@test.com", "role": "user"},
              )

          assert second.status_code == 400
          assert "email" in second.json()["detail"].lower()

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_create_user_response_includes_email(self, async_client: AsyncClient, admin_token: str):
          """The body returned for a created user carries the email that was submitted."""
          await _setup_smtp_and_advanced_auth(async_client, admin_token)

          with patch("backend.app.api.routes.users.send_email"):
              response = await async_client.post(
                  "/api/v1/users/",
                  headers={"Authorization": f"Bearer {admin_token}"},
                  json={"username": "emailresp", "email": "emailresp@test.com", "role": "user"},
              )

          assert response.status_code == 201
          created = response.json()
          assert "email" in created
          assert created["email"] == "emailresp@test.com"
  # ===========================================================================
  # M-1: OIDC/LDAP users must not be able to use the password reset flow
  # ===========================================================================
  558. class TestAuthSourcePasswordResetBlocking:
  559. """Forgot-password must silently skip OIDC and LDAP users (M-1)."""
  @pytest.fixture
  async def admin_token(self, async_client: AsyncClient):
      """Set up auth with the ``authsrcadmin`` account and hand back its token."""
      token = await _setup_admin(async_client, "authsrcadmin", "AdminPass1!")
      return token
  @pytest.mark.asyncio
  @pytest.mark.integration
  async def test_forgot_password_silently_skips_oidc_user(
      self, async_client: AsyncClient, admin_token: str, db_session
  ):
      """A forgot-password request for an OIDC-sourced account answers 200 yet sends no email."""
      from backend.app.core.auth import get_password_hash
      from backend.app.models.user import User

      # Same two requests as before: save SMTP, then enable advanced auth.
      await _setup_smtp_and_advanced_auth(async_client, admin_token)

      # The account is written straight to the database with auth_source="oidc"
      # rather than being created through the users API.
      db_session.add(
          User(
              username="oidcpwreset",
              email="oidcpwreset@test.com",
              auth_source="oidc",
              password_hash=get_password_hash("irrelevant"),
              role="user",
              is_active=True,
          )
      )
      await db_session.commit()

      with patch("backend.app.api.routes.auth.send_email") as mock_send:
          response = await async_client.post(
              "/api/v1/auth/forgot-password",
              json={"email": "oidcpwreset@test.com"},
          )

      # The endpoint still answers 200 (anti-enumeration) ...
      assert response.status_code == 200
      # ... but nothing is mailed for an OIDC account.
      mock_send.assert_not_called()