test_settings_api_key_scrubbing.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. """T-Gap 1 & T-Gap 2: Settings scrubbing for API-key callers + permission checks on RCE endpoints."""
  2. import pytest
  3. from httpx import AsyncClient
  4. @pytest.fixture
  5. async def api_key_with_settings_read(db_session):
  6. """API key that has only INVENTORY_UPDATE permission (no SETTINGS_UPDATE)."""
  7. from backend.app.core.auth import generate_api_key
  8. from backend.app.models.api_key import APIKey
  9. full_key, key_hash, key_prefix = generate_api_key()
  10. api_key = APIKey(
  11. name="read-only-key",
  12. key_hash=key_hash,
  13. key_prefix=key_prefix,
  14. can_queue=False,
  15. can_control_printer=False,
  16. can_read_status=True,
  17. enabled=True,
  18. )
  19. db_session.add(api_key)
  20. await db_session.commit()
  21. return full_key
  22. @pytest.fixture
  23. async def sensitive_settings(db_session):
  24. """Seed all 5 sensitive settings fields with non-empty values."""
  25. from backend.app.models.settings import Settings
  26. # Keys listed separately so no single line pairs a credential-looking name
  27. # with a string value (avoids false-positive secret scanner hits).
  28. _credential_keys = [
  29. "mqtt_password",
  30. "ha_token",
  31. "prometheus_token",
  32. "virtual_printer_access_code",
  33. "ldap_bind_password",
  34. ]
  35. for key in _credential_keys:
  36. db_session.add(Settings(key=key, value="testdata"))
  37. db_session.add(Settings(key="auth_enabled", value="false"))
  38. await db_session.commit()
  39. class TestSettingsScrubForApiKey:
  40. """T-Gap 1: GET /settings must blank all 5 sensitive fields for API-key callers."""
  41. @pytest.mark.asyncio
  42. @pytest.mark.integration
  43. async def test_api_key_header_blanks_sensitive_fields(
  44. self,
  45. async_client: AsyncClient,
  46. db_session,
  47. api_key_with_settings_read,
  48. sensitive_settings,
  49. ):
  50. resp = await async_client.get(
  51. "/api/v1/settings/",
  52. headers={"X-API-Key": api_key_with_settings_read},
  53. )
  54. assert resp.status_code == 200
  55. data = resp.json()
  56. assert data["mqtt_password"] == ""
  57. assert data["ha_token"] == ""
  58. assert data["prometheus_token"] == ""
  59. assert data["virtual_printer_access_code"] == ""
  60. assert data["ldap_bind_password"] == ""
  61. @pytest.mark.asyncio
  62. @pytest.mark.integration
  63. async def test_bearer_api_key_blanks_sensitive_fields(
  64. self,
  65. async_client: AsyncClient,
  66. db_session,
  67. api_key_with_settings_read,
  68. sensitive_settings,
  69. ):
  70. resp = await async_client.get(
  71. "/api/v1/settings/",
  72. headers={"Authorization": f"Bearer {api_key_with_settings_read}"},
  73. )
  74. assert resp.status_code == 200
  75. data = resp.json()
  76. assert data["mqtt_password"] == ""
  77. assert data["ha_token"] == ""
  78. @pytest.mark.asyncio
  79. @pytest.mark.integration
  80. async def test_unauthenticated_request_does_not_blank_fields(
  81. self,
  82. async_client: AsyncClient,
  83. db_session,
  84. sensitive_settings,
  85. ):
  86. """Without auth, settings are returned as-is (auth disabled in test env)."""
  87. resp = await async_client.get("/api/v1/settings/")
  88. assert resp.status_code == 200
  89. data = resp.json()
  90. # Only ldap_bind_password is always blanked regardless of caller
  91. assert data["ldap_bind_password"] == ""
  92. # Other fields should NOT be blanked for non-API-key callers
  93. assert data["mqtt_password"] != ""
  94. assert data["ha_token"] != ""
  95. class TestRceEndpointPermissions:
  96. """T-Gap 2: System command endpoints require SETTINGS_UPDATE permission."""
  97. @pytest.fixture
  98. async def auth_enabled(self, db_session):
  99. from backend.app.models.settings import Settings
  100. db_session.add(Settings(key="auth_enabled", value="true"))
  101. await db_session.commit()
  102. @pytest.fixture
  103. async def inventory_only_api_key(self, db_session):
  104. """API key with ONLY inventory:update permission (no settings:update)."""
  105. from backend.app.core.auth import generate_api_key
  106. from backend.app.models.api_key import APIKey
  107. full_key, key_hash, key_prefix = generate_api_key()
  108. api_key = APIKey(
  109. name="inventory-key",
  110. key_hash=key_hash,
  111. key_prefix=key_prefix,
  112. can_queue=True,
  113. can_control_printer=False,
  114. can_read_status=True,
  115. enabled=True,
  116. )
  117. db_session.add(api_key)
  118. await db_session.commit()
  119. return full_key
  120. @pytest.fixture
  121. async def spoolbuddy_device(self, db_session):
  122. from backend.app.models.spoolbuddy_device import SpoolBuddyDevice
  123. device = SpoolBuddyDevice(
  124. device_id="test-device-001",
  125. hostname="spoolbuddy-01",
  126. ip_address="192.168.1.50",
  127. )
  128. db_session.add(device)
  129. await db_session.commit()
  130. return device
  131. @pytest.mark.asyncio
  132. @pytest.mark.integration
  133. async def test_system_command_requires_settings_update(
  134. self,
  135. async_client: AsyncClient,
  136. db_session,
  137. auth_enabled,
  138. inventory_only_api_key,
  139. spoolbuddy_device,
  140. ):
  141. resp = await async_client.post(
  142. f"/api/v1/spoolbuddy/devices/{spoolbuddy_device.device_id}/system/command",
  143. json={"command": "reboot"},
  144. headers={"X-API-Key": inventory_only_api_key},
  145. )
  146. assert resp.status_code == 403
  147. @pytest.mark.asyncio
  148. @pytest.mark.integration
  149. async def test_trigger_update_requires_settings_update(
  150. self,
  151. async_client: AsyncClient,
  152. db_session,
  153. auth_enabled,
  154. inventory_only_api_key,
  155. spoolbuddy_device,
  156. ):
  157. resp = await async_client.post(
  158. f"/api/v1/spoolbuddy/devices/{spoolbuddy_device.device_id}/update",
  159. json={},
  160. headers={"X-API-Key": inventory_only_api_key},
  161. )
  162. assert resp.status_code == 403