test_long_lived_tokens_api.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
  1. """Integration tests for long-lived camera-stream token routes (#1108).
  2. Cover the auth gates, ownership rules, max-lifetime cap, token-shown-once
  3. contract, and the camera-stream auth fall-through (the existing 60-min
  4. ephemeral path still works AND a long-lived token is also accepted).
  5. """
  6. from __future__ import annotations
  7. import pytest
  8. from httpx import AsyncClient
  9. pytestmark = [pytest.mark.asyncio, pytest.mark.integration]
  10. # ---------------------------------------------------------------------------
  11. # Helpers
  12. # ---------------------------------------------------------------------------
  13. async def _setup_admin(async_client: AsyncClient, *, suffix: str = "") -> str:
  14. """Create the first admin and return their JWT."""
  15. await async_client.post(
  16. "/api/v1/auth/setup",
  17. json={
  18. "auth_enabled": True,
  19. "admin_username": f"tokenadmin{suffix}",
  20. "admin_password": "AdminPass1!",
  21. },
  22. )
  23. login = await async_client.post(
  24. "/api/v1/auth/login",
  25. json={"username": f"tokenadmin{suffix}", "password": "AdminPass1!"},
  26. )
  27. return login.json()["access_token"]
  28. async def _create_user(async_client: AsyncClient, admin_token: str, username: str, *, role: str = "user") -> int:
  29. """Create a non-admin user via the admin API and return their id.
  30. The user is assigned to the seeded "Viewers" group so they hold
  31. ``CAMERA_VIEW`` — without that, regular users cannot create their own
  32. long-lived tokens (which is the same gate the existing 60-min ephemeral
  33. flow uses).
  34. """
  35. # Fetch Viewers group id so the new user inherits CAMERA_VIEW.
  36. groups_resp = await async_client.get("/api/v1/groups/", headers={"Authorization": f"Bearer {admin_token}"})
  37. viewers = next((g for g in groups_resp.json() if g["name"] == "Viewers"), None)
  38. assert viewers is not None, f"Viewers group not seeded: {groups_resp.text}"
  39. response = await async_client.post(
  40. "/api/v1/users/",
  41. headers={"Authorization": f"Bearer {admin_token}"},
  42. json={
  43. "username": username,
  44. "password": "UserPass1!",
  45. "role": role,
  46. "group_ids": [viewers["id"]],
  47. },
  48. )
  49. assert response.status_code in (200, 201), response.text
  50. return response.json()["id"]
  51. async def _login(async_client: AsyncClient, username: str) -> str:
  52. response = await async_client.post(
  53. "/api/v1/auth/login",
  54. json={"username": username, "password": "UserPass1!"},
  55. )
  56. body = response.json()
  57. token = body.get("access_token")
  58. assert token, f"login for {username!r} returned no access_token: {body}"
  59. return token
  60. # ---------------------------------------------------------------------------
  61. # Create
  62. # ---------------------------------------------------------------------------
  63. class TestCreateLongLivedToken:
  64. async def test_create_returns_plaintext_token_exactly_once(self, async_client: AsyncClient):
  65. token = await _setup_admin(async_client, suffix="_create_once")
  66. response = await async_client.post(
  67. "/api/v1/auth/tokens",
  68. headers={"Authorization": f"Bearer {token}"},
  69. json={"name": "Home Assistant", "expires_in_days": 30},
  70. )
  71. assert response.status_code == 201, response.text
  72. body = response.json()
  73. assert body["token"].startswith("bblt_")
  74. assert body["name"] == "Home Assistant"
  75. assert body["scope"] == "camera_stream"
  76. assert body["lookup_prefix"]
  77. token_id = body["id"]
  78. # Listing must NOT include the plaintext (shown-once contract).
  79. listing = await async_client.get(
  80. "/api/v1/auth/tokens",
  81. headers={"Authorization": f"Bearer {token}"},
  82. )
  83. assert listing.status_code == 200
  84. listed = next((t for t in listing.json() if t["id"] == token_id), None)
  85. assert listed is not None
  86. assert listed["token"] is None # plaintext gone forever
  87. async def test_create_rejects_expires_in_zero(self, async_client: AsyncClient):
  88. """Issue #1108: ``expire_in: 0`` (never) is explicitly forbidden."""
  89. token = await _setup_admin(async_client, suffix="_zero_expire")
  90. response = await async_client.post(
  91. "/api/v1/auth/tokens",
  92. headers={"Authorization": f"Bearer {token}"},
  93. json={"name": "x", "expires_in_days": 0},
  94. )
  95. assert response.status_code == 400
  96. assert "positive" in response.json()["detail"].lower()
  97. async def test_create_rejects_above_max(self, async_client: AsyncClient):
  98. token = await _setup_admin(async_client, suffix="_above_max")
  99. response = await async_client.post(
  100. "/api/v1/auth/tokens",
  101. headers={"Authorization": f"Bearer {token}"},
  102. json={"name": "x", "expires_in_days": 366},
  103. )
  104. assert response.status_code == 400
  105. assert "365" in response.json()["detail"]
  106. async def test_create_requires_auth(self, async_client: AsyncClient):
  107. await _setup_admin(async_client, suffix="_unauth")
  108. response = await async_client.post(
  109. "/api/v1/auth/tokens",
  110. json={"name": "x", "expires_in_days": 7},
  111. )
  112. assert response.status_code == 401
  113. # ---------------------------------------------------------------------------
  114. # List
  115. # ---------------------------------------------------------------------------
  116. class TestListLongLivedTokens:
  117. async def test_list_returns_only_callers_tokens_by_default(self, async_client: AsyncClient):
  118. admin_token = await _setup_admin(async_client, suffix="_list_default")
  119. bob_id = await _create_user(async_client, admin_token, "bob_list")
  120. bob_token = await _login(async_client, "bob_list")
  121. # Each user creates one token.
  122. await async_client.post(
  123. "/api/v1/auth/tokens",
  124. headers={"Authorization": f"Bearer {admin_token}"},
  125. json={"name": "admins", "expires_in_days": 7},
  126. )
  127. await async_client.post(
  128. "/api/v1/auth/tokens",
  129. headers={"Authorization": f"Bearer {bob_token}"},
  130. json={"name": "bobs", "expires_in_days": 7},
  131. )
  132. # Bob's listing should see only his.
  133. bob_listing = await async_client.get(
  134. "/api/v1/auth/tokens",
  135. headers={"Authorization": f"Bearer {bob_token}"},
  136. )
  137. names = {t["name"] for t in bob_listing.json()}
  138. assert names == {"bobs"}
  139. assert bob_id == bob_listing.json()[0]["user_id"]
  140. async def test_admin_can_filter_by_user_id(self, async_client: AsyncClient):
  141. admin_token = await _setup_admin(async_client, suffix="_admin_filter")
  142. bob_id = await _create_user(async_client, admin_token, "bob_filter")
  143. bob_token = await _login(async_client, "bob_filter")
  144. await async_client.post(
  145. "/api/v1/auth/tokens",
  146. headers={"Authorization": f"Bearer {bob_token}"},
  147. json={"name": "bobs", "expires_in_days": 7},
  148. )
  149. admin_view = await async_client.get(
  150. f"/api/v1/auth/tokens?user_id={bob_id}",
  151. headers={"Authorization": f"Bearer {admin_token}"},
  152. )
  153. assert admin_view.status_code == 200
  154. names = {t["name"] for t in admin_view.json()}
  155. assert names == {"bobs"}
  156. async def test_non_admin_cannot_see_other_users_tokens(self, async_client: AsyncClient):
  157. admin_token = await _setup_admin(async_client, suffix="_non_admin")
  158. await _create_user(async_client, admin_token, "alice_see")
  159. bob_id = await _create_user(async_client, admin_token, "bob_see")
  160. alice_token = await _login(async_client, "alice_see")
  161. forbidden = await async_client.get(
  162. f"/api/v1/auth/tokens?user_id={bob_id}",
  163. headers={"Authorization": f"Bearer {alice_token}"},
  164. )
  165. assert forbidden.status_code == 403
  166. # ---------------------------------------------------------------------------
  167. # Revoke
  168. # ---------------------------------------------------------------------------
  169. class TestRevokeLongLivedToken:
  170. async def test_owner_can_revoke_own_token(self, async_client: AsyncClient):
  171. token = await _setup_admin(async_client, suffix="_revoke_own")
  172. created = await async_client.post(
  173. "/api/v1/auth/tokens",
  174. headers={"Authorization": f"Bearer {token}"},
  175. json={"name": "x", "expires_in_days": 7},
  176. )
  177. token_id = created.json()["id"]
  178. revoke = await async_client.delete(
  179. f"/api/v1/auth/tokens/{token_id}",
  180. headers={"Authorization": f"Bearer {token}"},
  181. )
  182. assert revoke.status_code == 204
  183. # Now gone from the listing.
  184. listing = await async_client.get("/api/v1/auth/tokens", headers={"Authorization": f"Bearer {token}"})
  185. assert all(t["id"] != token_id for t in listing.json())
  186. async def test_admin_can_revoke_any_users_token(self, async_client: AsyncClient):
  187. admin_token = await _setup_admin(async_client, suffix="_revoke_any")
  188. await _create_user(async_client, admin_token, "bob_revoke")
  189. bob_token = await _login(async_client, "bob_revoke")
  190. created = await async_client.post(
  191. "/api/v1/auth/tokens",
  192. headers={"Authorization": f"Bearer {bob_token}"},
  193. json={"name": "bobs", "expires_in_days": 7},
  194. )
  195. token_id = created.json()["id"]
  196. admin_revoke = await async_client.delete(
  197. f"/api/v1/auth/tokens/{token_id}",
  198. headers={"Authorization": f"Bearer {admin_token}"},
  199. )
  200. assert admin_revoke.status_code == 204
  201. async def test_non_owner_non_admin_cannot_revoke(self, async_client: AsyncClient):
  202. admin_token = await _setup_admin(async_client, suffix="_revoke_other")
  203. await _create_user(async_client, admin_token, "alice_attack")
  204. await _create_user(async_client, admin_token, "bob_target")
  205. bob_token = await _login(async_client, "bob_target")
  206. alice_token = await _login(async_client, "alice_attack")
  207. created = await async_client.post(
  208. "/api/v1/auth/tokens",
  209. headers={"Authorization": f"Bearer {bob_token}"},
  210. json={"name": "bobs", "expires_in_days": 7},
  211. )
  212. token_id = created.json()["id"]
  213. forbidden = await async_client.delete(
  214. f"/api/v1/auth/tokens/{token_id}",
  215. headers={"Authorization": f"Bearer {alice_token}"},
  216. )
  217. assert forbidden.status_code == 403
  218. async def test_revoke_unknown_id_404(self, async_client: AsyncClient):
  219. token = await _setup_admin(async_client, suffix="_revoke_unknown")
  220. response = await async_client.delete(
  221. "/api/v1/auth/tokens/99999",
  222. headers={"Authorization": f"Bearer {token}"},
  223. )
  224. assert response.status_code == 404
  225. # ---------------------------------------------------------------------------
  226. # Auth fall-through: ``verify_camera_stream_token`` accepts both kinds
  227. # ---------------------------------------------------------------------------
  228. # The full /camera/stream HTTP integration would need a real ffmpeg / printer
  229. # socket to keep the StreamingResponse alive. Verifying the auth dependency
  230. # directly is a stronger check anyway: the route's only auth job is to call
  231. # ``verify_camera_stream_token``, which is what these tests exercise.
  232. class TestCameraStreamTokenVerification:
  233. async def test_long_lived_token_verifies_via_camera_stream_path(self, async_client: AsyncClient):
  234. """A freshly minted long-lived token must pass the same dependency
  235. the camera-stream route uses, after the ephemeral path would have
  236. rejected it.
  237. """
  238. from backend.app.core.auth import verify_camera_stream_token
  239. token = await _setup_admin(async_client, suffix="_verify_long")
  240. created = await async_client.post(
  241. "/api/v1/auth/tokens",
  242. headers={"Authorization": f"Bearer {token}"},
  243. json={"name": "kiosk", "expires_in_days": 90},
  244. )
  245. long_lived = created.json()["token"]
  246. assert await verify_camera_stream_token(long_lived) is True
  247. async def test_revoked_long_lived_token_fails_camera_stream_check(self, async_client: AsyncClient):
  248. from backend.app.core.auth import verify_camera_stream_token
  249. token = await _setup_admin(async_client, suffix="_verify_revoke")
  250. created = await async_client.post(
  251. "/api/v1/auth/tokens",
  252. headers={"Authorization": f"Bearer {token}"},
  253. json={"name": "kiosk", "expires_in_days": 30},
  254. )
  255. long_lived = created.json()["token"]
  256. token_id = created.json()["id"]
  257. await async_client.delete(
  258. f"/api/v1/auth/tokens/{token_id}",
  259. headers={"Authorization": f"Bearer {token}"},
  260. )
  261. assert await verify_camera_stream_token(long_lived) is False
  262. async def test_garbage_token_fails_camera_stream_check(self, async_client: AsyncClient):
  263. from backend.app.core.auth import verify_camera_stream_token
  264. await _setup_admin(async_client, suffix="_verify_garbage")
  265. assert await verify_camera_stream_token("bblt_aaaaaaaa_garbage") is False
  266. assert await verify_camera_stream_token("not-a-real-token") is False