test_auth_api.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. """Integration tests for Authentication API endpoints.
  2. Tests the full request/response cycle for /api/v1/auth/ and /api/v1/users/ endpoints.
  3. """
  4. import pytest
  5. from httpx import AsyncClient
  6. class TestAuthStatusAPI:
  7. """Integration tests for /api/v1/auth/status endpoint."""
  8. @pytest.mark.asyncio
  9. @pytest.mark.integration
  10. async def test_get_auth_status_disabled(self, async_client: AsyncClient):
  11. """Verify auth status returns disabled when not configured."""
  12. response = await async_client.get("/api/v1/auth/status")
  13. assert response.status_code == 200
  14. result = response.json()
  15. assert "auth_enabled" in result
  16. assert result["auth_enabled"] is False
  17. assert result["requires_setup"] is True
  18. class TestAuthSetupAPI:
  19. """Integration tests for /api/v1/auth/setup endpoint."""
  20. @pytest.mark.asyncio
  21. @pytest.mark.integration
  22. async def test_setup_auth_disabled(self, async_client: AsyncClient):
  23. """Verify auth can be set up with auth disabled (no password required)."""
  24. response = await async_client.post(
  25. "/api/v1/auth/setup",
  26. json={"auth_enabled": False},
  27. )
  28. assert response.status_code == 200
  29. result = response.json()
  30. assert result["auth_enabled"] is False
  31. assert result["admin_created"] is False
  32. @pytest.mark.asyncio
  33. @pytest.mark.integration
  34. async def test_setup_auth_enabled_requires_credentials(self, async_client: AsyncClient):
  35. """Verify enabling auth requires admin username and password."""
  36. response = await async_client.post(
  37. "/api/v1/auth/setup",
  38. json={"auth_enabled": True},
  39. )
  40. assert response.status_code == 400
  41. assert "Admin username and password are required" in response.json()["detail"]
  42. @pytest.mark.asyncio
  43. @pytest.mark.integration
  44. async def test_setup_auth_enabled_with_credentials(self, async_client: AsyncClient):
  45. """Verify auth can be enabled with admin credentials."""
  46. response = await async_client.post(
  47. "/api/v1/auth/setup",
  48. json={
  49. "auth_enabled": True,
  50. "admin_username": "testadmin",
  51. "admin_password": "testpassword123",
  52. },
  53. )
  54. assert response.status_code == 200
  55. result = response.json()
  56. assert result["auth_enabled"] is True
  57. assert result["admin_created"] is True
  58. class TestAuthLoginAPI:
  59. """Integration tests for /api/v1/auth/login endpoint."""
  60. @pytest.mark.asyncio
  61. @pytest.mark.integration
  62. async def test_login_auth_disabled(self, async_client: AsyncClient):
  63. """Verify login fails when auth is not enabled."""
  64. response = await async_client.post(
  65. "/api/v1/auth/login",
  66. json={"username": "admin", "password": "password"},
  67. )
  68. assert response.status_code == 400
  69. assert "Authentication is not enabled" in response.json()["detail"]
  70. @pytest.mark.asyncio
  71. @pytest.mark.integration
  72. async def test_login_success(self, async_client: AsyncClient):
  73. """Verify login succeeds with valid credentials after setup."""
  74. # First enable auth
  75. await async_client.post(
  76. "/api/v1/auth/setup",
  77. json={
  78. "auth_enabled": True,
  79. "admin_username": "logintest",
  80. "admin_password": "loginpassword123",
  81. },
  82. )
  83. # Now login
  84. response = await async_client.post(
  85. "/api/v1/auth/login",
  86. json={"username": "logintest", "password": "loginpassword123"},
  87. )
  88. assert response.status_code == 200
  89. result = response.json()
  90. assert "access_token" in result
  91. assert result["token_type"] == "bearer"
  92. assert result["user"]["username"] == "logintest"
  93. assert result["user"]["role"] == "admin"
  94. @pytest.mark.asyncio
  95. @pytest.mark.integration
  96. async def test_login_invalid_credentials(self, async_client: AsyncClient):
  97. """Verify login fails with invalid credentials."""
  98. # First enable auth
  99. await async_client.post(
  100. "/api/v1/auth/setup",
  101. json={
  102. "auth_enabled": True,
  103. "admin_username": "invalidtest",
  104. "admin_password": "correctpassword",
  105. },
  106. )
  107. # Try login with wrong password
  108. response = await async_client.post(
  109. "/api/v1/auth/login",
  110. json={"username": "invalidtest", "password": "wrongpassword"},
  111. )
  112. assert response.status_code == 401
  113. assert "Incorrect username or password" in response.json()["detail"]
  114. class TestAuthMeAPI:
  115. """Integration tests for /api/v1/auth/me endpoint."""
  116. @pytest.mark.asyncio
  117. @pytest.mark.integration
  118. async def test_me_without_token(self, async_client: AsyncClient):
  119. """Verify /me fails without authentication token."""
  120. response = await async_client.get("/api/v1/auth/me")
  121. assert response.status_code == 401
  122. @pytest.mark.asyncio
  123. @pytest.mark.integration
  124. async def test_me_with_valid_token(self, async_client: AsyncClient):
  125. """Verify /me returns user info with valid token."""
  126. # Setup and login
  127. await async_client.post(
  128. "/api/v1/auth/setup",
  129. json={
  130. "auth_enabled": True,
  131. "admin_username": "metest",
  132. "admin_password": "mepassword123",
  133. },
  134. )
  135. login_response = await async_client.post(
  136. "/api/v1/auth/login",
  137. json={"username": "metest", "password": "mepassword123"},
  138. )
  139. token = login_response.json()["access_token"]
  140. # Get current user
  141. response = await async_client.get(
  142. "/api/v1/auth/me",
  143. headers={"Authorization": f"Bearer {token}"},
  144. )
  145. assert response.status_code == 200
  146. result = response.json()
  147. assert result["username"] == "metest"
  148. assert result["role"] == "admin"
  149. assert result["is_active"] is True
  150. class TestUsersAPI:
  151. """Integration tests for /api/v1/users/ endpoints."""
  152. @pytest.fixture
  153. async def auth_token(self, async_client: AsyncClient):
  154. """Setup auth and return admin token."""
  155. await async_client.post(
  156. "/api/v1/auth/setup",
  157. json={
  158. "auth_enabled": True,
  159. "admin_username": "usersadmin",
  160. "admin_password": "adminpassword123",
  161. },
  162. )
  163. login_response = await async_client.post(
  164. "/api/v1/auth/login",
  165. json={"username": "usersadmin", "password": "adminpassword123"},
  166. )
  167. return login_response.json()["access_token"]
  168. @pytest.mark.asyncio
  169. @pytest.mark.integration
  170. async def test_list_users_requires_auth(self, async_client: AsyncClient):
  171. """Verify listing users requires authentication."""
  172. response = await async_client.get("/api/v1/users/")
  173. assert response.status_code == 401
  174. @pytest.mark.asyncio
  175. @pytest.mark.integration
  176. async def test_list_users_as_admin(self, async_client: AsyncClient, auth_token: str):
  177. """Verify admin can list users."""
  178. response = await async_client.get(
  179. "/api/v1/users/",
  180. headers={"Authorization": f"Bearer {auth_token}"},
  181. )
  182. assert response.status_code == 200
  183. result = response.json()
  184. assert isinstance(result, list)
  185. assert len(result) >= 1 # At least the admin user
  186. @pytest.mark.asyncio
  187. @pytest.mark.integration
  188. async def test_create_user(self, async_client: AsyncClient, auth_token: str):
  189. """Verify admin can create a new user."""
  190. response = await async_client.post(
  191. "/api/v1/users/",
  192. headers={"Authorization": f"Bearer {auth_token}"},
  193. json={
  194. "username": "newuser",
  195. "password": "newuserpassword",
  196. "role": "user",
  197. },
  198. )
  199. assert response.status_code == 201
  200. result = response.json()
  201. assert result["username"] == "newuser"
  202. assert result["role"] == "user"
  203. assert result["is_active"] is True
  204. @pytest.mark.asyncio
  205. @pytest.mark.integration
  206. async def test_create_user_duplicate_username(self, async_client: AsyncClient, auth_token: str):
  207. """Verify creating user with duplicate username fails."""
  208. # Create first user
  209. await async_client.post(
  210. "/api/v1/users/",
  211. headers={"Authorization": f"Bearer {auth_token}"},
  212. json={
  213. "username": "duplicateuser",
  214. "password": "password123",
  215. "role": "user",
  216. },
  217. )
  218. # Try to create duplicate
  219. response = await async_client.post(
  220. "/api/v1/users/",
  221. headers={"Authorization": f"Bearer {auth_token}"},
  222. json={
  223. "username": "duplicateuser",
  224. "password": "password456",
  225. "role": "user",
  226. },
  227. )
  228. assert response.status_code == 400
  229. assert "Username already exists" in response.json()["detail"]
  230. @pytest.mark.asyncio
  231. @pytest.mark.integration
  232. async def test_update_user(self, async_client: AsyncClient, auth_token: str):
  233. """Verify admin can update a user."""
  234. # Create user
  235. create_response = await async_client.post(
  236. "/api/v1/users/",
  237. headers={"Authorization": f"Bearer {auth_token}"},
  238. json={
  239. "username": "updateuser",
  240. "password": "password123",
  241. "role": "user",
  242. },
  243. )
  244. user_id = create_response.json()["id"]
  245. # Update user
  246. response = await async_client.patch(
  247. f"/api/v1/users/{user_id}",
  248. headers={"Authorization": f"Bearer {auth_token}"},
  249. json={"role": "admin"},
  250. )
  251. assert response.status_code == 200
  252. assert response.json()["role"] == "admin"
  253. @pytest.mark.asyncio
  254. @pytest.mark.integration
  255. async def test_delete_user(self, async_client: AsyncClient, auth_token: str):
  256. """Verify admin can delete a user."""
  257. # Create user
  258. create_response = await async_client.post(
  259. "/api/v1/users/",
  260. headers={"Authorization": f"Bearer {auth_token}"},
  261. json={
  262. "username": "deleteuser",
  263. "password": "password123",
  264. "role": "user",
  265. },
  266. )
  267. user_id = create_response.json()["id"]
  268. # Delete user
  269. response = await async_client.delete(
  270. f"/api/v1/users/{user_id}",
  271. headers={"Authorization": f"Bearer {auth_token}"},
  272. )
  273. assert response.status_code == 204
  274. class TestAuthDisableAPI:
  275. """Integration tests for /api/v1/auth/disable endpoint."""
  276. @pytest.mark.asyncio
  277. @pytest.mark.integration
  278. async def test_disable_auth(self, async_client: AsyncClient):
  279. """Verify admin can disable authentication."""
  280. # Setup auth
  281. await async_client.post(
  282. "/api/v1/auth/setup",
  283. json={
  284. "auth_enabled": True,
  285. "admin_username": "disableadmin",
  286. "admin_password": "adminpassword123",
  287. },
  288. )
  289. # Login to get token
  290. login_response = await async_client.post(
  291. "/api/v1/auth/login",
  292. json={"username": "disableadmin", "password": "adminpassword123"},
  293. )
  294. token = login_response.json()["access_token"]
  295. # Disable auth
  296. response = await async_client.post(
  297. "/api/v1/auth/disable",
  298. headers={"Authorization": f"Bearer {token}"},
  299. )
  300. assert response.status_code == 200
  301. assert response.json()["auth_enabled"] is False
  302. # Verify auth is now disabled
  303. status_response = await async_client.get("/api/v1/auth/status")
  304. assert status_response.json()["auth_enabled"] is False