test_cloud_auth.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391
  1. """Integration tests for per-user cloud credentials and cloud endpoint permissions.
  2. Regression tests for:
  3. - Per-user cloud token storage (when auth enabled)
  4. - Global fallback (when auth disabled)
  5. - Cloud endpoints use CLOUD_AUTH permission (not SETTINGS_READ)
  6. """
  7. from unittest.mock import AsyncMock, patch
  8. import pytest
  9. from httpx import AsyncClient
  10. class TestPerUserCloudCredentials:
  11. """Tests that cloud credentials are stored per-user when auth is enabled."""
  12. @pytest.fixture
  13. async def user_with_cloud_auth(self, db_session):
  14. """Create a user with CLOUD_AUTH permission via a group."""
  15. from backend.app.core.auth import get_password_hash
  16. from backend.app.models.group import Group
  17. from backend.app.models.user import User
  18. group = Group(
  19. name="CloudUsers",
  20. permissions=["cloud:auth", "filaments:read", "printers:read", "firmware:read"],
  21. )
  22. db_session.add(group)
  23. await db_session.flush()
  24. user = User(
  25. username="clouduser",
  26. password_hash=get_password_hash("testpass123"),
  27. role="user",
  28. )
  29. db_session.add(user)
  30. await db_session.flush()
  31. user.groups.append(group)
  32. await db_session.commit()
  33. await db_session.refresh(user)
  34. return user
  35. @pytest.fixture
  36. async def second_user_with_cloud_auth(self, db_session):
  37. """Create a second user with CLOUD_AUTH permission."""
  38. from sqlalchemy import select
  39. from backend.app.core.auth import get_password_hash
  40. from backend.app.models.group import Group
  41. from backend.app.models.user import User
  42. result = await db_session.execute(select(Group).where(Group.name == "CloudUsers"))
  43. group = result.scalar_one_or_none()
  44. if not group:
  45. group = Group(
  46. name="CloudUsers2",
  47. permissions=["cloud:auth", "filaments:read", "printers:read", "firmware:read"],
  48. )
  49. db_session.add(group)
  50. await db_session.flush()
  51. user = User(
  52. username="clouduser2",
  53. password_hash=get_password_hash("testpass456"),
  54. role="user",
  55. )
  56. db_session.add(user)
  57. await db_session.flush()
  58. user.groups.append(group)
  59. await db_session.commit()
  60. await db_session.refresh(user)
  61. return user
  62. @pytest.fixture
  63. async def cloud_auth_token(self, user_with_cloud_auth, async_client: AsyncClient):
  64. """Get auth token for user with cloud permissions."""
  65. response = await async_client.post(
  66. "/api/v1/auth/login",
  67. json={"username": "clouduser", "password": "testpass123"},
  68. )
  69. if response.status_code == 200:
  70. return response.json().get("access_token")
  71. return None
  72. @pytest.fixture
  73. async def second_auth_token(self, second_user_with_cloud_auth, async_client: AsyncClient):
  74. """Get auth token for second user."""
  75. response = await async_client.post(
  76. "/api/v1/auth/login",
  77. json={"username": "clouduser2", "password": "testpass456"},
  78. )
  79. if response.status_code == 200:
  80. return response.json().get("access_token")
  81. return None
  82. @pytest.mark.asyncio
  83. @pytest.mark.integration
  84. async def test_cloud_status_returns_not_authenticated_by_default(self, async_client: AsyncClient):
  85. """Cloud status should show not authenticated when no token is stored."""
  86. with patch("backend.app.core.auth.is_auth_enabled", return_value=False):
  87. response = await async_client.get("/api/v1/cloud/status")
  88. assert response.status_code == 200
  89. data = response.json()
  90. assert data["is_authenticated"] is False
  91. @pytest.mark.asyncio
  92. @pytest.mark.integration
  93. async def test_cloud_status_accessible_when_auth_disabled(self, async_client: AsyncClient):
  94. """Cloud endpoints should work when auth is disabled (global fallback)."""
  95. with patch("backend.app.core.auth.is_auth_enabled", return_value=False):
  96. response = await async_client.get("/api/v1/cloud/status")
  97. assert response.status_code == 200
  98. @pytest.mark.asyncio
  99. @pytest.mark.integration
  100. async def test_cloud_status_requires_auth_when_enabled(self, async_client: AsyncClient):
  101. """Cloud endpoints should require auth when auth is enabled."""
  102. with patch("backend.app.core.auth.is_auth_enabled", return_value=True):
  103. response = await async_client.get("/api/v1/cloud/status")
  104. assert response.status_code == 401
  105. class TestCloudEndpointPermissions:
  106. """Tests that cloud endpoints use CLOUD_AUTH permission, not SETTINGS_READ.
  107. Uses JWT tokens created directly (not via login endpoint) to avoid
  108. test infrastructure complexity with user creation across sessions.
  109. """
  110. @pytest.fixture
  111. async def settings_only_setup(self, async_client: AsyncClient):
  112. """Create user with settings:read but NOT cloud:auth, return JWT."""
  113. from backend.app.core.auth import create_access_token, get_password_hash
  114. from backend.app.core.database import async_session
  115. from backend.app.models.group import Group
  116. from backend.app.models.user import User
  117. async with async_session() as db:
  118. group = Group(name="SettingsReaders", permissions=["settings:read"])
  119. db.add(group)
  120. user = User(
  121. username="settingsuser",
  122. password_hash=get_password_hash("testpass123"),
  123. role="user",
  124. )
  125. db.add(user)
  126. await db.commit()
  127. await db.refresh(group)
  128. await db.refresh(user)
  129. from sqlalchemy import text
  130. await db.execute(
  131. text("INSERT INTO user_groups (user_id, group_id) VALUES (:uid, :gid)"),
  132. {"uid": user.id, "gid": group.id},
  133. )
  134. await db.commit()
  135. return create_access_token(data={"sub": "settingsuser"})
  136. @pytest.fixture
  137. async def cloud_only_setup(self, async_client: AsyncClient):
  138. """Create user with cloud:auth but NOT settings:read, return JWT."""
  139. from backend.app.core.auth import create_access_token, get_password_hash
  140. from backend.app.core.database import async_session
  141. from backend.app.models.group import Group
  142. from backend.app.models.user import User
  143. async with async_session() as db:
  144. group = Group(name="CloudOnly", permissions=["cloud:auth"])
  145. db.add(group)
  146. user = User(
  147. username="cloudonly",
  148. password_hash=get_password_hash("testpass123"),
  149. role="user",
  150. )
  151. db.add(user)
  152. await db.commit()
  153. await db.refresh(group)
  154. await db.refresh(user)
  155. from sqlalchemy import text
  156. await db.execute(
  157. text("INSERT INTO user_groups (user_id, group_id) VALUES (:uid, :gid)"),
  158. {"uid": user.id, "gid": group.id},
  159. )
  160. await db.commit()
  161. return create_access_token(data={"sub": "cloudonly"})
  162. @pytest.mark.asyncio
  163. @pytest.mark.integration
  164. async def test_cloud_settings_requires_cloud_auth_not_settings_read(
  165. self, async_client: AsyncClient, settings_only_setup, cloud_only_setup
  166. ):
  167. """GET /cloud/settings should require CLOUD_AUTH, not SETTINGS_READ.
  168. Regression test: previously used SETTINGS_READ which blocked users who
  169. had cloud:auth permission but not settings:read.
  170. """
  171. with patch("backend.app.core.auth.is_auth_enabled", return_value=True):
  172. # User with only settings:read should be denied
  173. response = await async_client.get(
  174. "/api/v1/cloud/settings",
  175. headers={"Authorization": f"Bearer {settings_only_setup}"},
  176. )
  177. assert response.status_code == 403
  178. # User with cloud:auth should be allowed (will get 401 since no cloud token,
  179. # but NOT 403 — permission check passes)
  180. response = await async_client.get(
  181. "/api/v1/cloud/settings",
  182. headers={"Authorization": f"Bearer {cloud_only_setup}"},
  183. )
  184. assert response.status_code == 401 # No cloud token, but permission OK
  185. @pytest.mark.asyncio
  186. @pytest.mark.integration
  187. async def test_cloud_status_requires_cloud_auth(
  188. self, async_client: AsyncClient, settings_only_setup, cloud_only_setup
  189. ):
  190. """GET /cloud/status should require CLOUD_AUTH."""
  191. with patch("backend.app.core.auth.is_auth_enabled", return_value=True):
  192. # settings:read only → 403
  193. response = await async_client.get(
  194. "/api/v1/cloud/status",
  195. headers={"Authorization": f"Bearer {settings_only_setup}"},
  196. )
  197. assert response.status_code == 403
  198. # cloud:auth → 200
  199. response = await async_client.get(
  200. "/api/v1/cloud/status",
  201. headers={"Authorization": f"Bearer {cloud_only_setup}"},
  202. )
  203. assert response.status_code == 200
  204. @pytest.mark.asyncio
  205. @pytest.mark.integration
  206. async def test_cloud_fields_requires_cloud_auth(
  207. self, async_client: AsyncClient, settings_only_setup, cloud_only_setup
  208. ):
  209. """GET /cloud/fields should require CLOUD_AUTH, not SETTINGS_READ."""
  210. with patch("backend.app.core.auth.is_auth_enabled", return_value=True):
  211. # settings:read only → 403
  212. response = await async_client.get(
  213. "/api/v1/cloud/fields",
  214. headers={"Authorization": f"Bearer {settings_only_setup}"},
  215. )
  216. assert response.status_code == 403
  217. # cloud:auth → 200
  218. response = await async_client.get(
  219. "/api/v1/cloud/fields",
  220. headers={"Authorization": f"Bearer {cloud_only_setup}"},
  221. )
  222. assert response.status_code == 200
  223. class TestCloudTokenStorage:
  224. """Unit-level tests for the token storage functions."""
  225. @pytest.mark.asyncio
  226. async def test_get_stored_token_returns_none_when_no_user_no_global(self, db_session):
  227. """get_stored_token with user=None and no global token returns (None, None)."""
  228. from backend.app.api.routes.cloud import get_stored_token
  229. token, email = await get_stored_token(db_session, user=None)
  230. assert token is None
  231. assert email is None
  232. @pytest.mark.asyncio
  233. async def test_store_and_get_global_token(self, db_session):
  234. """store_token with user=None stores in global Settings table."""
  235. from backend.app.api.routes.cloud import get_stored_token, store_token
  236. await store_token(db_session, "test-token-123", "test@example.com", user=None)
  237. token, email = await get_stored_token(db_session, user=None)
  238. assert token == "test-token-123"
  239. assert email == "test@example.com"
  240. @pytest.mark.asyncio
  241. async def test_store_and_get_per_user_token(self, db_session):
  242. """store_token with user stores on the user record."""
  243. from backend.app.api.routes.cloud import get_stored_token, store_token
  244. from backend.app.core.auth import get_password_hash
  245. from backend.app.models.user import User
  246. user = User(username="tokentest", password_hash=get_password_hash("pass"), role="user")
  247. db_session.add(user)
  248. await db_session.commit()
  249. await db_session.refresh(user)
  250. await store_token(db_session, "user-token-abc", "user@example.com", user=user)
  251. # Re-fetch user to verify persistence
  252. from sqlalchemy import select
  253. result = await db_session.execute(select(User).where(User.id == user.id))
  254. refreshed = result.scalar_one()
  255. assert refreshed.cloud_token == "user-token-abc"
  256. assert refreshed.cloud_email == "user@example.com"
  257. @pytest.mark.asyncio
  258. async def test_per_user_token_does_not_affect_global(self, db_session):
  259. """Storing per-user token should not affect global Settings."""
  260. from backend.app.api.routes.cloud import get_stored_token, store_token
  261. from backend.app.core.auth import get_password_hash
  262. from backend.app.models.user import User
  263. user = User(username="isolationtest", password_hash=get_password_hash("pass"), role="user")
  264. db_session.add(user)
  265. await db_session.commit()
  266. await db_session.refresh(user)
  267. # Store per-user token
  268. await store_token(db_session, "per-user-token", "per-user@test.com", user=user)
  269. # Global should still be empty
  270. global_token, global_email = await get_stored_token(db_session, user=None)
  271. assert global_token is None
  272. assert global_email is None
  273. @pytest.mark.asyncio
  274. async def test_clear_per_user_token(self, db_session):
  275. """clear_token with user clears only that user's credentials."""
  276. from backend.app.api.routes.cloud import clear_token, get_stored_token, store_token
  277. from backend.app.core.auth import get_password_hash
  278. from backend.app.models.user import User
  279. user = User(username="cleartest", password_hash=get_password_hash("pass"), role="user")
  280. db_session.add(user)
  281. await db_session.commit()
  282. await db_session.refresh(user)
  283. await store_token(db_session, "to-clear", "clear@test.com", user=user)
  284. await clear_token(db_session, user=user)
  285. from sqlalchemy import select
  286. result = await db_session.execute(select(User).where(User.id == user.id))
  287. refreshed = result.scalar_one()
  288. assert refreshed.cloud_token is None
  289. assert refreshed.cloud_email is None
  290. @pytest.mark.asyncio
  291. async def test_clear_global_token(self, db_session):
  292. """clear_token with user=None clears from global Settings."""
  293. from backend.app.api.routes.cloud import clear_token, get_stored_token, store_token
  294. await store_token(db_session, "global-token", "global@test.com", user=None)
  295. await clear_token(db_session, user=None)
  296. token, email = await get_stored_token(db_session, user=None)
  297. assert token is None
  298. assert email is None
  299. @pytest.mark.asyncio
  300. async def test_two_users_independent_tokens(self, db_session):
  301. """Two users should have completely independent cloud tokens."""
  302. from backend.app.api.routes.cloud import get_stored_token, store_token
  303. from backend.app.core.auth import get_password_hash
  304. from backend.app.models.user import User
  305. user_a = User(username="user_a", password_hash=get_password_hash("pass"), role="user")
  306. user_b = User(username="user_b", password_hash=get_password_hash("pass"), role="user")
  307. db_session.add_all([user_a, user_b])
  308. await db_session.commit()
  309. await db_session.refresh(user_a)
  310. await db_session.refresh(user_b)
  311. await store_token(db_session, "token-a", "a@test.com", user=user_a)
  312. await store_token(db_session, "token-b", "b@test.com", user=user_b)
  313. # Verify each user reads their own token (re-fetch from DB)
  314. from sqlalchemy import select
  315. result_a = await db_session.execute(select(User).where(User.id == user_a.id))
  316. result_b = await db_session.execute(select(User).where(User.id == user_b.id))
  317. fresh_a = result_a.scalar_one()
  318. fresh_b = result_b.scalar_one()
  319. token_a, email_a = await get_stored_token(db_session, user=fresh_a)
  320. token_b, email_b = await get_stored_token(db_session, user=fresh_b)
  321. assert token_a == "token-a"
  322. assert email_a == "a@test.com"
  323. assert token_b == "token-b"
  324. assert email_b == "b@test.com"