# test_auth_api.py (36 KB)
"""Integration tests for the Authentication API endpoints.

Tests the full request/response cycle for the /api/v1/auth/, /api/v1/users/
and /api/v1/groups/ endpoints.
"""
  4. import pytest
  5. from httpx import AsyncClient
  6. class TestAuthStatusAPI:
  7. """Integration tests for /api/v1/auth/status endpoint."""
  8. @pytest.mark.asyncio
  9. @pytest.mark.integration
  10. async def test_get_auth_status_disabled(self, async_client: AsyncClient):
  11. """Verify auth status returns disabled when not configured."""
  12. response = await async_client.get("/api/v1/auth/status")
  13. assert response.status_code == 200
  14. result = response.json()
  15. assert "auth_enabled" in result
  16. assert result["auth_enabled"] is False
  17. assert result["requires_setup"] is True
  18. class TestAuthSetupAPI:
  19. """Integration tests for /api/v1/auth/setup endpoint."""
  20. @pytest.mark.asyncio
  21. @pytest.mark.integration
  22. async def test_setup_auth_disabled(self, async_client: AsyncClient):
  23. """Verify auth can be set up with auth disabled (no password required)."""
  24. response = await async_client.post(
  25. "/api/v1/auth/setup",
  26. json={"auth_enabled": False},
  27. )
  28. assert response.status_code == 200
  29. result = response.json()
  30. assert result["auth_enabled"] is False
  31. assert result["admin_created"] is False
  32. @pytest.mark.asyncio
  33. @pytest.mark.integration
  34. async def test_setup_auth_enabled_requires_credentials(self, async_client: AsyncClient):
  35. """Verify enabling auth requires admin username and password."""
  36. response = await async_client.post(
  37. "/api/v1/auth/setup",
  38. json={"auth_enabled": True},
  39. )
  40. assert response.status_code == 400
  41. assert "Admin username and password are required" in response.json()["detail"]
  42. @pytest.mark.asyncio
  43. @pytest.mark.integration
  44. async def test_setup_auth_enabled_with_credentials(self, async_client: AsyncClient):
  45. """Verify auth can be enabled with admin credentials."""
  46. response = await async_client.post(
  47. "/api/v1/auth/setup",
  48. json={
  49. "auth_enabled": True,
  50. "admin_username": "testadmin",
  51. "admin_password": "TestPass1!",
  52. },
  53. )
  54. assert response.status_code == 200
  55. result = response.json()
  56. assert result["auth_enabled"] is True
  57. assert result["admin_created"] is True
  58. @pytest.mark.asyncio
  59. @pytest.mark.integration
  60. async def test_setup_weak_password_rejected_when_creating_new_admin(self, async_client: AsyncClient):
  61. """Complexity is enforced only when a new admin is being created."""
  62. response = await async_client.post(
  63. "/api/v1/auth/setup",
  64. json={
  65. "auth_enabled": True,
  66. "admin_username": "weakpw_admin",
  67. "admin_password": "NoSpecial1",
  68. },
  69. )
  70. assert response.status_code == 400
  71. assert "special character" in response.json()["detail"].lower()
  72. @pytest.mark.asyncio
  73. @pytest.mark.integration
  74. async def test_setup_reenable_with_existing_admin_ignores_password(self, async_client: AsyncClient, db_session):
  75. """Re-enabling auth when an admin already exists must not reject the placeholder
  76. password the frontend still sends. Regression for the LDAP re-enable flow that
  77. previously 422'd because the Pydantic schema enforced complexity unconditionally.
  78. """
  79. from backend.app.core.auth import get_password_hash
  80. from backend.app.models.user import User
  81. existing = User(
  82. username="existing_admin",
  83. # pragma: allowlist secret — test fixture only, not a real credential
  84. password_hash=get_password_hash("DoesNotMatter1!"), # noqa: S106
  85. role="admin",
  86. is_active=True,
  87. )
  88. db_session.add(existing)
  89. await db_session.commit()
  90. response = await async_client.post(
  91. "/api/v1/auth/setup",
  92. json={
  93. "auth_enabled": True,
  94. "admin_username": "irrelevant",
  95. "admin_password": "NoSpecial1",
  96. },
  97. )
  98. assert response.status_code == 200
  99. result = response.json()
  100. assert result["auth_enabled"] is True
  101. assert result["admin_created"] is False
  102. class TestAuthLoginAPI:
  103. """Integration tests for /api/v1/auth/login endpoint."""
  104. @pytest.mark.asyncio
  105. @pytest.mark.integration
  106. async def test_login_auth_disabled(self, async_client: AsyncClient):
  107. """Verify login fails when auth is not enabled."""
  108. response = await async_client.post(
  109. "/api/v1/auth/login",
  110. json={"username": "admin", "password": "password"},
  111. )
  112. assert response.status_code == 400
  113. assert "Authentication is not enabled" in response.json()["detail"]
  114. @pytest.mark.asyncio
  115. @pytest.mark.integration
  116. async def test_login_success(self, async_client: AsyncClient):
  117. """Verify login succeeds with valid credentials after setup."""
  118. # First enable auth
  119. await async_client.post(
  120. "/api/v1/auth/setup",
  121. json={
  122. "auth_enabled": True,
  123. "admin_username": "logintest",
  124. "admin_password": "LoginPass1!",
  125. },
  126. )
  127. # Now login
  128. response = await async_client.post(
  129. "/api/v1/auth/login",
  130. json={"username": "logintest", "password": "LoginPass1!"},
  131. )
  132. assert response.status_code == 200
  133. result = response.json()
  134. assert "access_token" in result
  135. assert result["token_type"] == "bearer"
  136. assert result["user"]["username"] == "logintest"
  137. assert result["user"]["role"] == "admin"
  138. @pytest.mark.asyncio
  139. @pytest.mark.integration
  140. async def test_login_invalid_credentials(self, async_client: AsyncClient):
  141. """Verify login fails with invalid credentials."""
  142. # First enable auth
  143. await async_client.post(
  144. "/api/v1/auth/setup",
  145. json={
  146. "auth_enabled": True,
  147. "admin_username": "invalidtest",
  148. "admin_password": "CorrectPass1!",
  149. },
  150. )
  151. # Try login with wrong password
  152. response = await async_client.post(
  153. "/api/v1/auth/login",
  154. json={"username": "invalidtest", "password": "wrongpassword"},
  155. )
  156. assert response.status_code == 401
  157. assert "Incorrect username or password" in response.json()["detail"]
  158. class TestAuthMeAPI:
  159. """Integration tests for /api/v1/auth/me endpoint."""
  160. @pytest.mark.asyncio
  161. @pytest.mark.integration
  162. async def test_me_without_token(self, async_client: AsyncClient):
  163. """Verify /me fails without authentication token."""
  164. response = await async_client.get("/api/v1/auth/me")
  165. assert response.status_code == 401
  166. @pytest.mark.asyncio
  167. @pytest.mark.integration
  168. async def test_me_with_valid_token(self, async_client: AsyncClient):
  169. """Verify /me returns user info with valid token."""
  170. # Setup and login
  171. await async_client.post(
  172. "/api/v1/auth/setup",
  173. json={
  174. "auth_enabled": True,
  175. "admin_username": "metest",
  176. "admin_password": "MePass1!",
  177. },
  178. )
  179. login_response = await async_client.post(
  180. "/api/v1/auth/login",
  181. json={"username": "metest", "password": "MePass1!"},
  182. )
  183. token = login_response.json()["access_token"]
  184. # Get current user
  185. response = await async_client.get(
  186. "/api/v1/auth/me",
  187. headers={"Authorization": f"Bearer {token}"},
  188. )
  189. assert response.status_code == 200
  190. result = response.json()
  191. assert result["username"] == "metest"
  192. assert result["role"] == "admin"
  193. assert result["is_active"] is True
  194. @pytest.mark.asyncio
  195. @pytest.mark.integration
  196. async def test_me_with_api_key_bearer(self, async_client: AsyncClient, db_session):
  197. """Verify /me returns synthetic admin user when using API key via Bearer token."""
  198. from backend.app.core.auth import generate_api_key
  199. from backend.app.models.api_key import APIKey
  200. # Create an API key directly in the database
  201. full_key, key_hash, key_prefix = generate_api_key()
  202. api_key = APIKey(name="test-kiosk", key_hash=key_hash, key_prefix=key_prefix, enabled=True)
  203. db_session.add(api_key)
  204. await db_session.commit()
  205. # Call /me with the API key as Bearer token
  206. response = await async_client.get(
  207. "/api/v1/auth/me",
  208. headers={"Authorization": f"Bearer {full_key}"},
  209. )
  210. assert response.status_code == 200
  211. result = response.json()
  212. assert result["id"] == 0
  213. assert result["username"].startswith("api-key:")
  214. assert result["role"] == "admin"
  215. assert result["is_admin"] is True
  216. assert result["is_active"] is True
  217. assert len(result["permissions"]) > 0
  218. @pytest.mark.asyncio
  219. @pytest.mark.integration
  220. async def test_me_with_api_key_header(self, async_client: AsyncClient, db_session):
  221. """Verify /me returns synthetic admin user when using X-API-Key header."""
  222. from backend.app.core.auth import generate_api_key
  223. from backend.app.models.api_key import APIKey
  224. full_key, key_hash, key_prefix = generate_api_key()
  225. api_key = APIKey(name="test-kiosk-header", key_hash=key_hash, key_prefix=key_prefix, enabled=True)
  226. db_session.add(api_key)
  227. await db_session.commit()
  228. response = await async_client.get(
  229. "/api/v1/auth/me",
  230. headers={"X-API-Key": full_key},
  231. )
  232. assert response.status_code == 200
  233. result = response.json()
  234. assert result["id"] == 0
  235. assert result["username"].startswith("api-key:")
  236. assert result["is_admin"] is True
  237. @pytest.mark.asyncio
  238. @pytest.mark.integration
  239. async def test_me_with_invalid_api_key(self, async_client: AsyncClient):
  240. """Verify /me rejects invalid API key."""
  241. response = await async_client.get(
  242. "/api/v1/auth/me",
  243. headers={"Authorization": "Bearer bb_invalid_key_value"},
  244. )
  245. assert response.status_code == 401
  246. class TestUsersAPI:
  247. """Integration tests for /api/v1/users/ endpoints."""
  248. @pytest.fixture
  249. async def auth_token(self, async_client: AsyncClient):
  250. """Setup auth and return admin token."""
  251. await async_client.post(
  252. "/api/v1/auth/setup",
  253. json={
  254. "auth_enabled": True,
  255. "admin_username": "usersadmin",
  256. "admin_password": "AdminPass1!",
  257. },
  258. )
  259. login_response = await async_client.post(
  260. "/api/v1/auth/login",
  261. json={"username": "usersadmin", "password": "AdminPass1!"},
  262. )
  263. return login_response.json()["access_token"]
  264. @pytest.mark.asyncio
  265. @pytest.mark.integration
  266. async def test_list_users_requires_auth(self, async_client: AsyncClient):
  267. """Verify listing users requires authentication when auth is enabled."""
  268. # First enable auth
  269. await async_client.post(
  270. "/api/v1/auth/setup",
  271. json={
  272. "auth_enabled": True,
  273. "admin_username": "authreqadmin",
  274. "admin_password": "AdminPass1!",
  275. },
  276. )
  277. # Now try to list users without a token
  278. response = await async_client.get("/api/v1/users/")
  279. assert response.status_code == 401
  280. @pytest.mark.asyncio
  281. @pytest.mark.integration
  282. async def test_list_users_as_admin(self, async_client: AsyncClient, auth_token: str):
  283. """Verify admin can list users."""
  284. response = await async_client.get(
  285. "/api/v1/users/",
  286. headers={"Authorization": f"Bearer {auth_token}"},
  287. )
  288. assert response.status_code == 200
  289. result = response.json()
  290. assert isinstance(result, list)
  291. assert len(result) >= 1 # At least the admin user
  292. @pytest.mark.asyncio
  293. @pytest.mark.integration
  294. async def test_create_user(self, async_client: AsyncClient, auth_token: str):
  295. """Verify admin can create a new user."""
  296. response = await async_client.post(
  297. "/api/v1/users/",
  298. headers={"Authorization": f"Bearer {auth_token}"},
  299. json={
  300. "username": "newuser",
  301. "password": "Newuserpass1!",
  302. "role": "user",
  303. },
  304. )
  305. assert response.status_code == 201
  306. result = response.json()
  307. assert result["username"] == "newuser"
  308. assert result["role"] == "user"
  309. assert result["is_active"] is True
  310. @pytest.mark.asyncio
  311. @pytest.mark.integration
  312. async def test_create_user_duplicate_username(self, async_client: AsyncClient, auth_token: str):
  313. """Verify creating user with duplicate username fails."""
  314. # Create first user
  315. await async_client.post(
  316. "/api/v1/users/",
  317. headers={"Authorization": f"Bearer {auth_token}"},
  318. json={
  319. "username": "duplicateuser",
  320. "password": "Password123!",
  321. "role": "user",
  322. },
  323. )
  324. # Try to create duplicate
  325. response = await async_client.post(
  326. "/api/v1/users/",
  327. headers={"Authorization": f"Bearer {auth_token}"},
  328. json={
  329. "username": "duplicateuser",
  330. "password": "Password456!",
  331. "role": "user",
  332. },
  333. )
  334. assert response.status_code == 400
  335. assert "Username already exists" in response.json()["detail"]
  336. @pytest.mark.asyncio
  337. @pytest.mark.integration
  338. async def test_update_user(self, async_client: AsyncClient, auth_token: str):
  339. """Verify admin can update a user."""
  340. # Create user
  341. create_response = await async_client.post(
  342. "/api/v1/users/",
  343. headers={"Authorization": f"Bearer {auth_token}"},
  344. json={
  345. "username": "updateuser",
  346. "password": "Password123!",
  347. "role": "user",
  348. },
  349. )
  350. user_id = create_response.json()["id"]
  351. # Update user
  352. response = await async_client.patch(
  353. f"/api/v1/users/{user_id}",
  354. headers={"Authorization": f"Bearer {auth_token}"},
  355. json={"role": "admin"},
  356. )
  357. assert response.status_code == 200
  358. assert response.json()["role"] == "admin"
  359. @pytest.mark.asyncio
  360. @pytest.mark.integration
  361. async def test_delete_user(self, async_client: AsyncClient, auth_token: str, db_session):
  362. """Verify admin can delete a user and that all auth-table side effects cascade.
  363. The auth-cleanup side effects matter on SQLite (FK enforcement off by default):
  364. without explicit DELETEs in the endpoint, deleting a user leaves orphan rows
  365. in user_oidc_links / user_totp / user_otp_codes / api_keys — which would
  366. block SSO re-login and leak MFA secrets (#1285).
  367. """
  368. from sqlalchemy import select
  369. from backend.app.models.api_key import APIKey
  370. from backend.app.models.long_lived_token import LongLivedToken
  371. from backend.app.models.oidc_provider import UserOIDCLink
  372. from backend.app.models.user import User
  373. from backend.app.models.user_otp_code import UserOTPCode
  374. from backend.app.models.user_totp import UserTOTP
  375. # Create user
  376. create_response = await async_client.post(
  377. "/api/v1/users/",
  378. headers={"Authorization": f"Bearer {auth_token}"},
  379. json={
  380. "username": "deleteuser",
  381. "password": "Password123!",
  382. "role": "user",
  383. },
  384. )
  385. user_id = create_response.json()["id"]
  386. # Delete user
  387. response = await async_client.delete(
  388. f"/api/v1/users/{user_id}",
  389. headers={"Authorization": f"Bearer {auth_token}"},
  390. )
  391. assert response.status_code == 204
  392. # All auth-related rows for this user must be gone — see #1285.
  393. await db_session.commit()
  394. user_row = await db_session.execute(select(User).where(User.id == user_id))
  395. assert user_row.scalar_one_or_none() is None, "User row not deleted"
  396. for model in (UserOIDCLink, UserTOTP, UserOTPCode, APIKey, LongLivedToken):
  397. rows = await db_session.execute(select(model).where(model.user_id == user_id))
  398. assert rows.scalars().all() == [], f"Orphan {model.__name__} rows left after user delete"
  399. class TestAuthDisableAPI:
  400. """Integration tests for /api/v1/auth/disable endpoint."""
  401. @pytest.mark.asyncio
  402. @pytest.mark.integration
  403. async def test_disable_auth(self, async_client: AsyncClient):
  404. """Verify admin can disable authentication."""
  405. # Setup auth
  406. await async_client.post(
  407. "/api/v1/auth/setup",
  408. json={
  409. "auth_enabled": True,
  410. "admin_username": "disableadmin",
  411. "admin_password": "AdminPass1!",
  412. },
  413. )
  414. # Login to get token
  415. login_response = await async_client.post(
  416. "/api/v1/auth/login",
  417. json={"username": "disableadmin", "password": "AdminPass1!"},
  418. )
  419. token = login_response.json()["access_token"]
  420. # Disable auth
  421. response = await async_client.post(
  422. "/api/v1/auth/disable",
  423. headers={"Authorization": f"Bearer {token}"},
  424. )
  425. assert response.status_code == 200
  426. assert response.json()["auth_enabled"] is False
  427. # Verify auth is now disabled
  428. status_response = await async_client.get("/api/v1/auth/status")
  429. assert status_response.json()["auth_enabled"] is False
  430. class TestGroupsAPI:
  431. """Integration tests for /api/v1/groups/ endpoints."""
  432. @pytest.fixture
  433. async def auth_token(self, async_client: AsyncClient):
  434. """Setup auth and return admin token."""
  435. await async_client.post(
  436. "/api/v1/auth/setup",
  437. json={
  438. "auth_enabled": True,
  439. "admin_username": "groupsadmin",
  440. "admin_password": "AdminPass1!",
  441. },
  442. )
  443. login_response = await async_client.post(
  444. "/api/v1/auth/login",
  445. json={"username": "groupsadmin", "password": "AdminPass1!"},
  446. )
  447. return login_response.json()["access_token"]
  448. @pytest.mark.asyncio
  449. @pytest.mark.integration
  450. async def test_list_groups(self, async_client: AsyncClient, auth_token: str):
  451. """Verify listing groups returns default groups."""
  452. response = await async_client.get(
  453. "/api/v1/groups/",
  454. headers={"Authorization": f"Bearer {auth_token}"},
  455. )
  456. assert response.status_code == 200
  457. groups = response.json()
  458. assert isinstance(groups, list)
  459. # Should have default groups: Administrators, Operators, Viewers
  460. group_names = [g["name"] for g in groups]
  461. assert "Administrators" in group_names
  462. assert "Operators" in group_names
  463. assert "Viewers" in group_names
  464. @pytest.mark.asyncio
  465. @pytest.mark.integration
  466. async def test_get_permissions(self, async_client: AsyncClient, auth_token: str):
  467. """Verify getting available permissions."""
  468. response = await async_client.get(
  469. "/api/v1/groups/permissions",
  470. headers={"Authorization": f"Bearer {auth_token}"},
  471. )
  472. assert response.status_code == 200
  473. permissions = response.json()
  474. assert isinstance(permissions, dict)
  475. # Should have permission categories
  476. assert "Printers" in permissions or len(permissions) > 0
  477. @pytest.mark.asyncio
  478. @pytest.mark.integration
  479. async def test_create_group(self, async_client: AsyncClient, auth_token: str):
  480. """Verify creating a new group."""
  481. response = await async_client.post(
  482. "/api/v1/groups/",
  483. headers={"Authorization": f"Bearer {auth_token}"},
  484. json={
  485. "name": "Custom Group",
  486. "description": "A custom test group",
  487. "permissions": ["printers:read", "archives:read"],
  488. },
  489. )
  490. assert response.status_code == 201
  491. group = response.json()
  492. assert group["name"] == "Custom Group"
  493. assert group["description"] == "A custom test group"
  494. assert "printers:read" in group["permissions"]
  495. assert group["is_system"] is False
  496. @pytest.mark.asyncio
  497. @pytest.mark.integration
  498. async def test_update_group(self, async_client: AsyncClient, auth_token: str):
  499. """Verify updating a group."""
  500. # Create a group first
  501. create_response = await async_client.post(
  502. "/api/v1/groups/",
  503. headers={"Authorization": f"Bearer {auth_token}"},
  504. json={
  505. "name": "Update Test Group",
  506. "permissions": ["printers:read"],
  507. },
  508. )
  509. group_id = create_response.json()["id"]
  510. # Update the group
  511. response = await async_client.patch(
  512. f"/api/v1/groups/{group_id}",
  513. headers={"Authorization": f"Bearer {auth_token}"},
  514. json={
  515. "description": "Updated description",
  516. "permissions": ["printers:read", "printers:control"],
  517. },
  518. )
  519. assert response.status_code == 200
  520. group = response.json()
  521. assert group["description"] == "Updated description"
  522. assert "printers:control" in group["permissions"]
  523. @pytest.mark.asyncio
  524. @pytest.mark.integration
  525. async def test_cannot_delete_system_group(self, async_client: AsyncClient, auth_token: str):
  526. """Verify system groups cannot be deleted."""
  527. # Get the Administrators group
  528. list_response = await async_client.get(
  529. "/api/v1/groups/",
  530. headers={"Authorization": f"Bearer {auth_token}"},
  531. )
  532. admin_group = next(g for g in list_response.json() if g["name"] == "Administrators")
  533. # Try to delete it
  534. response = await async_client.delete(
  535. f"/api/v1/groups/{admin_group['id']}",
  536. headers={"Authorization": f"Bearer {auth_token}"},
  537. )
  538. assert response.status_code == 400
  539. assert "system group" in response.json()["detail"].lower()
  540. @pytest.mark.asyncio
  541. @pytest.mark.integration
  542. async def test_delete_custom_group(self, async_client: AsyncClient, auth_token: str):
  543. """Verify custom groups can be deleted."""
  544. # Create a group
  545. create_response = await async_client.post(
  546. "/api/v1/groups/",
  547. headers={"Authorization": f"Bearer {auth_token}"},
  548. json={"name": "Delete Test Group"},
  549. )
  550. group_id = create_response.json()["id"]
  551. # Delete it
  552. response = await async_client.delete(
  553. f"/api/v1/groups/{group_id}",
  554. headers={"Authorization": f"Bearer {auth_token}"},
  555. )
  556. assert response.status_code == 204
  557. class TestUserGroupsAPI:
  558. """Integration tests for user-group assignments."""
  559. @pytest.fixture
  560. async def auth_token(self, async_client: AsyncClient):
  561. """Setup auth and return admin token."""
  562. await async_client.post(
  563. "/api/v1/auth/setup",
  564. json={
  565. "auth_enabled": True,
  566. "admin_username": "usergroupadmin",
  567. "admin_password": "AdminPass1!",
  568. },
  569. )
  570. login_response = await async_client.post(
  571. "/api/v1/auth/login",
  572. json={"username": "usergroupadmin", "password": "AdminPass1!"},
  573. )
  574. return login_response.json()["access_token"]
  575. @pytest.mark.asyncio
  576. @pytest.mark.integration
  577. async def test_create_user_with_groups(self, async_client: AsyncClient, auth_token: str):
  578. """Verify creating a user with group assignments."""
  579. # Get Operators group ID
  580. groups_response = await async_client.get(
  581. "/api/v1/groups/",
  582. headers={"Authorization": f"Bearer {auth_token}"},
  583. )
  584. operators_group = next(g for g in groups_response.json() if g["name"] == "Operators")
  585. # Create user with group
  586. response = await async_client.post(
  587. "/api/v1/users/",
  588. headers={"Authorization": f"Bearer {auth_token}"},
  589. json={
  590. "username": "groupuser",
  591. "password": "Password123!",
  592. "group_ids": [operators_group["id"]],
  593. },
  594. )
  595. assert response.status_code == 201
  596. user = response.json()
  597. assert any(g["name"] == "Operators" for g in user["groups"])
  598. @pytest.mark.asyncio
  599. @pytest.mark.integration
  600. async def test_add_user_to_group(self, async_client: AsyncClient, auth_token: str):
  601. """Verify adding a user to a group."""
  602. # Create a user
  603. user_response = await async_client.post(
  604. "/api/v1/users/",
  605. headers={"Authorization": f"Bearer {auth_token}"},
  606. json={"username": "addtogroup", "password": "Password123!"},
  607. )
  608. user_id = user_response.json()["id"]
  609. # Get Viewers group
  610. groups_response = await async_client.get(
  611. "/api/v1/groups/",
  612. headers={"Authorization": f"Bearer {auth_token}"},
  613. )
  614. viewers_group = next(g for g in groups_response.json() if g["name"] == "Viewers")
  615. # Add user to group
  616. response = await async_client.post(
  617. f"/api/v1/groups/{viewers_group['id']}/users/{user_id}",
  618. headers={"Authorization": f"Bearer {auth_token}"},
  619. )
  620. assert response.status_code == 204
  621. # Verify user is in group
  622. user_check = await async_client.get(
  623. f"/api/v1/users/{user_id}",
  624. headers={"Authorization": f"Bearer {auth_token}"},
  625. )
  626. assert any(g["name"] == "Viewers" for g in user_check.json()["groups"])
  627. class TestChangePasswordAPI:
  628. """Integration tests for /api/v1/users/me/change-password endpoint."""
  629. @pytest.fixture
  630. async def user_token(self, async_client: AsyncClient):
  631. """Setup auth and return regular user token."""
  632. # Enable auth with admin
  633. await async_client.post(
  634. "/api/v1/auth/setup",
  635. json={
  636. "auth_enabled": True,
  637. "admin_username": "pwchangeadmin",
  638. "admin_password": "AdminPass1!",
  639. },
  640. )
  641. admin_login = await async_client.post(
  642. "/api/v1/auth/login",
  643. json={"username": "pwchangeadmin", "password": "AdminPass1!"},
  644. )
  645. admin_token = admin_login.json()["access_token"]
  646. # Create a regular user
  647. await async_client.post(
  648. "/api/v1/users/",
  649. headers={"Authorization": f"Bearer {admin_token}"},
  650. json={"username": "pwchangeuser", "password": "Oldpassword123!"},
  651. )
  652. # Login as regular user
  653. user_login = await async_client.post(
  654. "/api/v1/auth/login",
  655. json={"username": "pwchangeuser", "password": "Oldpassword123!"},
  656. )
  657. return user_login.json()["access_token"]
  658. @pytest.mark.asyncio
  659. @pytest.mark.integration
  660. async def test_change_password_success(self, async_client: AsyncClient, user_token: str):
  661. """Verify user can change their own password."""
  662. response = await async_client.post(
  663. "/api/v1/users/me/change-password",
  664. headers={"Authorization": f"Bearer {user_token}"},
  665. json={
  666. "current_password": "Oldpassword123!",
  667. "new_password": "Newpassword456!",
  668. },
  669. )
  670. assert response.status_code == 200
  671. assert "success" in response.json()["message"].lower()
  672. # Verify can login with new password
  673. login_response = await async_client.post(
  674. "/api/v1/auth/login",
  675. json={"username": "pwchangeuser", "password": "Newpassword456!"},
  676. )
  677. assert login_response.status_code == 200
  678. @pytest.mark.asyncio
  679. @pytest.mark.integration
  680. async def test_change_password_wrong_current(self, async_client: AsyncClient, user_token: str):
  681. """Verify changing password fails with wrong current password."""
  682. response = await async_client.post(
  683. "/api/v1/users/me/change-password",
  684. headers={"Authorization": f"Bearer {user_token}"},
  685. json={
  686. "current_password": "wrongpassword",
  687. "new_password": "Newpassword456!",
  688. },
  689. )
  690. assert response.status_code == 400
  691. assert "incorrect" in response.json()["detail"].lower()
  692. @pytest.mark.asyncio
  693. @pytest.mark.integration
  694. async def test_change_password_requires_auth(self, async_client: AsyncClient):
  695. """Verify changing password requires authentication."""
  696. response = await async_client.post(
  697. "/api/v1/users/me/change-password",
  698. json={
  699. "current_password": "oldpassword",
  700. "new_password": "Strongpass456!",
  701. },
  702. )
  703. assert response.status_code == 401
  class TestAuthMiddlewarePublicRoutes:
      """Tests for auth middleware public route configuration.

      These routes must be accessible without authentication, even when auth is enabled,
      because browser elements like <img src> and <video src> don't send Authorization headers.
      """

      @pytest.fixture
      async def enabled_auth(self, async_client: AsyncClient):
          """Enable auth for testing middleware behavior.

          Posts to the setup endpoint requesting ``auth_enabled=True`` with the
          ``middlewareadmin`` admin credentials that the login-based tests reuse.
          The response is checked so that a rejected setup fails the test here
          instead of letting the "is public" tests pass with auth still disabled.
          """
          setup_response = await async_client.post(
              "/api/v1/auth/setup",
              json={
                  "auth_enabled": True,
                  "admin_username": "middlewareadmin",
                  "admin_password": "AdminPass1!",
              },
          )
          assert setup_response.status_code < 400, (
              f"auth setup failed with HTTP {setup_response.status_code}"
          )

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_auth_status_is_public(self, async_client: AsyncClient, enabled_auth):
          """Verify /api/v1/auth/status is accessible without auth."""
          response = await async_client.get("/api/v1/auth/status")
          assert response.status_code == 200
          assert "auth_enabled" in response.json()

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_auth_login_is_public(self, async_client: AsyncClient, enabled_auth):
          """Verify /api/v1/auth/login is accessible without auth."""
          response = await async_client.post(
              "/api/v1/auth/login",
              json={"username": "middlewareadmin", "password": "AdminPass1!"},
          )
          # The credentials are the ones created by the enabled_auth fixture, so a
          # login that reaches the endpoint must succeed and hand back a token.
          assert response.status_code == 200
          assert "access_token" in response.json()

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_auth_setup_is_public(self, async_client: AsyncClient):
          """Verify /api/v1/auth/setup is accessible without auth (needed for setup/recovery)."""
          # Don't enable auth first - test that setup endpoint itself is accessible
          response = await async_client.post(
              "/api/v1/auth/setup",
              json={"auth_enabled": False},
          )
          # Should not be 401
          assert response.status_code != 401

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_updates_version_is_public(self, async_client: AsyncClient, enabled_auth):
          """Verify /api/v1/updates/version is accessible without auth."""
          response = await async_client.get("/api/v1/updates/version")
          # Should not be 401
          assert response.status_code != 401

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_protected_route_requires_auth(self, async_client: AsyncClient, enabled_auth):
          """Verify non-public routes return 401 without token."""
          response = await async_client.get("/api/v1/printers/")
          assert response.status_code == 401

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_protected_route_works_with_token(self, async_client: AsyncClient, enabled_auth):
          """Verify non-public routes work with valid token."""
          # Login to get token
          login_response = await async_client.post(
              "/api/v1/auth/login",
              json={"username": "middlewareadmin", "password": "AdminPass1!"},
          )
          # Fail with a clear status mismatch rather than a KeyError below.
          assert login_response.status_code == 200
          token = login_response.json()["access_token"]

          # Access protected route
          response = await async_client.get(
              "/api/v1/printers/",
              headers={"Authorization": f"Bearer {token}"},
          )
          assert response.status_code == 200

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_advanced_auth_status_is_public(self, async_client: AsyncClient, enabled_auth):
          """Verify /api/v1/auth/advanced-auth/status is accessible without auth."""
          response = await async_client.get("/api/v1/auth/advanced-auth/status")
          # Should not be 401 (must be accessible for login page)
          assert response.status_code != 401
          # Should return valid response (200 with auth status)
          if response.status_code == 200:
              result = response.json()
              assert "advanced_auth_enabled" in result
              assert "smtp_configured" in result

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_forgot_password_is_public(self, async_client: AsyncClient, enabled_auth):
          """Verify /api/v1/auth/forgot-password is accessible without auth."""
          response = await async_client.post(
              "/api/v1/auth/forgot-password",
              json={"email": "test@example.com"},
          )
          # Should not be 401 (must be accessible for password reset from login page)
          assert response.status_code != 401
          # Will likely be 400 (advanced auth not enabled) but that's okay -
          # the important thing is it's not blocked by auth middleware
          assert response.status_code in [200, 400]
  803. # ===========================================================================
  804. # H-1: Input length validation
  805. # ===========================================================================
  806. class TestInputLengthValidation:
  807. """LoginRequest and SetupRequest must reject oversized inputs (H-1)."""
  @pytest.mark.asyncio
  @pytest.mark.integration
  async def test_login_password_too_long_rejected(self, async_client: AsyncClient):
      """A 257-character login password must be rejected with 422."""
      oversized_password = "x" * 257
      credentials = {"username": "admin", "password": oversized_password}

      result = await async_client.post("/api/v1/auth/login", json=credentials)

      assert result.status_code == 422
  @pytest.mark.asyncio
  @pytest.mark.integration
  async def test_login_username_too_long_rejected(self, async_client: AsyncClient):
      """A 151-character login username must be rejected with 422."""
      oversized_username = "u" * 151
      credentials = {"username": oversized_username, "password": "password"}

      result = await async_client.post("/api/v1/auth/login", json=credentials)

      assert result.status_code == 422
  @pytest.mark.asyncio
  @pytest.mark.integration
  async def test_setup_password_too_long_rejected(self, async_client: AsyncClient):
      """A setup request with a 257-character admin_password must be rejected with 422."""
      oversized_password = "x" * 257
      setup_payload = {
          "auth_enabled": True,
          "admin_username": "admin",
          "admin_password": oversized_password,
      }

      result = await async_client.post("/api/v1/auth/setup", json=setup_payload)

      assert result.status_code == 422
  @pytest.mark.asyncio
  @pytest.mark.integration
  async def test_login_password_at_limit_accepted(self, async_client: AsyncClient):
      """A login password of exactly 256 characters must not trigger a 422.

      Only schema validation is under test here; the login itself may still be
      refused with some other status.
      """
      boundary_password = "x" * 256
      credentials = {"username": "admin", "password": boundary_password}

      result = await async_client.post("/api/v1/auth/login", json=credentials)

      # Any status other than 422 means the payload passed schema validation.
      assert result.status_code != 422