# test_auth_api.py
"""Integration tests for the Authentication API endpoints.

Tests the full request/response cycle for the /api/v1/auth/, /api/v1/users/
and /api/v1/groups/ endpoints.
"""
  4. import pytest
  5. from httpx import AsyncClient
  6. class TestAuthStatusAPI:
  7. """Integration tests for /api/v1/auth/status endpoint."""
  8. @pytest.mark.asyncio
  9. @pytest.mark.integration
  10. async def test_get_auth_status_disabled(self, async_client: AsyncClient):
  11. """Verify auth status returns disabled when not configured."""
  12. response = await async_client.get("/api/v1/auth/status")
  13. assert response.status_code == 200
  14. result = response.json()
  15. assert "auth_enabled" in result
  16. assert result["auth_enabled"] is False
  17. assert result["requires_setup"] is True
  18. class TestAuthSetupAPI:
  19. """Integration tests for /api/v1/auth/setup endpoint."""
  20. @pytest.mark.asyncio
  21. @pytest.mark.integration
  22. async def test_setup_auth_disabled(self, async_client: AsyncClient):
  23. """Verify auth can be set up with auth disabled (no password required)."""
  24. response = await async_client.post(
  25. "/api/v1/auth/setup",
  26. json={"auth_enabled": False},
  27. )
  28. assert response.status_code == 200
  29. result = response.json()
  30. assert result["auth_enabled"] is False
  31. assert result["admin_created"] is False
  32. @pytest.mark.asyncio
  33. @pytest.mark.integration
  34. async def test_setup_auth_enabled_requires_credentials(self, async_client: AsyncClient):
  35. """Verify enabling auth requires admin username and password."""
  36. response = await async_client.post(
  37. "/api/v1/auth/setup",
  38. json={"auth_enabled": True},
  39. )
  40. assert response.status_code == 400
  41. assert "Admin username and password are required" in response.json()["detail"]
  42. @pytest.mark.asyncio
  43. @pytest.mark.integration
  44. async def test_setup_auth_enabled_with_credentials(self, async_client: AsyncClient):
  45. """Verify auth can be enabled with admin credentials."""
  46. response = await async_client.post(
  47. "/api/v1/auth/setup",
  48. json={
  49. "auth_enabled": True,
  50. "admin_username": "testadmin",
  51. "admin_password": "TestPass1!",
  52. },
  53. )
  54. assert response.status_code == 200
  55. result = response.json()
  56. assert result["auth_enabled"] is True
  57. assert result["admin_created"] is True
  58. @pytest.mark.asyncio
  59. @pytest.mark.integration
  60. async def test_setup_weak_password_rejected_when_creating_new_admin(self, async_client: AsyncClient):
  61. """Complexity is enforced only when a new admin is being created."""
  62. response = await async_client.post(
  63. "/api/v1/auth/setup",
  64. json={
  65. "auth_enabled": True,
  66. "admin_username": "weakpw_admin",
  67. "admin_password": "NoSpecial1",
  68. },
  69. )
  70. assert response.status_code == 400
  71. assert "special character" in response.json()["detail"].lower()
  72. @pytest.mark.asyncio
  73. @pytest.mark.integration
  74. async def test_setup_reenable_with_existing_admin_ignores_password(self, async_client: AsyncClient, db_session):
  75. """Re-enabling auth when an admin already exists must not reject the placeholder
  76. password the frontend still sends. Regression for the LDAP re-enable flow that
  77. previously 422'd because the Pydantic schema enforced complexity unconditionally.
  78. """
  79. from backend.app.core.auth import get_password_hash
  80. from backend.app.models.user import User
  81. existing = User(
  82. username="existing_admin",
  83. password_hash=get_password_hash("DoesNotMatter1!"),
  84. role="admin",
  85. is_active=True,
  86. )
  87. db_session.add(existing)
  88. await db_session.commit()
  89. response = await async_client.post(
  90. "/api/v1/auth/setup",
  91. json={
  92. "auth_enabled": True,
  93. "admin_username": "irrelevant",
  94. "admin_password": "Ihk88LimT",
  95. },
  96. )
  97. assert response.status_code == 200
  98. result = response.json()
  99. assert result["auth_enabled"] is True
  100. assert result["admin_created"] is False
  101. class TestAuthLoginAPI:
  102. """Integration tests for /api/v1/auth/login endpoint."""
  103. @pytest.mark.asyncio
  104. @pytest.mark.integration
  105. async def test_login_auth_disabled(self, async_client: AsyncClient):
  106. """Verify login fails when auth is not enabled."""
  107. response = await async_client.post(
  108. "/api/v1/auth/login",
  109. json={"username": "admin", "password": "password"},
  110. )
  111. assert response.status_code == 400
  112. assert "Authentication is not enabled" in response.json()["detail"]
  113. @pytest.mark.asyncio
  114. @pytest.mark.integration
  115. async def test_login_success(self, async_client: AsyncClient):
  116. """Verify login succeeds with valid credentials after setup."""
  117. # First enable auth
  118. await async_client.post(
  119. "/api/v1/auth/setup",
  120. json={
  121. "auth_enabled": True,
  122. "admin_username": "logintest",
  123. "admin_password": "LoginPass1!",
  124. },
  125. )
  126. # Now login
  127. response = await async_client.post(
  128. "/api/v1/auth/login",
  129. json={"username": "logintest", "password": "LoginPass1!"},
  130. )
  131. assert response.status_code == 200
  132. result = response.json()
  133. assert "access_token" in result
  134. assert result["token_type"] == "bearer"
  135. assert result["user"]["username"] == "logintest"
  136. assert result["user"]["role"] == "admin"
  137. @pytest.mark.asyncio
  138. @pytest.mark.integration
  139. async def test_login_invalid_credentials(self, async_client: AsyncClient):
  140. """Verify login fails with invalid credentials."""
  141. # First enable auth
  142. await async_client.post(
  143. "/api/v1/auth/setup",
  144. json={
  145. "auth_enabled": True,
  146. "admin_username": "invalidtest",
  147. "admin_password": "CorrectPass1!",
  148. },
  149. )
  150. # Try login with wrong password
  151. response = await async_client.post(
  152. "/api/v1/auth/login",
  153. json={"username": "invalidtest", "password": "wrongpassword"},
  154. )
  155. assert response.status_code == 401
  156. assert "Incorrect username or password" in response.json()["detail"]
  157. class TestAuthMeAPI:
  158. """Integration tests for /api/v1/auth/me endpoint."""
  159. @pytest.mark.asyncio
  160. @pytest.mark.integration
  161. async def test_me_without_token(self, async_client: AsyncClient):
  162. """Verify /me fails without authentication token."""
  163. response = await async_client.get("/api/v1/auth/me")
  164. assert response.status_code == 401
  165. @pytest.mark.asyncio
  166. @pytest.mark.integration
  167. async def test_me_with_valid_token(self, async_client: AsyncClient):
  168. """Verify /me returns user info with valid token."""
  169. # Setup and login
  170. await async_client.post(
  171. "/api/v1/auth/setup",
  172. json={
  173. "auth_enabled": True,
  174. "admin_username": "metest",
  175. "admin_password": "MePass1!",
  176. },
  177. )
  178. login_response = await async_client.post(
  179. "/api/v1/auth/login",
  180. json={"username": "metest", "password": "MePass1!"},
  181. )
  182. token = login_response.json()["access_token"]
  183. # Get current user
  184. response = await async_client.get(
  185. "/api/v1/auth/me",
  186. headers={"Authorization": f"Bearer {token}"},
  187. )
  188. assert response.status_code == 200
  189. result = response.json()
  190. assert result["username"] == "metest"
  191. assert result["role"] == "admin"
  192. assert result["is_active"] is True
  193. @pytest.mark.asyncio
  194. @pytest.mark.integration
  195. async def test_me_with_api_key_bearer(self, async_client: AsyncClient, db_session):
  196. """Verify /me returns synthetic admin user when using API key via Bearer token."""
  197. from backend.app.core.auth import generate_api_key
  198. from backend.app.models.api_key import APIKey
  199. # Create an API key directly in the database
  200. full_key, key_hash, key_prefix = generate_api_key()
  201. api_key = APIKey(name="test-kiosk", key_hash=key_hash, key_prefix=key_prefix, enabled=True)
  202. db_session.add(api_key)
  203. await db_session.commit()
  204. # Call /me with the API key as Bearer token
  205. response = await async_client.get(
  206. "/api/v1/auth/me",
  207. headers={"Authorization": f"Bearer {full_key}"},
  208. )
  209. assert response.status_code == 200
  210. result = response.json()
  211. assert result["id"] == 0
  212. assert result["username"].startswith("api-key:")
  213. assert result["role"] == "admin"
  214. assert result["is_admin"] is True
  215. assert result["is_active"] is True
  216. assert len(result["permissions"]) > 0
  217. @pytest.mark.asyncio
  218. @pytest.mark.integration
  219. async def test_me_with_api_key_header(self, async_client: AsyncClient, db_session):
  220. """Verify /me returns synthetic admin user when using X-API-Key header."""
  221. from backend.app.core.auth import generate_api_key
  222. from backend.app.models.api_key import APIKey
  223. full_key, key_hash, key_prefix = generate_api_key()
  224. api_key = APIKey(name="test-kiosk-header", key_hash=key_hash, key_prefix=key_prefix, enabled=True)
  225. db_session.add(api_key)
  226. await db_session.commit()
  227. response = await async_client.get(
  228. "/api/v1/auth/me",
  229. headers={"X-API-Key": full_key},
  230. )
  231. assert response.status_code == 200
  232. result = response.json()
  233. assert result["id"] == 0
  234. assert result["username"].startswith("api-key:")
  235. assert result["is_admin"] is True
  236. @pytest.mark.asyncio
  237. @pytest.mark.integration
  238. async def test_me_with_invalid_api_key(self, async_client: AsyncClient):
  239. """Verify /me rejects invalid API key."""
  240. response = await async_client.get(
  241. "/api/v1/auth/me",
  242. headers={"Authorization": "Bearer bb_invalid_key_value"},
  243. )
  244. assert response.status_code == 401
  245. class TestUsersAPI:
  246. """Integration tests for /api/v1/users/ endpoints."""
  247. @pytest.fixture
  248. async def auth_token(self, async_client: AsyncClient):
  249. """Setup auth and return admin token."""
  250. await async_client.post(
  251. "/api/v1/auth/setup",
  252. json={
  253. "auth_enabled": True,
  254. "admin_username": "usersadmin",
  255. "admin_password": "AdminPass1!",
  256. },
  257. )
  258. login_response = await async_client.post(
  259. "/api/v1/auth/login",
  260. json={"username": "usersadmin", "password": "AdminPass1!"},
  261. )
  262. return login_response.json()["access_token"]
  263. @pytest.mark.asyncio
  264. @pytest.mark.integration
  265. async def test_list_users_requires_auth(self, async_client: AsyncClient):
  266. """Verify listing users requires authentication when auth is enabled."""
  267. # First enable auth
  268. await async_client.post(
  269. "/api/v1/auth/setup",
  270. json={
  271. "auth_enabled": True,
  272. "admin_username": "authreqadmin",
  273. "admin_password": "AdminPass1!",
  274. },
  275. )
  276. # Now try to list users without a token
  277. response = await async_client.get("/api/v1/users/")
  278. assert response.status_code == 401
  279. @pytest.mark.asyncio
  280. @pytest.mark.integration
  281. async def test_list_users_as_admin(self, async_client: AsyncClient, auth_token: str):
  282. """Verify admin can list users."""
  283. response = await async_client.get(
  284. "/api/v1/users/",
  285. headers={"Authorization": f"Bearer {auth_token}"},
  286. )
  287. assert response.status_code == 200
  288. result = response.json()
  289. assert isinstance(result, list)
  290. assert len(result) >= 1 # At least the admin user
  291. @pytest.mark.asyncio
  292. @pytest.mark.integration
  293. async def test_create_user(self, async_client: AsyncClient, auth_token: str):
  294. """Verify admin can create a new user."""
  295. response = await async_client.post(
  296. "/api/v1/users/",
  297. headers={"Authorization": f"Bearer {auth_token}"},
  298. json={
  299. "username": "newuser",
  300. "password": "Newuserpass1!",
  301. "role": "user",
  302. },
  303. )
  304. assert response.status_code == 201
  305. result = response.json()
  306. assert result["username"] == "newuser"
  307. assert result["role"] == "user"
  308. assert result["is_active"] is True
  309. @pytest.mark.asyncio
  310. @pytest.mark.integration
  311. async def test_create_user_duplicate_username(self, async_client: AsyncClient, auth_token: str):
  312. """Verify creating user with duplicate username fails."""
  313. # Create first user
  314. await async_client.post(
  315. "/api/v1/users/",
  316. headers={"Authorization": f"Bearer {auth_token}"},
  317. json={
  318. "username": "duplicateuser",
  319. "password": "Password123!",
  320. "role": "user",
  321. },
  322. )
  323. # Try to create duplicate
  324. response = await async_client.post(
  325. "/api/v1/users/",
  326. headers={"Authorization": f"Bearer {auth_token}"},
  327. json={
  328. "username": "duplicateuser",
  329. "password": "Password456!",
  330. "role": "user",
  331. },
  332. )
  333. assert response.status_code == 400
  334. assert "Username already exists" in response.json()["detail"]
  335. @pytest.mark.asyncio
  336. @pytest.mark.integration
  337. async def test_update_user(self, async_client: AsyncClient, auth_token: str):
  338. """Verify admin can update a user."""
  339. # Create user
  340. create_response = await async_client.post(
  341. "/api/v1/users/",
  342. headers={"Authorization": f"Bearer {auth_token}"},
  343. json={
  344. "username": "updateuser",
  345. "password": "Password123!",
  346. "role": "user",
  347. },
  348. )
  349. user_id = create_response.json()["id"]
  350. # Update user
  351. response = await async_client.patch(
  352. f"/api/v1/users/{user_id}",
  353. headers={"Authorization": f"Bearer {auth_token}"},
  354. json={"role": "admin"},
  355. )
  356. assert response.status_code == 200
  357. assert response.json()["role"] == "admin"
  358. @pytest.mark.asyncio
  359. @pytest.mark.integration
  360. async def test_delete_user(self, async_client: AsyncClient, auth_token: str):
  361. """Verify admin can delete a user."""
  362. # Create user
  363. create_response = await async_client.post(
  364. "/api/v1/users/",
  365. headers={"Authorization": f"Bearer {auth_token}"},
  366. json={
  367. "username": "deleteuser",
  368. "password": "Password123!",
  369. "role": "user",
  370. },
  371. )
  372. user_id = create_response.json()["id"]
  373. # Delete user
  374. response = await async_client.delete(
  375. f"/api/v1/users/{user_id}",
  376. headers={"Authorization": f"Bearer {auth_token}"},
  377. )
  378. assert response.status_code == 204
  379. class TestAuthDisableAPI:
  380. """Integration tests for /api/v1/auth/disable endpoint."""
  381. @pytest.mark.asyncio
  382. @pytest.mark.integration
  383. async def test_disable_auth(self, async_client: AsyncClient):
  384. """Verify admin can disable authentication."""
  385. # Setup auth
  386. await async_client.post(
  387. "/api/v1/auth/setup",
  388. json={
  389. "auth_enabled": True,
  390. "admin_username": "disableadmin",
  391. "admin_password": "AdminPass1!",
  392. },
  393. )
  394. # Login to get token
  395. login_response = await async_client.post(
  396. "/api/v1/auth/login",
  397. json={"username": "disableadmin", "password": "AdminPass1!"},
  398. )
  399. token = login_response.json()["access_token"]
  400. # Disable auth
  401. response = await async_client.post(
  402. "/api/v1/auth/disable",
  403. headers={"Authorization": f"Bearer {token}"},
  404. )
  405. assert response.status_code == 200
  406. assert response.json()["auth_enabled"] is False
  407. # Verify auth is now disabled
  408. status_response = await async_client.get("/api/v1/auth/status")
  409. assert status_response.json()["auth_enabled"] is False
  410. class TestGroupsAPI:
  411. """Integration tests for /api/v1/groups/ endpoints."""
  412. @pytest.fixture
  413. async def auth_token(self, async_client: AsyncClient):
  414. """Setup auth and return admin token."""
  415. await async_client.post(
  416. "/api/v1/auth/setup",
  417. json={
  418. "auth_enabled": True,
  419. "admin_username": "groupsadmin",
  420. "admin_password": "AdminPass1!",
  421. },
  422. )
  423. login_response = await async_client.post(
  424. "/api/v1/auth/login",
  425. json={"username": "groupsadmin", "password": "AdminPass1!"},
  426. )
  427. return login_response.json()["access_token"]
  428. @pytest.mark.asyncio
  429. @pytest.mark.integration
  430. async def test_list_groups(self, async_client: AsyncClient, auth_token: str):
  431. """Verify listing groups returns default groups."""
  432. response = await async_client.get(
  433. "/api/v1/groups/",
  434. headers={"Authorization": f"Bearer {auth_token}"},
  435. )
  436. assert response.status_code == 200
  437. groups = response.json()
  438. assert isinstance(groups, list)
  439. # Should have default groups: Administrators, Operators, Viewers
  440. group_names = [g["name"] for g in groups]
  441. assert "Administrators" in group_names
  442. assert "Operators" in group_names
  443. assert "Viewers" in group_names
  444. @pytest.mark.asyncio
  445. @pytest.mark.integration
  446. async def test_get_permissions(self, async_client: AsyncClient, auth_token: str):
  447. """Verify getting available permissions."""
  448. response = await async_client.get(
  449. "/api/v1/groups/permissions",
  450. headers={"Authorization": f"Bearer {auth_token}"},
  451. )
  452. assert response.status_code == 200
  453. permissions = response.json()
  454. assert isinstance(permissions, dict)
  455. # Should have permission categories
  456. assert "Printers" in permissions or len(permissions) > 0
  457. @pytest.mark.asyncio
  458. @pytest.mark.integration
  459. async def test_create_group(self, async_client: AsyncClient, auth_token: str):
  460. """Verify creating a new group."""
  461. response = await async_client.post(
  462. "/api/v1/groups/",
  463. headers={"Authorization": f"Bearer {auth_token}"},
  464. json={
  465. "name": "Custom Group",
  466. "description": "A custom test group",
  467. "permissions": ["printers:read", "archives:read"],
  468. },
  469. )
  470. assert response.status_code == 201
  471. group = response.json()
  472. assert group["name"] == "Custom Group"
  473. assert group["description"] == "A custom test group"
  474. assert "printers:read" in group["permissions"]
  475. assert group["is_system"] is False
  476. @pytest.mark.asyncio
  477. @pytest.mark.integration
  478. async def test_update_group(self, async_client: AsyncClient, auth_token: str):
  479. """Verify updating a group."""
  480. # Create a group first
  481. create_response = await async_client.post(
  482. "/api/v1/groups/",
  483. headers={"Authorization": f"Bearer {auth_token}"},
  484. json={
  485. "name": "Update Test Group",
  486. "permissions": ["printers:read"],
  487. },
  488. )
  489. group_id = create_response.json()["id"]
  490. # Update the group
  491. response = await async_client.patch(
  492. f"/api/v1/groups/{group_id}",
  493. headers={"Authorization": f"Bearer {auth_token}"},
  494. json={
  495. "description": "Updated description",
  496. "permissions": ["printers:read", "printers:control"],
  497. },
  498. )
  499. assert response.status_code == 200
  500. group = response.json()
  501. assert group["description"] == "Updated description"
  502. assert "printers:control" in group["permissions"]
  503. @pytest.mark.asyncio
  504. @pytest.mark.integration
  505. async def test_cannot_delete_system_group(self, async_client: AsyncClient, auth_token: str):
  506. """Verify system groups cannot be deleted."""
  507. # Get the Administrators group
  508. list_response = await async_client.get(
  509. "/api/v1/groups/",
  510. headers={"Authorization": f"Bearer {auth_token}"},
  511. )
  512. admin_group = next(g for g in list_response.json() if g["name"] == "Administrators")
  513. # Try to delete it
  514. response = await async_client.delete(
  515. f"/api/v1/groups/{admin_group['id']}",
  516. headers={"Authorization": f"Bearer {auth_token}"},
  517. )
  518. assert response.status_code == 400
  519. assert "system group" in response.json()["detail"].lower()
  520. @pytest.mark.asyncio
  521. @pytest.mark.integration
  522. async def test_delete_custom_group(self, async_client: AsyncClient, auth_token: str):
  523. """Verify custom groups can be deleted."""
  524. # Create a group
  525. create_response = await async_client.post(
  526. "/api/v1/groups/",
  527. headers={"Authorization": f"Bearer {auth_token}"},
  528. json={"name": "Delete Test Group"},
  529. )
  530. group_id = create_response.json()["id"]
  531. # Delete it
  532. response = await async_client.delete(
  533. f"/api/v1/groups/{group_id}",
  534. headers={"Authorization": f"Bearer {auth_token}"},
  535. )
  536. assert response.status_code == 204
  537. class TestUserGroupsAPI:
  538. """Integration tests for user-group assignments."""
  539. @pytest.fixture
  540. async def auth_token(self, async_client: AsyncClient):
  541. """Setup auth and return admin token."""
  542. await async_client.post(
  543. "/api/v1/auth/setup",
  544. json={
  545. "auth_enabled": True,
  546. "admin_username": "usergroupadmin",
  547. "admin_password": "AdminPass1!",
  548. },
  549. )
  550. login_response = await async_client.post(
  551. "/api/v1/auth/login",
  552. json={"username": "usergroupadmin", "password": "AdminPass1!"},
  553. )
  554. return login_response.json()["access_token"]
  555. @pytest.mark.asyncio
  556. @pytest.mark.integration
  557. async def test_create_user_with_groups(self, async_client: AsyncClient, auth_token: str):
  558. """Verify creating a user with group assignments."""
  559. # Get Operators group ID
  560. groups_response = await async_client.get(
  561. "/api/v1/groups/",
  562. headers={"Authorization": f"Bearer {auth_token}"},
  563. )
  564. operators_group = next(g for g in groups_response.json() if g["name"] == "Operators")
  565. # Create user with group
  566. response = await async_client.post(
  567. "/api/v1/users/",
  568. headers={"Authorization": f"Bearer {auth_token}"},
  569. json={
  570. "username": "groupuser",
  571. "password": "Password123!",
  572. "group_ids": [operators_group["id"]],
  573. },
  574. )
  575. assert response.status_code == 201
  576. user = response.json()
  577. assert any(g["name"] == "Operators" for g in user["groups"])
  578. @pytest.mark.asyncio
  579. @pytest.mark.integration
  580. async def test_add_user_to_group(self, async_client: AsyncClient, auth_token: str):
  581. """Verify adding a user to a group."""
  582. # Create a user
  583. user_response = await async_client.post(
  584. "/api/v1/users/",
  585. headers={"Authorization": f"Bearer {auth_token}"},
  586. json={"username": "addtogroup", "password": "Password123!"},
  587. )
  588. user_id = user_response.json()["id"]
  589. # Get Viewers group
  590. groups_response = await async_client.get(
  591. "/api/v1/groups/",
  592. headers={"Authorization": f"Bearer {auth_token}"},
  593. )
  594. viewers_group = next(g for g in groups_response.json() if g["name"] == "Viewers")
  595. # Add user to group
  596. response = await async_client.post(
  597. f"/api/v1/groups/{viewers_group['id']}/users/{user_id}",
  598. headers={"Authorization": f"Bearer {auth_token}"},
  599. )
  600. assert response.status_code == 204
  601. # Verify user is in group
  602. user_check = await async_client.get(
  603. f"/api/v1/users/{user_id}",
  604. headers={"Authorization": f"Bearer {auth_token}"},
  605. )
  606. assert any(g["name"] == "Viewers" for g in user_check.json()["groups"])
  607. class TestChangePasswordAPI:
  608. """Integration tests for /api/v1/users/me/change-password endpoint."""
  609. @pytest.fixture
  610. async def user_token(self, async_client: AsyncClient):
  611. """Setup auth and return regular user token."""
  612. # Enable auth with admin
  613. await async_client.post(
  614. "/api/v1/auth/setup",
  615. json={
  616. "auth_enabled": True,
  617. "admin_username": "pwchangeadmin",
  618. "admin_password": "AdminPass1!",
  619. },
  620. )
  621. admin_login = await async_client.post(
  622. "/api/v1/auth/login",
  623. json={"username": "pwchangeadmin", "password": "AdminPass1!"},
  624. )
  625. admin_token = admin_login.json()["access_token"]
  626. # Create a regular user
  627. await async_client.post(
  628. "/api/v1/users/",
  629. headers={"Authorization": f"Bearer {admin_token}"},
  630. json={"username": "pwchangeuser", "password": "Oldpassword123!"},
  631. )
  632. # Login as regular user
  633. user_login = await async_client.post(
  634. "/api/v1/auth/login",
  635. json={"username": "pwchangeuser", "password": "Oldpassword123!"},
  636. )
  637. return user_login.json()["access_token"]
  638. @pytest.mark.asyncio
  639. @pytest.mark.integration
  640. async def test_change_password_success(self, async_client: AsyncClient, user_token: str):
  641. """Verify user can change their own password."""
  642. response = await async_client.post(
  643. "/api/v1/users/me/change-password",
  644. headers={"Authorization": f"Bearer {user_token}"},
  645. json={
  646. "current_password": "Oldpassword123!",
  647. "new_password": "Newpassword456!",
  648. },
  649. )
  650. assert response.status_code == 200
  651. assert "success" in response.json()["message"].lower()
  652. # Verify can login with new password
  653. login_response = await async_client.post(
  654. "/api/v1/auth/login",
  655. json={"username": "pwchangeuser", "password": "Newpassword456!"},
  656. )
  657. assert login_response.status_code == 200
  658. @pytest.mark.asyncio
  659. @pytest.mark.integration
  660. async def test_change_password_wrong_current(self, async_client: AsyncClient, user_token: str):
  661. """Verify changing password fails with wrong current password."""
  662. response = await async_client.post(
  663. "/api/v1/users/me/change-password",
  664. headers={"Authorization": f"Bearer {user_token}"},
  665. json={
  666. "current_password": "wrongpassword",
  667. "new_password": "Newpassword456!",
  668. },
  669. )
  670. assert response.status_code == 400
  671. assert "incorrect" in response.json()["detail"].lower()
  672. @pytest.mark.asyncio
  673. @pytest.mark.integration
  674. async def test_change_password_requires_auth(self, async_client: AsyncClient):
  675. """Verify changing password requires authentication."""
  676. response = await async_client.post(
  677. "/api/v1/users/me/change-password",
  678. json={
  679. "current_password": "oldpassword",
  680. "new_password": "Strongpass456!",
  681. },
  682. )
  683. assert response.status_code == 401
  684. class TestAuthMiddlewarePublicRoutes:
  685. """Tests for auth middleware public route configuration.
  686. These routes must be accessible without authentication, even when auth is enabled,
  687. because browser elements like <img src> and <video src> don't send Authorization headers.
  688. """
  689. @pytest.fixture
  690. async def enabled_auth(self, async_client: AsyncClient):
  691. """Enable auth for testing middleware behavior."""
  692. await async_client.post(
  693. "/api/v1/auth/setup",
  694. json={
  695. "auth_enabled": True,
  696. "admin_username": "middlewareadmin",
  697. "admin_password": "AdminPass1!",
  698. },
  699. )
  700. @pytest.mark.asyncio
  701. @pytest.mark.integration
  702. async def test_auth_status_is_public(self, async_client: AsyncClient, enabled_auth):
  703. """Verify /api/v1/auth/status is accessible without auth."""
  704. response = await async_client.get("/api/v1/auth/status")
  705. assert response.status_code == 200
  706. assert "auth_enabled" in response.json()
  707. @pytest.mark.asyncio
  708. @pytest.mark.integration
  709. async def test_auth_login_is_public(self, async_client: AsyncClient, enabled_auth):
  710. """Verify /api/v1/auth/login is accessible without auth."""
  711. response = await async_client.post(
  712. "/api/v1/auth/login",
  713. json={"username": "middlewareadmin", "password": "AdminPass1!"},
  714. )
  715. # Should not return 401 (unauthorized) - it should either succeed or return
  716. # a different error (like 400 for wrong credentials)
  717. assert response.status_code != 401 or "token" in response.json()
  718. @pytest.mark.asyncio
  719. @pytest.mark.integration
  720. async def test_auth_setup_is_public(self, async_client: AsyncClient):
  721. """Verify /api/v1/auth/setup is accessible without auth (needed for setup/recovery)."""
  722. # Don't enable auth first - test that setup endpoint itself is accessible
  723. response = await async_client.post(
  724. "/api/v1/auth/setup",
  725. json={"auth_enabled": False},
  726. )
  727. # Should not be 401
  728. assert response.status_code != 401
  729. @pytest.mark.asyncio
  730. @pytest.mark.integration
  731. async def test_updates_version_is_public(self, async_client: AsyncClient, enabled_auth):
  732. """Verify /api/v1/updates/version is accessible without auth."""
  733. response = await async_client.get("/api/v1/updates/version")
  734. # Should not be 401
  735. assert response.status_code != 401
  736. @pytest.mark.asyncio
  737. @pytest.mark.integration
  738. async def test_protected_route_requires_auth(self, async_client: AsyncClient, enabled_auth):
  739. """Verify non-public routes return 401 without token."""
  740. response = await async_client.get("/api/v1/printers/")
  741. assert response.status_code == 401
  742. @pytest.mark.asyncio
  743. @pytest.mark.integration
  744. async def test_protected_route_works_with_token(self, async_client: AsyncClient, enabled_auth):
  745. """Verify non-public routes work with valid token."""
  746. # Login to get token
  747. login_response = await async_client.post(
  748. "/api/v1/auth/login",
  749. json={"username": "middlewareadmin", "password": "AdminPass1!"},
  750. )
  751. token = login_response.json()["access_token"]
  752. # Access protected route
  753. response = await async_client.get(
  754. "/api/v1/printers/",
  755. headers={"Authorization": f"Bearer {token}"},
  756. )
  757. assert response.status_code == 200
  758. @pytest.mark.asyncio
  759. @pytest.mark.integration
  760. async def test_advanced_auth_status_is_public(self, async_client: AsyncClient, enabled_auth):
  761. """Verify /api/v1/auth/advanced-auth/status is accessible without auth."""
  762. response = await async_client.get("/api/v1/auth/advanced-auth/status")
  763. # Should not be 401 (must be accessible for login page)
  764. assert response.status_code != 401
  765. # Should return valid response (200 with auth status)
  766. if response.status_code == 200:
  767. result = response.json()
  768. assert "advanced_auth_enabled" in result
  769. assert "smtp_configured" in result
  770. @pytest.mark.asyncio
  771. @pytest.mark.integration
  772. async def test_forgot_password_is_public(self, async_client: AsyncClient, enabled_auth):
  773. """Verify /api/v1/auth/forgot-password is accessible without auth."""
  774. response = await async_client.post(
  775. "/api/v1/auth/forgot-password",
  776. json={"email": "test@example.com"},
  777. )
  778. # Should not be 401 (must be accessible for password reset from login page)
  779. assert response.status_code != 401
  780. # Will likely be 400 (advanced auth not enabled) but that's okay -
  781. # the important thing is it's not blocked by auth middleware
  782. assert response.status_code in [200, 400]
  783. # ===========================================================================
  784. # H-1: Input length validation
  785. # ===========================================================================
  786. class TestInputLengthValidation:
  787. """LoginRequest and SetupRequest must reject oversized inputs (H-1)."""
  788. @pytest.mark.asyncio
  789. @pytest.mark.integration
  790. async def test_login_password_too_long_rejected(self, async_client: AsyncClient):
  791. """Password exceeding 256 characters must be rejected with 422."""
  792. response = await async_client.post(
  793. "/api/v1/auth/login",
  794. json={"username": "admin", "password": "x" * 257},
  795. )
  796. assert response.status_code == 422
  797. @pytest.mark.asyncio
  798. @pytest.mark.integration
  799. async def test_login_username_too_long_rejected(self, async_client: AsyncClient):
  800. """Username exceeding 150 characters must be rejected with 422."""
  801. response = await async_client.post(
  802. "/api/v1/auth/login",
  803. json={"username": "u" * 151, "password": "password"},
  804. )
  805. assert response.status_code == 422
  806. @pytest.mark.asyncio
  807. @pytest.mark.integration
  808. async def test_setup_password_too_long_rejected(self, async_client: AsyncClient):
  809. """SetupRequest admin_password exceeding 256 characters must be rejected with 422."""
  810. response = await async_client.post(
  811. "/api/v1/auth/setup",
  812. json={
  813. "auth_enabled": True,
  814. "admin_username": "admin",
  815. "admin_password": "x" * 257,
  816. },
  817. )
  818. assert response.status_code == 422
  819. @pytest.mark.asyncio
  820. @pytest.mark.integration
  821. async def test_login_password_at_limit_accepted(self, async_client: AsyncClient):
  822. """Password of exactly 256 characters must pass schema validation (may fail auth)."""
  823. response = await async_client.post(
  824. "/api/v1/auth/login",
  825. json={"username": "admin", "password": "x" * 256},
  826. )
  827. # Schema accepts it; auth may reject with 401 (auth disabled) or 400
  828. assert response.status_code != 422