test_auth_apikey_rbac.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435
  1. """Integration tests for API key RBAC enforcement (security fix C1)."""
  2. import pytest
  3. from httpx import AsyncClient
  4. @pytest.fixture
  5. async def api_key_data(async_client: AsyncClient, db_session):
  6. """Create an API key and return its full key value."""
  7. from backend.app.core.auth import generate_api_key
  8. from backend.app.models.api_key import APIKey
  9. full_key, key_hash, key_prefix = generate_api_key()
  10. api_key = APIKey(
  11. name="test-key",
  12. key_hash=key_hash,
  13. key_prefix=key_prefix,
  14. can_queue=True,
  15. can_control_printer=True,
  16. can_read_status=True,
  17. enabled=True,
  18. )
  19. db_session.add(api_key)
  20. await db_session.commit()
  21. return full_key
  22. @pytest.fixture
  23. async def spoolman_settings(db_session):
  24. from backend.app.models.settings import Settings
  25. db_session.add(Settings(key="spoolman_enabled", value="true"))
  26. db_session.add(Settings(key="spoolman_url", value="http://localhost:7912"))
  27. await db_session.commit()
  28. class TestApiKeyRbacDenied:
  29. """API keys must be refused for admin-only endpoints."""
  30. @pytest.mark.asyncio
  31. @pytest.mark.integration
  32. async def test_api_key_cannot_access_settings_update_endpoint(
  33. self, async_client: AsyncClient, db_session, api_key_data
  34. ):
  35. """API key must not be usable for settings:update endpoints (C1)."""
  36. from backend.app.models.settings import Settings
  37. db_session.add(Settings(key="auth_enabled", value="true"))
  38. await db_session.commit()
  39. resp = await async_client.put(
  40. "/api/v1/settings/",
  41. json={},
  42. headers={"X-API-Key": api_key_data},
  43. )
  44. assert resp.status_code == 403
  45. assert "administrative operations" in resp.json()["detail"]
  46. @pytest.mark.asyncio
  47. @pytest.mark.integration
  48. async def test_api_key_bearer_cannot_access_settings_update(
  49. self, async_client: AsyncClient, db_session, api_key_data
  50. ):
  51. """Bearer bb_ API key must also be refused for settings:update (C1)."""
  52. from backend.app.models.settings import Settings
  53. db_session.add(Settings(key="auth_enabled", value="true"))
  54. await db_session.commit()
  55. resp = await async_client.put(
  56. "/api/v1/settings/",
  57. json={},
  58. headers={"Authorization": f"Bearer {api_key_data}"},
  59. )
  60. assert resp.status_code == 403
  61. assert "administrative operations" in resp.json()["detail"]
  62. class TestApiKeyRbacAllowed:
  63. """API keys must still work for non-admin endpoints."""
  64. @pytest.mark.asyncio
  65. @pytest.mark.integration
  66. async def test_api_key_can_access_inventory_read(
  67. self, async_client: AsyncClient, db_session, api_key_data, spoolman_settings
  68. ):
  69. """API key must be accepted for inventory:read endpoints (C1)."""
  70. from unittest.mock import AsyncMock, MagicMock, patch
  71. from backend.app.models.settings import Settings
  72. db_session.add(Settings(key="auth_enabled", value="true"))
  73. await db_session.commit()
  74. mock_client = MagicMock()
  75. mock_client.base_url = "http://localhost:7912"
  76. mock_client.health_check = AsyncMock(return_value=True)
  77. mock_client.get_all_spools = AsyncMock(return_value=[])
  78. with patch(
  79. "backend.app.api.routes.spoolman_inventory._get_client",
  80. AsyncMock(return_value=mock_client),
  81. ):
  82. resp = await async_client.get(
  83. "/api/v1/spoolman/inventory/spools",
  84. headers={"X-API-Key": api_key_data},
  85. )
  86. assert resp.status_code == 200
  87. class TestApiKeyDenylistIntegrity:
  88. """Drift-detection: assert that admin-tier permissions remain in the denylist."""
  89. def test_admin_permissions_are_denied_for_api_keys(self):
  90. """All known admin-tier permissions must be in _APIKEY_DENIED_PERMISSIONS (H1 guard)."""
  91. from backend.app.core.auth import _APIKEY_DENIED_PERMISSIONS
  92. from backend.app.core.permissions import Permission
  93. expected_denied = {
  94. # SETTINGS_READ is intentionally NOT denied — SpoolBuddy kiosk reads
  95. # settings via API key (e.g. to sync the UI language).
  96. Permission.SETTINGS_UPDATE,
  97. Permission.SETTINGS_BACKUP,
  98. Permission.SETTINGS_RESTORE,
  99. Permission.USERS_READ,
  100. Permission.USERS_CREATE,
  101. Permission.USERS_UPDATE,
  102. Permission.USERS_DELETE,
  103. Permission.GROUPS_READ,
  104. Permission.GROUPS_CREATE,
  105. Permission.GROUPS_UPDATE,
  106. Permission.GROUPS_DELETE,
  107. Permission.API_KEYS_READ,
  108. Permission.API_KEYS_CREATE,
  109. Permission.API_KEYS_UPDATE,
  110. Permission.API_KEYS_DELETE,
  111. Permission.GITHUB_BACKUP,
  112. Permission.GITHUB_RESTORE,
  113. Permission.FIRMWARE_UPDATE,
  114. }
  115. missing = expected_denied - _APIKEY_DENIED_PERMISSIONS
  116. assert not missing, (
  117. f"Admin-tier permissions not in API key denylist (add them to _APIKEY_DENIED_PERMISSIONS): {missing}"
  118. )
  119. def test_operational_permissions_are_allowed_for_api_keys(self):
  120. """Core operational permissions must NOT be in the denylist."""
  121. from backend.app.core.auth import _APIKEY_DENIED_PERMISSIONS
  122. from backend.app.core.permissions import Permission
  123. # NOTE: under the GHSA-r2qv-8222-hqg3 allowlist model, INVENTORY_CREATE
  124. # and INVENTORY_UPDATE are administrative (not in the allowlist) and
  125. # therefore denied for API keys regardless of denylist membership.
  126. # This test still guards the small denylist-redundancy set of read-y
  127. # permissions that the SpoolBuddy kiosk + status integrations rely on.
  128. expected_allowed = {
  129. Permission.INVENTORY_READ,
  130. Permission.PRINTERS_READ,
  131. Permission.PRINTERS_CONTROL,
  132. Permission.ARCHIVES_READ,
  133. # SpoolBuddy kiosk reads settings (e.g. language) via API key — must stay allowed.
  134. Permission.SETTINGS_READ,
  135. }
  136. incorrectly_denied = expected_allowed & _APIKEY_DENIED_PERMISSIONS
  137. assert not incorrectly_denied, f"Operational permissions incorrectly in API key denylist: {incorrectly_denied}"
  138. class TestApiKeyScopeAllowlist:
  139. """GHSA-r2qv-8222-hqg3 (CVSS 9.9) — allowlist-based scope enforcement.
  140. Verifies that ``_check_apikey_permissions`` (and the higher-level
  141. dependencies that call it) honour the per-permission scope mapping rather
  142. than the legacy denylist-only model. Failures here would re-open the
  143. "Read Status / Manage Queue / Control Printer / Manage Library checkboxes
  144. are decorative" class of bug.
  145. """
  146. def test_every_permission_has_a_classification(self):
  147. """Structural: every Permission must be either allowlisted or admin-denied.
  148. This is the load-bearing drift-detection test for the allowlist model.
  149. A new Permission added to ``core/permissions.py`` without a matching
  150. entry in ``_APIKEY_SCOPE_BY_PERMISSION`` or ``_APIKEY_DENIED_PERMISSIONS``
  151. is functionally admin-only (allowlist failure → 403) — that's the safe
  152. default, but it should be an explicit choice rather than an oversight.
  153. """
  154. from backend.app.core.auth import (
  155. _APIKEY_DENIED_PERMISSIONS,
  156. _APIKEY_SCOPE_BY_PERMISSION,
  157. )
  158. from backend.app.core.permissions import Permission
  159. unclassified = {
  160. perm
  161. for perm in Permission
  162. if perm not in _APIKEY_SCOPE_BY_PERMISSION and perm not in _APIKEY_DENIED_PERMISSIONS
  163. }
  164. assert not unclassified, (
  165. "Every Permission must be classified for API-key access. "
  166. "Either add to _APIKEY_SCOPE_BY_PERMISSION (with scope flag) or "
  167. f"_APIKEY_DENIED_PERMISSIONS (admin-only). Unclassified: {unclassified}"
  168. )
  169. def test_allowlist_uses_only_valid_scope_flags(self):
  170. """Every value in the scope mapping must be a real bool field on APIKey."""
  171. from backend.app.core.auth import _APIKEY_SCOPE_BY_PERMISSION
  172. from backend.app.models.api_key import APIKey
  173. # can_access_cloud / can_update_energy_cost are narrow opt-in scopes;
  174. # the latter routes through its own ``require_energy_cost_update`` dep
  175. # rather than the central allowlist, so it doesn't appear here.
  176. valid_flags = {
  177. "can_read_status",
  178. "can_queue",
  179. "can_control_printer",
  180. "can_manage_library",
  181. "can_manage_inventory",
  182. "can_access_cloud",
  183. }
  184. used_flags = set(_APIKEY_SCOPE_BY_PERMISSION.values())
  185. assert used_flags <= valid_flags, f"Unknown scope flags in mapping: {used_flags - valid_flags}"
  186. # And every flag must actually exist on the model.
  187. for flag in valid_flags:
  188. assert hasattr(APIKey, flag), f"APIKey model missing column referenced by allowlist: {flag}"
  189. def test_allowlist_and_denylist_are_disjoint(self):
  190. """A permission classified as allowlisted must not also be in the denylist (and v/v)."""
  191. from backend.app.core.auth import (
  192. _APIKEY_DENIED_PERMISSIONS,
  193. _APIKEY_SCOPE_BY_PERMISSION,
  194. )
  195. overlap = set(_APIKEY_SCOPE_BY_PERMISSION) & _APIKEY_DENIED_PERMISSIONS
  196. assert not overlap, f"Permissions in both allowlist and denylist: {overlap}"
  197. @pytest.mark.parametrize(
  198. "scope_flag",
  199. [
  200. "can_read_status",
  201. "can_queue",
  202. "can_control_printer",
  203. "can_manage_library",
  204. "can_manage_inventory",
  205. "can_access_cloud",
  206. ],
  207. )
  208. def test_each_scope_flag_has_at_least_one_permission(self, scope_flag):
  209. """If a scope flag has no permissions, it's dead code — fail loudly."""
  210. from backend.app.core.auth import _APIKEY_SCOPE_BY_PERMISSION
  211. assert scope_flag in _APIKEY_SCOPE_BY_PERMISSION.values(), (
  212. f"No permission maps to {scope_flag} — either remove the flag or classify a permission under it."
  213. )
  214. class _FakeApiKey:
  215. """Bool-attribute stand-in for APIKey used by the scope matrix tests.
  216. The ``_check_apikey_permissions`` function only inspects the four scope
  217. booleans, so a lightweight stub is enough; instantiating the real model
  218. requires a DB session which is overkill for pure-logic verification.
  219. """
  220. def __init__(
  221. self,
  222. can_read_status=False,
  223. can_queue=False,
  224. can_control_printer=False,
  225. can_manage_library=False,
  226. can_manage_inventory=False,
  227. ):
  228. self.can_read_status = can_read_status
  229. self.can_queue = can_queue
  230. self.can_control_printer = can_control_printer
  231. self.can_manage_library = can_manage_library
  232. self.can_manage_inventory = can_manage_inventory
  233. class TestCheckApiKeyPermissionsMatrix:
  234. """Pure-logic matrix: every (scope flag combo × representative permission) outcome.
  235. These are the tests that would have caught GHSA-r2qv-8222-hqg3 — they prove
  236. the actual gate function honours the scope flags, not just that some
  237. helper called by webhook.py does.
  238. """
  239. # (Permission, expected scope flag attribute, category description)
  240. _SCOPE_CASES = [
  241. # can_read_status
  242. ("PRINTERS_READ", "can_read_status", "read printer status"),
  243. ("ARCHIVES_READ", "can_read_status", "read archives"),
  244. ("QUEUE_READ", "can_read_status", "read queue"),
  245. ("SETTINGS_READ", "can_read_status", "SpoolBuddy kiosk settings read"),
  246. ("WEBSOCKET_CONNECT", "can_read_status", "websocket subscribe"),
  247. # can_queue
  248. ("QUEUE_CREATE", "can_queue", "add queue item"),
  249. ("QUEUE_DELETE_ALL", "can_queue", "delete any queue item"),
  250. ("ARCHIVES_REPRINT_ALL", "can_queue", "reprint an archive"),
  251. # can_control_printer
  252. ("PRINTERS_CONTROL", "can_control_printer", "start/stop print"),
  253. ("PRINTERS_FILES", "can_control_printer", "send file to printer"),
  254. ("SMART_PLUGS_CONTROL", "can_control_printer", "smart plug on/off"),
  255. # can_manage_library
  256. ("LIBRARY_UPLOAD", "can_manage_library", "upload library file"),
  257. ("LIBRARY_DELETE_OWN", "can_manage_library", "delete own library file"),
  258. ("MAKERWORLD_IMPORT", "can_manage_library", "import from MakerWorld"),
  259. # can_manage_inventory
  260. ("INVENTORY_CREATE", "can_manage_inventory", "create spool record"),
  261. ("INVENTORY_UPDATE", "can_manage_inventory", "update spool / SpoolBuddy kiosk write"),
  262. ("INVENTORY_DELETE", "can_manage_inventory", "delete spool record"),
  263. ("INVENTORY_FORECAST_WRITE", "can_manage_inventory", "update forecast SKU settings"),
  264. ]
  265. _ADMIN_CASES = [
  266. # Documented denylist
  267. "SETTINGS_UPDATE",
  268. "USERS_CREATE",
  269. "GROUPS_DELETE",
  270. "API_KEYS_CREATE",
  271. "GITHUB_BACKUP",
  272. "FIRMWARE_UPDATE",
  273. # Unmapped administrative (allowlist fail-closed catches these too)
  274. "PRINTERS_CREATE",
  275. "LIBRARY_DELETE_ALL",
  276. "LIBRARY_PURGE",
  277. "DISCOVERY_SCAN",
  278. ]
  279. @pytest.mark.parametrize("perm_name,required_flag,_descr", _SCOPE_CASES)
  280. def test_permission_allowed_only_when_scope_flag_is_set(self, perm_name, required_flag, _descr):
  281. """For each (Permission, scope) case, true→allow and false→403."""
  282. from fastapi import HTTPException
  283. from backend.app.core.auth import _check_apikey_permissions
  284. from backend.app.core.permissions import Permission
  285. perm = Permission[perm_name].value
  286. # Flag set → passes
  287. _check_apikey_permissions(_FakeApiKey(**{required_flag: True}), [perm])
  288. # All flags off → 403
  289. with pytest.raises(HTTPException) as exc:
  290. _check_apikey_permissions(_FakeApiKey(), [perm])
  291. assert exc.value.status_code == 403
  292. # Wrong flag set, required flag off → 403 (no cross-scope leakage)
  293. other_flags = {
  294. f
  295. for f in ("can_read_status", "can_queue", "can_control_printer", "can_manage_library")
  296. if f != required_flag
  297. }
  298. for other in other_flags:
  299. with pytest.raises(HTTPException) as exc:
  300. _check_apikey_permissions(_FakeApiKey(**{other: True}), [perm])
  301. assert exc.value.status_code == 403
  302. @pytest.mark.parametrize("perm_name", _ADMIN_CASES)
  303. def test_admin_permissions_are_403_regardless_of_flags(self, perm_name):
  304. """A fully-flagged API key still cannot use administrative permissions."""
  305. from fastapi import HTTPException
  306. from backend.app.core.auth import _check_apikey_permissions
  307. from backend.app.core.permissions import Permission
  308. perm = Permission[perm_name].value
  309. all_flags = _FakeApiKey(can_read_status=True, can_queue=True, can_control_printer=True, can_manage_library=True)
  310. with pytest.raises(HTTPException) as exc:
  311. _check_apikey_permissions(all_flags, [perm])
  312. assert exc.value.status_code == 403
  313. assert "administrative" in exc.value.detail.lower() or "does not have" in exc.value.detail.lower()
  314. def test_unknown_permission_string_is_admin_denied(self):
  315. """An unrecognised permission string must fail closed, not silently pass."""
  316. from fastapi import HTTPException
  317. from backend.app.core.auth import _check_apikey_permissions
  318. all_flags = _FakeApiKey(can_read_status=True, can_queue=True, can_control_printer=True, can_manage_library=True)
  319. with pytest.raises(HTTPException) as exc:
  320. _check_apikey_permissions(all_flags, ["bogus:nonexistent"])
  321. assert exc.value.status_code == 403
  322. def test_empty_perm_list_is_403(self):
  323. """Defence-in-depth: an empty perm list must not silently allow."""
  324. from fastapi import HTTPException
  325. from backend.app.core.auth import _check_apikey_permissions
  326. all_flags = _FakeApiKey(can_read_status=True, can_queue=True, can_control_printer=True, can_manage_library=True)
  327. with pytest.raises(HTTPException) as exc:
  328. _check_apikey_permissions(all_flags, [])
  329. assert exc.value.status_code == 403
  330. def test_require_any_at_least_one_must_pass(self):
  331. """``require_any=True`` matches any-of semantics, but still respects scopes."""
  332. from fastapi import HTTPException
  333. from backend.app.core.auth import _check_apikey_permissions
  334. from backend.app.core.permissions import Permission
  335. # can_read_status only: any-of (PRINTERS_READ, QUEUE_CREATE) passes because the read flag is set.
  336. _check_apikey_permissions(
  337. _FakeApiKey(can_read_status=True),
  338. [Permission.PRINTERS_READ.value, Permission.QUEUE_CREATE.value],
  339. require_any=True,
  340. )
  341. # No flags: any-of fails.
  342. with pytest.raises(HTTPException):
  343. _check_apikey_permissions(
  344. _FakeApiKey(),
  345. [Permission.PRINTERS_READ.value, Permission.QUEUE_CREATE.value],
  346. require_any=True,
  347. )
  348. # All admin perms: any-of fails even with every flag set.
  349. with pytest.raises(HTTPException):
  350. _check_apikey_permissions(
  351. _FakeApiKey(can_read_status=True, can_queue=True, can_control_printer=True, can_manage_library=True),
  352. [Permission.USERS_CREATE.value, Permission.GROUPS_DELETE.value],
  353. require_any=True,
  354. )
  355. def test_require_all_every_perm_must_pass(self):
  356. """Default ``require_any=False``: every permission must pass — single failure → 403."""
  357. from fastapi import HTTPException
  358. from backend.app.core.auth import _check_apikey_permissions
  359. from backend.app.core.permissions import Permission
  360. # Read+queue set, queue+control required → fails because control flag is off.
  361. with pytest.raises(HTTPException) as exc:
  362. _check_apikey_permissions(
  363. _FakeApiKey(can_read_status=True, can_queue=True),
  364. [Permission.QUEUE_CREATE.value, Permission.PRINTERS_CONTROL.value],
  365. )
  366. assert exc.value.status_code == 403