test_auth_api.py 35 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969
  1. """Integration tests for Authentication API endpoints.
  2. Tests the full request/response cycle for /api/v1/auth/ and /api/v1/users/ endpoints.
  3. """
  4. import pytest
  5. from httpx import AsyncClient
  6. class TestAuthStatusAPI:
  7. """Integration tests for /api/v1/auth/status endpoint."""
  8. @pytest.mark.asyncio
  9. @pytest.mark.integration
  10. async def test_get_auth_status_disabled(self, async_client: AsyncClient):
  11. """Verify auth status returns disabled when not configured."""
  12. response = await async_client.get("/api/v1/auth/status")
  13. assert response.status_code == 200
  14. result = response.json()
  15. assert "auth_enabled" in result
  16. assert result["auth_enabled"] is False
  17. assert result["requires_setup"] is True
  18. class TestAuthSetupAPI:
  19. """Integration tests for /api/v1/auth/setup endpoint."""
  20. @pytest.mark.asyncio
  21. @pytest.mark.integration
  22. async def test_setup_auth_disabled(self, async_client: AsyncClient):
  23. """Verify auth can be set up with auth disabled (no password required)."""
  24. response = await async_client.post(
  25. "/api/v1/auth/setup",
  26. json={"auth_enabled": False},
  27. )
  28. assert response.status_code == 200
  29. result = response.json()
  30. assert result["auth_enabled"] is False
  31. assert result["admin_created"] is False
  32. @pytest.mark.asyncio
  33. @pytest.mark.integration
  34. async def test_setup_auth_enabled_requires_credentials(self, async_client: AsyncClient):
  35. """Verify enabling auth requires admin username and password."""
  36. response = await async_client.post(
  37. "/api/v1/auth/setup",
  38. json={"auth_enabled": True},
  39. )
  40. assert response.status_code == 400
  41. assert "Admin username and password are required" in response.json()["detail"]
  42. @pytest.mark.asyncio
  43. @pytest.mark.integration
  44. async def test_setup_auth_enabled_with_credentials(self, async_client: AsyncClient):
  45. """Verify auth can be enabled with admin credentials."""
  46. response = await async_client.post(
  47. "/api/v1/auth/setup",
  48. json={
  49. "auth_enabled": True,
  50. "admin_username": "testadmin",
  51. "admin_password": "TestPass1!",
  52. },
  53. )
  54. assert response.status_code == 200
  55. result = response.json()
  56. assert result["auth_enabled"] is True
  57. assert result["admin_created"] is True
  58. @pytest.mark.asyncio
  59. @pytest.mark.integration
  60. async def test_setup_weak_password_rejected_when_creating_new_admin(self, async_client: AsyncClient):
  61. """Complexity is enforced only when a new admin is being created."""
  62. response = await async_client.post(
  63. "/api/v1/auth/setup",
  64. json={
  65. "auth_enabled": True,
  66. "admin_username": "weakpw_admin",
  67. "admin_password": "NoSpecial1",
  68. },
  69. )
  70. assert response.status_code == 400
  71. assert "special character" in response.json()["detail"].lower()
  72. @pytest.mark.asyncio
  73. @pytest.mark.integration
  74. async def test_setup_reenable_with_existing_admin_ignores_password(self, async_client: AsyncClient, db_session):
  75. """Re-enabling auth when an admin already exists must not reject the placeholder
  76. password the frontend still sends. Regression for the LDAP re-enable flow that
  77. previously 422'd because the Pydantic schema enforced complexity unconditionally.
  78. """
  79. from backend.app.core.auth import get_password_hash
  80. from backend.app.models.user import User
  81. existing = User(
  82. username="existing_admin",
  83. # pragma: allowlist secret — test fixture only, not a real credential
  84. password_hash=get_password_hash("DoesNotMatter1!"), # noqa: S106
  85. role="admin",
  86. is_active=True,
  87. )
  88. db_session.add(existing)
  89. await db_session.commit()
  90. response = await async_client.post(
  91. "/api/v1/auth/setup",
  92. json={
  93. "auth_enabled": True,
  94. "admin_username": "irrelevant",
  95. "admin_password": "NoSpecial1",
  96. },
  97. )
  98. assert response.status_code == 200
  99. result = response.json()
  100. assert result["auth_enabled"] is True
  101. assert result["admin_created"] is False
  102. class TestAuthLoginAPI:
  103. """Integration tests for /api/v1/auth/login endpoint."""
  104. @pytest.mark.asyncio
  105. @pytest.mark.integration
  106. async def test_login_auth_disabled(self, async_client: AsyncClient):
  107. """Verify login fails when auth is not enabled."""
  108. response = await async_client.post(
  109. "/api/v1/auth/login",
  110. json={"username": "admin", "password": "password"},
  111. )
  112. assert response.status_code == 400
  113. assert "Authentication is not enabled" in response.json()["detail"]
  114. @pytest.mark.asyncio
  115. @pytest.mark.integration
  116. async def test_login_success(self, async_client: AsyncClient):
  117. """Verify login succeeds with valid credentials after setup."""
  118. # First enable auth
  119. await async_client.post(
  120. "/api/v1/auth/setup",
  121. json={
  122. "auth_enabled": True,
  123. "admin_username": "logintest",
  124. "admin_password": "LoginPass1!",
  125. },
  126. )
  127. # Now login
  128. response = await async_client.post(
  129. "/api/v1/auth/login",
  130. json={"username": "logintest", "password": "LoginPass1!"},
  131. )
  132. assert response.status_code == 200
  133. result = response.json()
  134. assert "access_token" in result
  135. assert result["token_type"] == "bearer"
  136. assert result["user"]["username"] == "logintest"
  137. assert result["user"]["role"] == "admin"
  138. @pytest.mark.asyncio
  139. @pytest.mark.integration
  140. async def test_login_invalid_credentials(self, async_client: AsyncClient):
  141. """Verify login fails with invalid credentials."""
  142. # First enable auth
  143. await async_client.post(
  144. "/api/v1/auth/setup",
  145. json={
  146. "auth_enabled": True,
  147. "admin_username": "invalidtest",
  148. "admin_password": "CorrectPass1!",
  149. },
  150. )
  151. # Try login with wrong password
  152. response = await async_client.post(
  153. "/api/v1/auth/login",
  154. json={"username": "invalidtest", "password": "wrongpassword"},
  155. )
  156. assert response.status_code == 401
  157. assert "Incorrect username or password" in response.json()["detail"]
  158. class TestAuthMeAPI:
  159. """Integration tests for /api/v1/auth/me endpoint."""
  160. @pytest.mark.asyncio
  161. @pytest.mark.integration
  162. async def test_me_without_token(self, async_client: AsyncClient):
  163. """Verify /me fails without authentication token."""
  164. response = await async_client.get("/api/v1/auth/me")
  165. assert response.status_code == 401
  166. @pytest.mark.asyncio
  167. @pytest.mark.integration
  168. async def test_me_with_valid_token(self, async_client: AsyncClient):
  169. """Verify /me returns user info with valid token."""
  170. # Setup and login
  171. await async_client.post(
  172. "/api/v1/auth/setup",
  173. json={
  174. "auth_enabled": True,
  175. "admin_username": "metest",
  176. "admin_password": "MePass1!",
  177. },
  178. )
  179. login_response = await async_client.post(
  180. "/api/v1/auth/login",
  181. json={"username": "metest", "password": "MePass1!"},
  182. )
  183. token = login_response.json()["access_token"]
  184. # Get current user
  185. response = await async_client.get(
  186. "/api/v1/auth/me",
  187. headers={"Authorization": f"Bearer {token}"},
  188. )
  189. assert response.status_code == 200
  190. result = response.json()
  191. assert result["username"] == "metest"
  192. assert result["role"] == "admin"
  193. assert result["is_active"] is True
  194. @pytest.mark.asyncio
  195. @pytest.mark.integration
  196. async def test_me_with_api_key_bearer(self, async_client: AsyncClient, db_session):
  197. """Verify /me returns synthetic admin user when using API key via Bearer token."""
  198. from backend.app.core.auth import generate_api_key
  199. from backend.app.models.api_key import APIKey
  200. # Create an API key directly in the database
  201. full_key, key_hash, key_prefix = generate_api_key()
  202. api_key = APIKey(name="test-kiosk", key_hash=key_hash, key_prefix=key_prefix, enabled=True)
  203. db_session.add(api_key)
  204. await db_session.commit()
  205. # Call /me with the API key as Bearer token
  206. response = await async_client.get(
  207. "/api/v1/auth/me",
  208. headers={"Authorization": f"Bearer {full_key}"},
  209. )
  210. assert response.status_code == 200
  211. result = response.json()
  212. assert result["id"] == 0
  213. assert result["username"].startswith("api-key:")
  214. assert result["role"] == "admin"
  215. assert result["is_admin"] is True
  216. assert result["is_active"] is True
  217. assert len(result["permissions"]) > 0
  218. @pytest.mark.asyncio
  219. @pytest.mark.integration
  220. async def test_me_with_api_key_header(self, async_client: AsyncClient, db_session):
  221. """Verify /me returns synthetic admin user when using X-API-Key header."""
  222. from backend.app.core.auth import generate_api_key
  223. from backend.app.models.api_key import APIKey
  224. full_key, key_hash, key_prefix = generate_api_key()
  225. api_key = APIKey(name="test-kiosk-header", key_hash=key_hash, key_prefix=key_prefix, enabled=True)
  226. db_session.add(api_key)
  227. await db_session.commit()
  228. response = await async_client.get(
  229. "/api/v1/auth/me",
  230. headers={"X-API-Key": full_key},
  231. )
  232. assert response.status_code == 200
  233. result = response.json()
  234. assert result["id"] == 0
  235. assert result["username"].startswith("api-key:")
  236. assert result["is_admin"] is True
  237. @pytest.mark.asyncio
  238. @pytest.mark.integration
  239. async def test_me_with_invalid_api_key(self, async_client: AsyncClient):
  240. """Verify /me rejects invalid API key."""
  241. response = await async_client.get(
  242. "/api/v1/auth/me",
  243. headers={"Authorization": "Bearer bb_invalid_key_value"},
  244. )
  245. assert response.status_code == 401
  246. class TestUsersAPI:
  247. """Integration tests for /api/v1/users/ endpoints."""
  248. @pytest.fixture
  249. async def auth_token(self, async_client: AsyncClient):
  250. """Setup auth and return admin token."""
  251. await async_client.post(
  252. "/api/v1/auth/setup",
  253. json={
  254. "auth_enabled": True,
  255. "admin_username": "usersadmin",
  256. "admin_password": "AdminPass1!",
  257. },
  258. )
  259. login_response = await async_client.post(
  260. "/api/v1/auth/login",
  261. json={"username": "usersadmin", "password": "AdminPass1!"},
  262. )
  263. return login_response.json()["access_token"]
  264. @pytest.mark.asyncio
  265. @pytest.mark.integration
  266. async def test_list_users_requires_auth(self, async_client: AsyncClient):
  267. """Verify listing users requires authentication when auth is enabled."""
  268. # First enable auth
  269. await async_client.post(
  270. "/api/v1/auth/setup",
  271. json={
  272. "auth_enabled": True,
  273. "admin_username": "authreqadmin",
  274. "admin_password": "AdminPass1!",
  275. },
  276. )
  277. # Now try to list users without a token
  278. response = await async_client.get("/api/v1/users/")
  279. assert response.status_code == 401
  280. @pytest.mark.asyncio
  281. @pytest.mark.integration
  282. async def test_list_users_as_admin(self, async_client: AsyncClient, auth_token: str):
  283. """Verify admin can list users."""
  284. response = await async_client.get(
  285. "/api/v1/users/",
  286. headers={"Authorization": f"Bearer {auth_token}"},
  287. )
  288. assert response.status_code == 200
  289. result = response.json()
  290. assert isinstance(result, list)
  291. assert len(result) >= 1 # At least the admin user
  292. @pytest.mark.asyncio
  293. @pytest.mark.integration
  294. async def test_create_user(self, async_client: AsyncClient, auth_token: str):
  295. """Verify admin can create a new user."""
  296. response = await async_client.post(
  297. "/api/v1/users/",
  298. headers={"Authorization": f"Bearer {auth_token}"},
  299. json={
  300. "username": "newuser",
  301. "password": "Newuserpass1!",
  302. "role": "user",
  303. },
  304. )
  305. assert response.status_code == 201
  306. result = response.json()
  307. assert result["username"] == "newuser"
  308. assert result["role"] == "user"
  309. assert result["is_active"] is True
  310. @pytest.mark.asyncio
  311. @pytest.mark.integration
  312. async def test_create_user_duplicate_username(self, async_client: AsyncClient, auth_token: str):
  313. """Verify creating user with duplicate username fails."""
  314. # Create first user
  315. await async_client.post(
  316. "/api/v1/users/",
  317. headers={"Authorization": f"Bearer {auth_token}"},
  318. json={
  319. "username": "duplicateuser",
  320. "password": "Password123!",
  321. "role": "user",
  322. },
  323. )
  324. # Try to create duplicate
  325. response = await async_client.post(
  326. "/api/v1/users/",
  327. headers={"Authorization": f"Bearer {auth_token}"},
  328. json={
  329. "username": "duplicateuser",
  330. "password": "Password456!",
  331. "role": "user",
  332. },
  333. )
  334. assert response.status_code == 400
  335. assert "Username already exists" in response.json()["detail"]
  336. @pytest.mark.asyncio
  337. @pytest.mark.integration
  338. async def test_update_user(self, async_client: AsyncClient, auth_token: str):
  339. """Verify admin can update a user."""
  340. # Create user
  341. create_response = await async_client.post(
  342. "/api/v1/users/",
  343. headers={"Authorization": f"Bearer {auth_token}"},
  344. json={
  345. "username": "updateuser",
  346. "password": "Password123!",
  347. "role": "user",
  348. },
  349. )
  350. user_id = create_response.json()["id"]
  351. # Update user
  352. response = await async_client.patch(
  353. f"/api/v1/users/{user_id}",
  354. headers={"Authorization": f"Bearer {auth_token}"},
  355. json={"role": "admin"},
  356. )
  357. assert response.status_code == 200
  358. assert response.json()["role"] == "admin"
  359. @pytest.mark.asyncio
  360. @pytest.mark.integration
  361. async def test_delete_user(self, async_client: AsyncClient, auth_token: str):
  362. """Verify admin can delete a user."""
  363. # Create user
  364. create_response = await async_client.post(
  365. "/api/v1/users/",
  366. headers={"Authorization": f"Bearer {auth_token}"},
  367. json={
  368. "username": "deleteuser",
  369. "password": "Password123!",
  370. "role": "user",
  371. },
  372. )
  373. user_id = create_response.json()["id"]
  374. # Delete user
  375. response = await async_client.delete(
  376. f"/api/v1/users/{user_id}",
  377. headers={"Authorization": f"Bearer {auth_token}"},
  378. )
  379. assert response.status_code == 204
  380. class TestAuthDisableAPI:
  381. """Integration tests for /api/v1/auth/disable endpoint."""
  382. @pytest.mark.asyncio
  383. @pytest.mark.integration
  384. async def test_disable_auth(self, async_client: AsyncClient):
  385. """Verify admin can disable authentication."""
  386. # Setup auth
  387. await async_client.post(
  388. "/api/v1/auth/setup",
  389. json={
  390. "auth_enabled": True,
  391. "admin_username": "disableadmin",
  392. "admin_password": "AdminPass1!",
  393. },
  394. )
  395. # Login to get token
  396. login_response = await async_client.post(
  397. "/api/v1/auth/login",
  398. json={"username": "disableadmin", "password": "AdminPass1!"},
  399. )
  400. token = login_response.json()["access_token"]
  401. # Disable auth
  402. response = await async_client.post(
  403. "/api/v1/auth/disable",
  404. headers={"Authorization": f"Bearer {token}"},
  405. )
  406. assert response.status_code == 200
  407. assert response.json()["auth_enabled"] is False
  408. # Verify auth is now disabled
  409. status_response = await async_client.get("/api/v1/auth/status")
  410. assert status_response.json()["auth_enabled"] is False
  411. class TestGroupsAPI:
  412. """Integration tests for /api/v1/groups/ endpoints."""
  413. @pytest.fixture
  414. async def auth_token(self, async_client: AsyncClient):
  415. """Setup auth and return admin token."""
  416. await async_client.post(
  417. "/api/v1/auth/setup",
  418. json={
  419. "auth_enabled": True,
  420. "admin_username": "groupsadmin",
  421. "admin_password": "AdminPass1!",
  422. },
  423. )
  424. login_response = await async_client.post(
  425. "/api/v1/auth/login",
  426. json={"username": "groupsadmin", "password": "AdminPass1!"},
  427. )
  428. return login_response.json()["access_token"]
  429. @pytest.mark.asyncio
  430. @pytest.mark.integration
  431. async def test_list_groups(self, async_client: AsyncClient, auth_token: str):
  432. """Verify listing groups returns default groups."""
  433. response = await async_client.get(
  434. "/api/v1/groups/",
  435. headers={"Authorization": f"Bearer {auth_token}"},
  436. )
  437. assert response.status_code == 200
  438. groups = response.json()
  439. assert isinstance(groups, list)
  440. # Should have default groups: Administrators, Operators, Viewers
  441. group_names = [g["name"] for g in groups]
  442. assert "Administrators" in group_names
  443. assert "Operators" in group_names
  444. assert "Viewers" in group_names
  445. @pytest.mark.asyncio
  446. @pytest.mark.integration
  447. async def test_get_permissions(self, async_client: AsyncClient, auth_token: str):
  448. """Verify getting available permissions."""
  449. response = await async_client.get(
  450. "/api/v1/groups/permissions",
  451. headers={"Authorization": f"Bearer {auth_token}"},
  452. )
  453. assert response.status_code == 200
  454. permissions = response.json()
  455. assert isinstance(permissions, dict)
  456. # Should have permission categories
  457. assert "Printers" in permissions or len(permissions) > 0
  458. @pytest.mark.asyncio
  459. @pytest.mark.integration
  460. async def test_create_group(self, async_client: AsyncClient, auth_token: str):
  461. """Verify creating a new group."""
  462. response = await async_client.post(
  463. "/api/v1/groups/",
  464. headers={"Authorization": f"Bearer {auth_token}"},
  465. json={
  466. "name": "Custom Group",
  467. "description": "A custom test group",
  468. "permissions": ["printers:read", "archives:read"],
  469. },
  470. )
  471. assert response.status_code == 201
  472. group = response.json()
  473. assert group["name"] == "Custom Group"
  474. assert group["description"] == "A custom test group"
  475. assert "printers:read" in group["permissions"]
  476. assert group["is_system"] is False
  477. @pytest.mark.asyncio
  478. @pytest.mark.integration
  479. async def test_update_group(self, async_client: AsyncClient, auth_token: str):
  480. """Verify updating a group."""
  481. # Create a group first
  482. create_response = await async_client.post(
  483. "/api/v1/groups/",
  484. headers={"Authorization": f"Bearer {auth_token}"},
  485. json={
  486. "name": "Update Test Group",
  487. "permissions": ["printers:read"],
  488. },
  489. )
  490. group_id = create_response.json()["id"]
  491. # Update the group
  492. response = await async_client.patch(
  493. f"/api/v1/groups/{group_id}",
  494. headers={"Authorization": f"Bearer {auth_token}"},
  495. json={
  496. "description": "Updated description",
  497. "permissions": ["printers:read", "printers:control"],
  498. },
  499. )
  500. assert response.status_code == 200
  501. group = response.json()
  502. assert group["description"] == "Updated description"
  503. assert "printers:control" in group["permissions"]
  504. @pytest.mark.asyncio
  505. @pytest.mark.integration
  506. async def test_cannot_delete_system_group(self, async_client: AsyncClient, auth_token: str):
  507. """Verify system groups cannot be deleted."""
  508. # Get the Administrators group
  509. list_response = await async_client.get(
  510. "/api/v1/groups/",
  511. headers={"Authorization": f"Bearer {auth_token}"},
  512. )
  513. admin_group = next(g for g in list_response.json() if g["name"] == "Administrators")
  514. # Try to delete it
  515. response = await async_client.delete(
  516. f"/api/v1/groups/{admin_group['id']}",
  517. headers={"Authorization": f"Bearer {auth_token}"},
  518. )
  519. assert response.status_code == 400
  520. assert "system group" in response.json()["detail"].lower()
  521. @pytest.mark.asyncio
  522. @pytest.mark.integration
  523. async def test_delete_custom_group(self, async_client: AsyncClient, auth_token: str):
  524. """Verify custom groups can be deleted."""
  525. # Create a group
  526. create_response = await async_client.post(
  527. "/api/v1/groups/",
  528. headers={"Authorization": f"Bearer {auth_token}"},
  529. json={"name": "Delete Test Group"},
  530. )
  531. group_id = create_response.json()["id"]
  532. # Delete it
  533. response = await async_client.delete(
  534. f"/api/v1/groups/{group_id}",
  535. headers={"Authorization": f"Bearer {auth_token}"},
  536. )
  537. assert response.status_code == 204
  538. class TestUserGroupsAPI:
  539. """Integration tests for user-group assignments."""
  540. @pytest.fixture
  541. async def auth_token(self, async_client: AsyncClient):
  542. """Setup auth and return admin token."""
  543. await async_client.post(
  544. "/api/v1/auth/setup",
  545. json={
  546. "auth_enabled": True,
  547. "admin_username": "usergroupadmin",
  548. "admin_password": "AdminPass1!",
  549. },
  550. )
  551. login_response = await async_client.post(
  552. "/api/v1/auth/login",
  553. json={"username": "usergroupadmin", "password": "AdminPass1!"},
  554. )
  555. return login_response.json()["access_token"]
  556. @pytest.mark.asyncio
  557. @pytest.mark.integration
  558. async def test_create_user_with_groups(self, async_client: AsyncClient, auth_token: str):
  559. """Verify creating a user with group assignments."""
  560. # Get Operators group ID
  561. groups_response = await async_client.get(
  562. "/api/v1/groups/",
  563. headers={"Authorization": f"Bearer {auth_token}"},
  564. )
  565. operators_group = next(g for g in groups_response.json() if g["name"] == "Operators")
  566. # Create user with group
  567. response = await async_client.post(
  568. "/api/v1/users/",
  569. headers={"Authorization": f"Bearer {auth_token}"},
  570. json={
  571. "username": "groupuser",
  572. "password": "Password123!",
  573. "group_ids": [operators_group["id"]],
  574. },
  575. )
  576. assert response.status_code == 201
  577. user = response.json()
  578. assert any(g["name"] == "Operators" for g in user["groups"])
  579. @pytest.mark.asyncio
  580. @pytest.mark.integration
  581. async def test_add_user_to_group(self, async_client: AsyncClient, auth_token: str):
  582. """Verify adding a user to a group."""
  583. # Create a user
  584. user_response = await async_client.post(
  585. "/api/v1/users/",
  586. headers={"Authorization": f"Bearer {auth_token}"},
  587. json={"username": "addtogroup", "password": "Password123!"},
  588. )
  589. user_id = user_response.json()["id"]
  590. # Get Viewers group
  591. groups_response = await async_client.get(
  592. "/api/v1/groups/",
  593. headers={"Authorization": f"Bearer {auth_token}"},
  594. )
  595. viewers_group = next(g for g in groups_response.json() if g["name"] == "Viewers")
  596. # Add user to group
  597. response = await async_client.post(
  598. f"/api/v1/groups/{viewers_group['id']}/users/{user_id}",
  599. headers={"Authorization": f"Bearer {auth_token}"},
  600. )
  601. assert response.status_code == 204
  602. # Verify user is in group
  603. user_check = await async_client.get(
  604. f"/api/v1/users/{user_id}",
  605. headers={"Authorization": f"Bearer {auth_token}"},
  606. )
  607. assert any(g["name"] == "Viewers" for g in user_check.json()["groups"])
  608. class TestChangePasswordAPI:
  609. """Integration tests for /api/v1/users/me/change-password endpoint."""
  610. @pytest.fixture
  611. async def user_token(self, async_client: AsyncClient):
  612. """Setup auth and return regular user token."""
  613. # Enable auth with admin
  614. await async_client.post(
  615. "/api/v1/auth/setup",
  616. json={
  617. "auth_enabled": True,
  618. "admin_username": "pwchangeadmin",
  619. "admin_password": "AdminPass1!",
  620. },
  621. )
  622. admin_login = await async_client.post(
  623. "/api/v1/auth/login",
  624. json={"username": "pwchangeadmin", "password": "AdminPass1!"},
  625. )
  626. admin_token = admin_login.json()["access_token"]
  627. # Create a regular user
  628. await async_client.post(
  629. "/api/v1/users/",
  630. headers={"Authorization": f"Bearer {admin_token}"},
  631. json={"username": "pwchangeuser", "password": "Oldpassword123!"},
  632. )
  633. # Login as regular user
  634. user_login = await async_client.post(
  635. "/api/v1/auth/login",
  636. json={"username": "pwchangeuser", "password": "Oldpassword123!"},
  637. )
  638. return user_login.json()["access_token"]
  639. @pytest.mark.asyncio
  640. @pytest.mark.integration
  641. async def test_change_password_success(self, async_client: AsyncClient, user_token: str):
  642. """Verify user can change their own password."""
  643. response = await async_client.post(
  644. "/api/v1/users/me/change-password",
  645. headers={"Authorization": f"Bearer {user_token}"},
  646. json={
  647. "current_password": "Oldpassword123!",
  648. "new_password": "Newpassword456!",
  649. },
  650. )
  651. assert response.status_code == 200
  652. assert "success" in response.json()["message"].lower()
  653. # Verify can login with new password
  654. login_response = await async_client.post(
  655. "/api/v1/auth/login",
  656. json={"username": "pwchangeuser", "password": "Newpassword456!"},
  657. )
  658. assert login_response.status_code == 200
  659. @pytest.mark.asyncio
  660. @pytest.mark.integration
  661. async def test_change_password_wrong_current(self, async_client: AsyncClient, user_token: str):
  662. """Verify changing password fails with wrong current password."""
  663. response = await async_client.post(
  664. "/api/v1/users/me/change-password",
  665. headers={"Authorization": f"Bearer {user_token}"},
  666. json={
  667. "current_password": "wrongpassword",
  668. "new_password": "Newpassword456!",
  669. },
  670. )
  671. assert response.status_code == 400
  672. assert "incorrect" in response.json()["detail"].lower()
  673. @pytest.mark.asyncio
  674. @pytest.mark.integration
  675. async def test_change_password_requires_auth(self, async_client: AsyncClient):
  676. """Verify changing password requires authentication."""
  677. response = await async_client.post(
  678. "/api/v1/users/me/change-password",
  679. json={
  680. "current_password": "oldpassword",
  681. "new_password": "Strongpass456!",
  682. },
  683. )
  684. assert response.status_code == 401
  685. class TestAuthMiddlewarePublicRoutes:
  686. """Tests for auth middleware public route configuration.
  687. These routes must be accessible without authentication, even when auth is enabled,
  688. because browser elements like <img src> and <video src> don't send Authorization headers.
  689. """
  690. @pytest.fixture
  691. async def enabled_auth(self, async_client: AsyncClient):
  692. """Enable auth for testing middleware behavior."""
  693. await async_client.post(
  694. "/api/v1/auth/setup",
  695. json={
  696. "auth_enabled": True,
  697. "admin_username": "middlewareadmin",
  698. "admin_password": "AdminPass1!",
  699. },
  700. )
  701. @pytest.mark.asyncio
  702. @pytest.mark.integration
  703. async def test_auth_status_is_public(self, async_client: AsyncClient, enabled_auth):
  704. """Verify /api/v1/auth/status is accessible without auth."""
  705. response = await async_client.get("/api/v1/auth/status")
  706. assert response.status_code == 200
  707. assert "auth_enabled" in response.json()
  708. @pytest.mark.asyncio
  709. @pytest.mark.integration
  710. async def test_auth_login_is_public(self, async_client: AsyncClient, enabled_auth):
  711. """Verify /api/v1/auth/login is accessible without auth."""
  712. response = await async_client.post(
  713. "/api/v1/auth/login",
  714. json={"username": "middlewareadmin", "password": "AdminPass1!"},
  715. )
  716. # Should not return 401 (unauthorized) - it should either succeed or return
  717. # a different error (like 400 for wrong credentials)
  718. assert response.status_code != 401 or "token" in response.json()
  719. @pytest.mark.asyncio
  720. @pytest.mark.integration
  721. async def test_auth_setup_is_public(self, async_client: AsyncClient):
  722. """Verify /api/v1/auth/setup is accessible without auth (needed for setup/recovery)."""
  723. # Don't enable auth first - test that setup endpoint itself is accessible
  724. response = await async_client.post(
  725. "/api/v1/auth/setup",
  726. json={"auth_enabled": False},
  727. )
  728. # Should not be 401
  729. assert response.status_code != 401
  730. @pytest.mark.asyncio
  731. @pytest.mark.integration
  732. async def test_updates_version_is_public(self, async_client: AsyncClient, enabled_auth):
  733. """Verify /api/v1/updates/version is accessible without auth."""
  734. response = await async_client.get("/api/v1/updates/version")
  735. # Should not be 401
  736. assert response.status_code != 401
  737. @pytest.mark.asyncio
  738. @pytest.mark.integration
  739. async def test_protected_route_requires_auth(self, async_client: AsyncClient, enabled_auth):
  740. """Verify non-public routes return 401 without token."""
  741. response = await async_client.get("/api/v1/printers/")
  742. assert response.status_code == 401
  743. @pytest.mark.asyncio
  744. @pytest.mark.integration
  745. async def test_protected_route_works_with_token(self, async_client: AsyncClient, enabled_auth):
  746. """Verify non-public routes work with valid token."""
  747. # Login to get token
  748. login_response = await async_client.post(
  749. "/api/v1/auth/login",
  750. json={"username": "middlewareadmin", "password": "AdminPass1!"},
  751. )
  752. token = login_response.json()["access_token"]
  753. # Access protected route
  754. response = await async_client.get(
  755. "/api/v1/printers/",
  756. headers={"Authorization": f"Bearer {token}"},
  757. )
  758. assert response.status_code == 200
  759. @pytest.mark.asyncio
  760. @pytest.mark.integration
  761. async def test_advanced_auth_status_is_public(self, async_client: AsyncClient, enabled_auth):
  762. """Verify /api/v1/auth/advanced-auth/status is accessible without auth."""
  763. response = await async_client.get("/api/v1/auth/advanced-auth/status")
  764. # Should not be 401 (must be accessible for login page)
  765. assert response.status_code != 401
  766. # Should return valid response (200 with auth status)
  767. if response.status_code == 200:
  768. result = response.json()
  769. assert "advanced_auth_enabled" in result
  770. assert "smtp_configured" in result
  771. @pytest.mark.asyncio
  772. @pytest.mark.integration
  773. async def test_forgot_password_is_public(self, async_client: AsyncClient, enabled_auth):
  774. """Verify /api/v1/auth/forgot-password is accessible without auth."""
  775. response = await async_client.post(
  776. "/api/v1/auth/forgot-password",
  777. json={"email": "test@example.com"},
  778. )
  779. # Should not be 401 (must be accessible for password reset from login page)
  780. assert response.status_code != 401
  781. # Will likely be 400 (advanced auth not enabled) but that's okay -
  782. # the important thing is it's not blocked by auth middleware
  783. assert response.status_code in [200, 400]
  784. # ===========================================================================
  785. # H-1: Input length validation
  786. # ===========================================================================
  787. class TestInputLengthValidation:
  788. """LoginRequest and SetupRequest must reject oversized inputs (H-1)."""
  789. @pytest.mark.asyncio
  790. @pytest.mark.integration
  791. async def test_login_password_too_long_rejected(self, async_client: AsyncClient):
  792. """Password exceeding 256 characters must be rejected with 422."""
  793. response = await async_client.post(
  794. "/api/v1/auth/login",
  795. json={"username": "admin", "password": "x" * 257},
  796. )
  797. assert response.status_code == 422
  798. @pytest.mark.asyncio
  799. @pytest.mark.integration
  800. async def test_login_username_too_long_rejected(self, async_client: AsyncClient):
  801. """Username exceeding 150 characters must be rejected with 422."""
  802. response = await async_client.post(
  803. "/api/v1/auth/login",
  804. json={"username": "u" * 151, "password": "password"},
  805. )
  806. assert response.status_code == 422
  807. @pytest.mark.asyncio
  808. @pytest.mark.integration
  809. async def test_setup_password_too_long_rejected(self, async_client: AsyncClient):
  810. """SetupRequest admin_password exceeding 256 characters must be rejected with 422."""
  811. response = await async_client.post(
  812. "/api/v1/auth/setup",
  813. json={
  814. "auth_enabled": True,
  815. "admin_username": "admin",
  816. "admin_password": "x" * 257,
  817. },
  818. )
  819. assert response.status_code == 422
  820. @pytest.mark.asyncio
  821. @pytest.mark.integration
  822. async def test_login_password_at_limit_accepted(self, async_client: AsyncClient):
  823. """Password of exactly 256 characters must pass schema validation (may fail auth)."""
  824. response = await async_client.post(
  825. "/api/v1/auth/login",
  826. json={"username": "admin", "password": "x" * 256},
  827. )
  828. # Schema accepts it; auth may reject with 401 (auth disabled) or 400
  829. assert response.status_code != 422