test_api_key_cloud_access.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295
  1. """Integration tests for #1182 — API keys reading cloud presets on the owner's behalf.
  2. The contract these tests pin:
  3. Three independent fences must all pass for an API-keyed call to reach
  4. /cloud/* successfully:
  5. 1. The key has an owner (``user_id IS NOT NULL``) — legacy keys created
  6. before #1182 are forced to be recreated.
  7. 2. The key has ``can_access_cloud=True`` — opt-in scope so existing
  8. automation doesn't quietly start reading cloud data.
  9. 3. The owner has a stored ``cloud_token`` — the existing requirement,
  10. unchanged.
  11. Plus the model-level invariants: deleting the owner CASCADEs the key,
  12. and the route-level guards reject impossible config (cloud access without
  13. auth enabled, cloud access on an ownerless legacy key).
  14. """
  15. import pytest
  16. from httpx import AsyncClient
  17. from sqlalchemy import select
  18. from sqlalchemy.ext.asyncio import AsyncSession
  19. from backend.app.core.auth import generate_api_key
  20. from backend.app.models.api_key import APIKey
  21. from backend.app.models.user import User
  22. async def _setup_auth_with_admin(client: AsyncClient) -> str:
  23. """Enable auth + return an admin bearer token."""
  24. await client.post(
  25. "/api/v1/auth/setup",
  26. json={
  27. "auth_enabled": True,
  28. "admin_username": "cloudadmin",
  29. "admin_password": "AdminPass1!",
  30. },
  31. )
  32. login = await client.post(
  33. "/api/v1/auth/login",
  34. json={"username": "cloudadmin", "password": "AdminPass1!"},
  35. )
  36. return login.json()["access_token"]
  37. async def _store_admin_cloud_token(db: AsyncSession, username: str, token: str) -> User:
  38. """Stash a fake cloud_token on a User so /cloud/* has something to find.
  39. The actual token value never reaches Bambu Cloud in these tests — every
  40. test that hits a /cloud/* route mocks the upstream HTTP call. We only
  41. need the column populated for ``build_authenticated_cloud`` to return a
  42. service instead of None.
  43. """
  44. result = await db.execute(select(User).where(User.username == username))
  45. user = result.scalar_one()
  46. user.cloud_token = token
  47. user.cloud_email = "owner@example.com"
  48. user.cloud_region = "global"
  49. await db.commit()
  50. await db.refresh(user)
  51. return user
  52. class TestAPIKeyCreationFlags:
  53. """The new can_access_cloud flag is correctly stamped at create time and
  54. correctly rejected when the deployment can't satisfy it."""
  55. @pytest.mark.asyncio
  56. @pytest.mark.integration
  57. async def test_create_stamps_owner_and_cloud_flag(self, async_client: AsyncClient):
  58. token = await _setup_auth_with_admin(async_client)
  59. resp = await async_client.post(
  60. "/api/v1/api-keys/",
  61. headers={"Authorization": f"Bearer {token}"},
  62. json={"name": "automation", "can_access_cloud": True},
  63. )
  64. assert resp.status_code == 200
  65. body = resp.json()
  66. assert body["user_id"] is not None # owner stamped from creator
  67. assert body["can_access_cloud"] is True
  68. @pytest.mark.asyncio
  69. @pytest.mark.integration
  70. async def test_create_with_cloud_flag_rejected_when_auth_disabled(self, async_client: AsyncClient):
  71. """can_access_cloud needs per-user cloud_token storage, which only
  72. exists in auth-enabled deployments — fail loudly at create time
  73. rather than silently producing a non-functional key."""
  74. # No setup_auth call → auth is disabled
  75. resp = await async_client.post(
  76. "/api/v1/api-keys/",
  77. json={"name": "should-fail", "can_access_cloud": True},
  78. )
  79. assert resp.status_code == 400
  80. assert "auth" in resp.json()["detail"].lower()
  81. @pytest.mark.asyncio
  82. @pytest.mark.integration
  83. async def test_create_without_cloud_flag_defaults_off(self, async_client: AsyncClient):
  84. """Default is opt-out — existing automation that doesn't pass the
  85. flag must not silently gain cloud access on upgrade."""
  86. token = await _setup_auth_with_admin(async_client)
  87. resp = await async_client.post(
  88. "/api/v1/api-keys/",
  89. headers={"Authorization": f"Bearer {token}"},
  90. json={"name": "no-cloud"},
  91. )
  92. assert resp.status_code == 200
  93. assert resp.json()["can_access_cloud"] is False
  94. @pytest.mark.asyncio
  95. @pytest.mark.integration
  96. async def test_patch_cloud_flag_rejected_on_legacy_key(self, async_client: AsyncClient, db_session: AsyncSession):
  97. """A legacy key (user_id NULL) cannot be flipped to can_access_cloud=True
  98. because there's no owner whose cloud_token to read; force recreate."""
  99. token = await _setup_auth_with_admin(async_client)
  100. # Create a legacy key directly in the DB (user_id NULL, mimicking
  101. # a row that predates the migration).
  102. full_key, key_hash, key_prefix = generate_api_key()
  103. legacy = APIKey(
  104. name="legacy",
  105. key_hash=key_hash,
  106. key_prefix=key_prefix,
  107. user_id=None,
  108. )
  109. db_session.add(legacy)
  110. await db_session.commit()
  111. await db_session.refresh(legacy)
  112. resp = await async_client.patch(
  113. f"/api/v1/api-keys/{legacy.id}",
  114. headers={"Authorization": f"Bearer {token}"},
  115. json={"can_access_cloud": True},
  116. )
  117. assert resp.status_code == 400
  118. assert "recreate" in resp.json()["detail"].lower()
  119. class TestCloudRouteGating:
  120. """The /cloud/* router-level dependency rejects API keys that don't satisfy
  121. all three fences."""
  122. @pytest.mark.asyncio
  123. @pytest.mark.integration
  124. async def test_legacy_key_rejected_with_recreate_message(self, async_client: AsyncClient, db_session: AsyncSession):
  125. """Legacy ownerless key → /cloud/* responds 401 with explicit recreate copy."""
  126. await _setup_auth_with_admin(async_client)
  127. full_key, key_hash, key_prefix = generate_api_key()
  128. legacy = APIKey(
  129. name="legacy",
  130. key_hash=key_hash,
  131. key_prefix=key_prefix,
  132. user_id=None,
  133. can_access_cloud=False, # irrelevant — owner check fires first
  134. )
  135. db_session.add(legacy)
  136. await db_session.commit()
  137. resp = await async_client.get(
  138. "/api/v1/cloud/status",
  139. headers={"X-API-Key": full_key},
  140. )
  141. assert resp.status_code == 401
  142. assert "recreate" in resp.json()["detail"].lower()
  143. @pytest.mark.asyncio
  144. @pytest.mark.integration
  145. async def test_owned_key_without_cloud_flag_rejected(self, async_client: AsyncClient, db_session: AsyncSession):
  146. """Owner is set but can_access_cloud=False → 403 with 'enable cloud access'."""
  147. await _setup_auth_with_admin(async_client)
  148. # Look up the admin we just created so we can stamp ownership.
  149. result = await db_session.execute(select(User).where(User.username == "cloudadmin"))
  150. admin = result.scalar_one()
  151. full_key, key_hash, key_prefix = generate_api_key()
  152. owned = APIKey(
  153. name="no-cloud-scope",
  154. key_hash=key_hash,
  155. key_prefix=key_prefix,
  156. user_id=admin.id,
  157. can_access_cloud=False,
  158. )
  159. db_session.add(owned)
  160. await db_session.commit()
  161. resp = await async_client.get(
  162. "/api/v1/cloud/status",
  163. headers={"X-API-Key": full_key},
  164. )
  165. assert resp.status_code == 403, f"Expected 403, got {resp.status_code} with body {resp.json()}"
  166. assert "cloud" in resp.json()["detail"].lower()
  167. @pytest.mark.asyncio
  168. @pytest.mark.integration
  169. async def test_owned_key_with_cloud_flag_passes_gate(self, async_client: AsyncClient, db_session: AsyncSession):
  170. """Owner + can_access_cloud=True + owner has cloud_token → /cloud/status
  171. returns 200. Token verification with Bambu happens further downstream
  172. and is mocked — we only assert the gate let the request through."""
  173. await _setup_auth_with_admin(async_client)
  174. admin = await _store_admin_cloud_token(db_session, "cloudadmin", token="fake-bambu-token")
  175. full_key, key_hash, key_prefix = generate_api_key()
  176. owned = APIKey(
  177. name="cloud-reader",
  178. key_hash=key_hash,
  179. key_prefix=key_prefix,
  180. user_id=admin.id,
  181. can_access_cloud=True,
  182. )
  183. db_session.add(owned)
  184. await db_session.commit()
  185. # /cloud/status reads token presence from the user record — no upstream
  186. # HTTP call, so we can assert directly on the response shape.
  187. resp = await async_client.get(
  188. "/api/v1/cloud/status",
  189. headers={"X-API-Key": full_key},
  190. )
  191. assert resp.status_code == 200, f"Expected 200, got {resp.status_code} with body {resp.json()}"
  192. body = resp.json()
  193. # The gate let us through and the route resolved the owner's token —
  194. # status route reports token presence regardless of upstream availability.
  195. assert body.get("authenticated") is True or body.get("token_present") is True or "email" in body
  196. @pytest.mark.asyncio
  197. @pytest.mark.integration
  198. async def test_jwt_caller_unaffected_by_api_key_gate(self, async_client: AsyncClient, db_session: AsyncSession):
  199. """The router-level gate must be a no-op for JWT callers — they're
  200. already gated by Permission.CLOUD_AUTH on the user record."""
  201. admin_token = await _setup_auth_with_admin(async_client)
  202. await _store_admin_cloud_token(db_session, "cloudadmin", token="fake-bambu-token")
  203. resp = await async_client.get(
  204. "/api/v1/cloud/status",
  205. headers={"Authorization": f"Bearer {admin_token}"},
  206. )
  207. assert resp.status_code == 200
  208. class TestOwnerDeletionCleanup:
  209. """Deleting the owner User must drop their API keys — orphan keys that
  210. point at a vanished user are a security hazard. The model declares
  211. ON DELETE CASCADE (Postgres enforces it), but SQLite ships with FK
  212. enforcement off, so the user-delete route also runs an explicit
  213. ``DELETE FROM api_keys WHERE user_id = ?`` for cross-backend safety.
  214. This test pins the route's behaviour."""
  215. @pytest.mark.asyncio
  216. @pytest.mark.integration
  217. async def test_deleting_owner_removes_their_api_keys(self, async_client: AsyncClient, db_session: AsyncSession):
  218. # Set up: admin + a victim user + an API key owned by the victim.
  219. await _setup_auth_with_admin(async_client)
  220. admin_login = await async_client.post(
  221. "/api/v1/auth/login",
  222. json={"username": "cloudadmin", "password": "AdminPass1!"},
  223. )
  224. admin_token = admin_login.json()["access_token"]
  225. victim = User(
  226. username="cascade-victim",
  227. password_hash="x",
  228. role="user",
  229. is_active=True,
  230. )
  231. db_session.add(victim)
  232. await db_session.commit()
  233. await db_session.refresh(victim)
  234. _full_key, key_hash, key_prefix = generate_api_key()
  235. owned = APIKey(
  236. name="owned-by-victim",
  237. key_hash=key_hash,
  238. key_prefix=key_prefix,
  239. user_id=victim.id,
  240. )
  241. db_session.add(owned)
  242. await db_session.commit()
  243. key_id = owned.id
  244. victim_id = victim.id
  245. # Act: admin deletes the victim user via the API.
  246. del_resp = await async_client.delete(
  247. f"/api/v1/users/{victim_id}",
  248. headers={"Authorization": f"Bearer {admin_token}"},
  249. )
  250. assert del_resp.status_code in (200, 204), f"User delete failed: {del_resp.status_code} {del_resp.json()}"
  251. # Assert: the API key is gone. Refresh session state — the route
  252. # commits via its own session, so our session needs to re-read.
  253. db_session.expire_all()
  254. result = await db_session.execute(select(APIKey).where(APIKey.id == key_id))
  255. assert result.scalar_one_or_none() is None, "API key should have been removed when its owner was deleted"