test_spoolman_service.py 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673
  1. """Unit tests for Spoolman service.
  2. These tests specifically target the sync_ams_tray method's disable_weight_sync
  3. functionality that controls whether remaining_weight is updated.
  4. Also includes tests for is_bambu_lab_spool RFID detection.
  5. """
  6. from unittest.mock import AsyncMock, Mock, patch
  7. import pytest
  8. from backend.app.services.spoolman import AMSTray, SpoolmanClient, init_spoolman_client
  9. class TestIsBambuLabSpool:
  10. """Tests for is_bambu_lab_spool — detects BL spools via RFID hardware identifiers only."""
  11. @pytest.fixture
  12. def client(self):
  13. return SpoolmanClient("http://localhost:7912")
  14. def test_valid_tray_uuid_returns_true(self, client):
  15. """A non-zero 32-char hex tray_uuid identifies a BL spool."""
  16. assert client.is_bambu_lab_spool("A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4") is True
  17. def test_valid_tag_uid_returns_true(self, client):
  18. """A non-zero 16-char hex tag_uid identifies a BL spool (fallback)."""
  19. assert client.is_bambu_lab_spool("", tag_uid="A1B2C3D4E5F6A1B2") is True
  20. def test_zero_tray_uuid_returns_false(self, client):
  21. """All-zero tray_uuid means no RFID tag read."""
  22. assert client.is_bambu_lab_spool("00000000000000000000000000000000") is False
  23. def test_zero_tag_uid_returns_false(self, client):
  24. """All-zero tag_uid means no RFID tag read."""
  25. assert client.is_bambu_lab_spool("", tag_uid="0000000000000000") is False
  26. def test_empty_identifiers_returns_false(self, client):
  27. """No identifiers means no BL spool."""
  28. assert client.is_bambu_lab_spool("") is False
  29. assert client.is_bambu_lab_spool("", tag_uid="") is False
  30. def test_tray_info_idx_ignored(self, client):
  31. """tray_info_idx is NOT a reliable BL indicator — third-party spools
  32. using Bambu generic presets also have GF-prefixed tray_info_idx values."""
  33. # Third-party spool with Bambu preset but no RFID identifiers
  34. assert client.is_bambu_lab_spool("", tray_info_idx="GFA00") is False
  35. assert client.is_bambu_lab_spool("", tray_info_idx="GFB00") is False
  36. assert client.is_bambu_lab_spool("", tray_info_idx="GFSA02_04") is False
  37. def test_tray_info_idx_with_valid_uuid_returns_true(self, client):
  38. """BL spool with both RFID UUID and preset ID — detected by UUID."""
  39. assert (
  40. client.is_bambu_lab_spool(
  41. "A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4",
  42. tray_info_idx="GFA00",
  43. )
  44. is True
  45. )
  46. def test_tray_uuid_preferred_over_tag_uid(self, client):
  47. """tray_uuid is checked before tag_uid (both valid)."""
  48. assert (
  49. client.is_bambu_lab_spool(
  50. "A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4",
  51. tag_uid="A1B2C3D4E5F6A1B2",
  52. )
  53. is True
  54. )
  55. def test_short_tray_uuid_returns_false(self, client):
  56. """UUID must be exactly 32 hex chars."""
  57. assert client.is_bambu_lab_spool("A1B2C3D4") is False
  58. def test_non_hex_tray_uuid_returns_false(self, client):
  59. """UUID must be valid hex."""
  60. assert client.is_bambu_lab_spool("ZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ") is False
  61. class TestSpoolmanClient:
  62. """Tests for SpoolmanClient class."""
  63. @pytest.fixture
  64. def client(self):
  65. """Create a SpoolmanClient instance."""
  66. return SpoolmanClient("http://localhost:7912")
  67. @pytest.fixture
  68. def sample_tray(self):
  69. """Create a sample AMSTray for testing."""
  70. return AMSTray(
  71. ams_id=0,
  72. tray_id=0,
  73. tray_type="PLA",
  74. tray_sub_brands="PLA Basic",
  75. tray_color="FF0000FF",
  76. remain=50,
  77. tag_uid="",
  78. tray_uuid="A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4",
  79. tray_info_idx="GFA00",
  80. tray_weight=1000,
  81. )
  82. @pytest.fixture
  83. def existing_spool(self):
  84. """Create a mock existing spool response."""
  85. return {
  86. "id": 42,
  87. "remaining_weight": 800,
  88. "extra": {"tag": '"A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4"'},
  89. "filament": {"id": 1, "name": "PLA Red", "material": "PLA"},
  90. }
  91. @pytest.fixture
  92. def mock_filament(self):
  93. """Create a mock filament response."""
  94. return {"id": 1, "name": "PLA Basic", "material": "PLA"}
  95. # ========================================================================
  96. # Tests for sync_ams_tray with disable_weight_sync
  97. # ========================================================================
  98. @pytest.mark.asyncio
  99. async def test_sync_ams_tray_updates_weight_by_default(self, client, sample_tray, existing_spool):
  100. """Verify sync_ams_tray updates remaining_weight by default."""
  101. with (
  102. patch.object(client, "find_spool_by_tag", AsyncMock(return_value=existing_spool)),
  103. patch.object(client, "update_spool", AsyncMock(return_value={"id": 42})) as mock_update,
  104. ):
  105. await client.sync_ams_tray(sample_tray, "TestPrinter")
  106. mock_update.assert_called_once()
  107. call_kwargs = mock_update.call_args.kwargs
  108. assert "remaining_weight" in call_kwargs
  109. assert call_kwargs["remaining_weight"] == 500.0 # 50% of 1000g
  110. assert "location" not in call_kwargs
  111. @pytest.mark.asyncio
  112. async def test_sync_ams_tray_skips_weight_when_disabled(self, client, sample_tray, existing_spool):
  113. """Verify sync_ams_tray skips remaining_weight when disable_weight_sync=True."""
  114. with (
  115. patch.object(client, "find_spool_by_tag", AsyncMock(return_value=existing_spool)),
  116. patch.object(client, "update_spool", AsyncMock(return_value={"id": 42})) as mock_update,
  117. ):
  118. await client.sync_ams_tray(sample_tray, "TestPrinter", disable_weight_sync=True)
  119. mock_update.assert_called_once()
  120. call_kwargs = mock_update.call_args.kwargs
  121. # remaining_weight should be None (not updated)
  122. assert call_kwargs.get("remaining_weight") is None
  123. # location must never be written by Bambuddy — user-managed in Spoolman
  124. assert "location" not in call_kwargs
  125. @pytest.mark.asyncio
  126. async def test_sync_ams_tray_new_spool_always_includes_weight(self, client, sample_tray, mock_filament):
  127. """Verify new spool creation always includes remaining_weight even when disabled."""
  128. with (
  129. patch.object(client, "find_spool_by_tag", AsyncMock(return_value=None)),
  130. patch.object(client, "_find_or_create_filament", AsyncMock(return_value=mock_filament)),
  131. patch.object(client, "create_spool", AsyncMock(return_value={"id": 99})) as mock_create,
  132. ):
  133. await client.sync_ams_tray(sample_tray, "TestPrinter", disable_weight_sync=True)
  134. mock_create.assert_called_once()
  135. call_kwargs = mock_create.call_args.kwargs
  136. # New spools should ALWAYS include remaining_weight
  137. assert "remaining_weight" in call_kwargs
  138. assert call_kwargs["remaining_weight"] == 500.0 # 50% of 1000g
  139. @pytest.mark.asyncio
  140. async def test_sync_ams_tray_does_not_write_location(self, client, sample_tray, existing_spool):
  141. """Verify sync_ams_tray never writes location= to Spoolman (user-managed field)."""
  142. with (
  143. patch.object(client, "find_spool_by_tag", AsyncMock(return_value=existing_spool)),
  144. patch.object(client, "update_spool", AsyncMock(return_value={"id": 42})) as mock_update,
  145. ):
  146. await client.sync_ams_tray(sample_tray, "My Printer", disable_weight_sync=True)
  147. call_kwargs = mock_update.call_args.kwargs
  148. # Bambuddy must never auto-set spool.location — it is user-managed in Spoolman
  149. assert "location" not in call_kwargs
  150. # ========================================================================
  151. # T6: non-BL spool with custom RFID (H5 guard)
  152. # ========================================================================
  153. @pytest.mark.asyncio
  154. async def test_sync_ams_tray_non_bl_rfid_find_or_create_error_returns_none(self, client):
  155. """Non-BL spool with custom RFID: find_or_create_filament failure returns None, not raises.
  156. A third-party spool whose tag_uid is not exactly 16 hex chars is not
  157. identified as BL. sync_ams_tray must catch find_or_create_filament
  158. errors and return None instead of propagating the exception.
  159. """
  160. from backend.app.services.spoolman import SpoolmanUnavailableError
  161. # 8-char tag → spool_tag is set, but is_bambu_lab_spool returns False
  162. tray = AMSTray(
  163. ams_id=0,
  164. tray_id=2,
  165. tray_type="PLA",
  166. tray_sub_brands="eSun PLA+",
  167. tray_color="00FF00FF",
  168. remain=50,
  169. tag_uid="AABB1234",
  170. tray_uuid="",
  171. tray_info_idx="",
  172. tray_weight=1000,
  173. )
  174. with (
  175. patch.object(client, "find_spool_by_tag", AsyncMock(return_value=None)),
  176. patch.object(
  177. client,
  178. "find_or_create_filament",
  179. AsyncMock(side_effect=SpoolmanUnavailableError("timeout")),
  180. ),
  181. ):
  182. result = await client.sync_ams_tray(tray, "TestPrinter")
  183. assert result is None
  184. # ========================================================================
  185. # T7: hint path uncached — get_spool(hint) called when not in cached_spools
  186. # ========================================================================
  187. @pytest.mark.asyncio
  188. async def test_sync_ams_tray_hint_uncached_calls_get_spool(self, client):
  189. """No-RFID path: when hint spool is absent from cached_spools, get_spool is called."""
  190. tray = AMSTray(
  191. ams_id=0,
  192. tray_id=3,
  193. tray_type="PETG",
  194. tray_sub_brands="Generic PETG",
  195. tray_color="0000FFFF",
  196. remain=75,
  197. tag_uid="",
  198. tray_uuid="",
  199. tray_info_idx="",
  200. tray_weight=1000,
  201. )
  202. # cached_spools exists but does NOT contain spool 99
  203. cached_spools = [{"id": 1, "extra": {}}]
  204. fetched_spool = {"id": 99, "extra": {}}
  205. with (
  206. patch.object(client, "get_spool", AsyncMock(return_value=fetched_spool)) as mock_get,
  207. patch.object(client, "update_spool", AsyncMock(return_value=fetched_spool)),
  208. ):
  209. result = await client.sync_ams_tray(
  210. tray,
  211. "TestPrinter",
  212. cached_spools=cached_spools,
  213. spoolman_spool_id_hint=99,
  214. )
  215. assert result is not None
  216. mock_get.assert_awaited_once_with(99)
  217. # ========================================================================
  218. # T8: hint ignored when RFID tag is present
  219. # ========================================================================
  220. @pytest.mark.asyncio
  221. async def test_sync_ams_tray_rfid_takes_precedence_over_hint(self, client, existing_spool):
  222. """When tray_uuid is set, the RFID path is used and the hint is never consulted."""
  223. tray = AMSTray(
  224. ams_id=0,
  225. tray_id=4,
  226. tray_type="PLA",
  227. tray_sub_brands="PLA Basic",
  228. tray_color="FF0000FF",
  229. remain=50,
  230. tag_uid="",
  231. tray_uuid="A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4",
  232. tray_info_idx="GFA00",
  233. tray_weight=1000,
  234. )
  235. with (
  236. patch.object(client, "find_spool_by_tag", AsyncMock(return_value=existing_spool)),
  237. patch.object(client, "update_spool", AsyncMock(return_value={"id": 42})),
  238. patch.object(client, "get_spool", AsyncMock()) as mock_get_spool,
  239. ):
  240. result = await client.sync_ams_tray(
  241. tray,
  242. "TestPrinter",
  243. spoolman_spool_id_hint=99,
  244. )
  245. assert result is not None
  246. # hint path (get_spool) must NOT be called when RFID is present
  247. mock_get_spool.assert_not_called()
  248. @pytest.mark.asyncio
  249. async def test_sync_ams_tray_non_bambu_no_rfid_returns_none(self, client):
  250. """Third-party spool without any RFID and no hint returns None."""
  251. # Non-BL spool: no tray_uuid, no tag_uid, no spoolman_spool_id_hint → nothing to match
  252. tray = AMSTray(
  253. ams_id=0,
  254. tray_id=0,
  255. tray_type="PLA",
  256. tray_sub_brands="Third Party PLA",
  257. tray_color="FF0000FF",
  258. remain=50,
  259. tag_uid="",
  260. tray_uuid="",
  261. tray_info_idx="",
  262. tray_weight=1000,
  263. )
  264. result = await client.sync_ams_tray(tray, "TestPrinter")
  265. assert result is None
  266. @pytest.mark.asyncio
  267. async def test_sync_ams_tray_hint_updates_spool_without_rfid(self, client):
  268. """No-RFID fallback: spool_id_hint from local slot-assignment table updates the spool."""
  269. tray = AMSTray(
  270. ams_id=0,
  271. tray_id=0,
  272. tray_type="PLA",
  273. tray_sub_brands="Generic PLA",
  274. tray_color="00FF00FF",
  275. remain=80,
  276. tag_uid="",
  277. tray_uuid="",
  278. tray_info_idx="",
  279. tray_weight=1000,
  280. )
  281. cached_spools = [{"id": 99, "extra": {}}]
  282. with patch.object(client, "update_spool", new_callable=AsyncMock) as mock_update:
  283. mock_update.return_value = {"id": 99}
  284. result = await client.sync_ams_tray(
  285. tray, "TestPrinter", cached_spools=cached_spools, spoolman_spool_id_hint=99
  286. )
  287. assert result is not None
  288. assert result["id"] == 99
  289. mock_update.assert_called_once()
  290. call_kwargs = mock_update.call_args.kwargs
  291. assert "location" not in call_kwargs
  292. @pytest.mark.asyncio
  293. async def test_sync_ams_tray_weight_calculation(self, client, existing_spool):
  294. """Verify remaining weight is calculated correctly for various percentages."""
  295. test_cases = [
  296. (100, 1000, 1000.0), # Full spool
  297. (50, 1000, 500.0), # Half spool
  298. (25, 1000, 250.0), # Quarter spool
  299. (0, 1000, 0.0), # Empty spool
  300. (75, 500, 375.0), # Different spool weight
  301. ]
  302. for remain, weight, expected in test_cases:
  303. tray = AMSTray(
  304. ams_id=0,
  305. tray_id=0,
  306. tray_type="PLA",
  307. tray_sub_brands="PLA Basic",
  308. tray_color="FF0000FF",
  309. remain=remain,
  310. tag_uid="",
  311. tray_uuid="A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4",
  312. tray_info_idx="GFA00",
  313. tray_weight=weight,
  314. )
  315. with (
  316. patch.object(client, "find_spool_by_tag", AsyncMock(return_value=existing_spool)),
  317. patch.object(client, "update_spool", AsyncMock(return_value={"id": 42})) as mock_update,
  318. ):
  319. await client.sync_ams_tray(tray, "TestPrinter", disable_weight_sync=False)
  320. call_kwargs = mock_update.call_args.kwargs
  321. assert call_kwargs["remaining_weight"] == expected, (
  322. f"Expected {expected}g for {remain}% of {weight}g, got {call_kwargs['remaining_weight']}"
  323. )
  324. # ========================================================================
  325. # Tests for caching functionality
  326. # ========================================================================
  327. @pytest.mark.asyncio
  328. async def test_find_spool_by_tag_with_cached_spools(self, client):
  329. """Verify find_spool_by_tag uses cached spools when provided (no API call)."""
  330. cached = [
  331. {"id": 1, "extra": {"tag": '"ABC123"'}},
  332. {"id": 2, "extra": {"tag": '"XYZ789"'}},
  333. ]
  334. with patch.object(client, "get_spools", AsyncMock()) as mock_get:
  335. result = await client.find_spool_by_tag("ABC123", cached_spools=cached)
  336. assert result["id"] == 1
  337. mock_get.assert_not_called() # Should NOT call get_spools
  338. @pytest.mark.asyncio
  339. async def test_find_spool_by_tag_without_cached_spools(self, client):
  340. """Verify find_spool_by_tag fetches spools when cache not provided."""
  341. mock_spools = [{"id": 1, "extra": {"tag": '"ABC123"'}}]
  342. with patch.object(client, "get_spools", AsyncMock(return_value=mock_spools)) as mock_get:
  343. result = await client.find_spool_by_tag("ABC123")
  344. assert result["id"] == 1
  345. mock_get.assert_called_once() # Should call get_spools
  346. @pytest.mark.asyncio
  347. async def test_find_spools_by_location_prefix_with_cached_spools(self, client):
  348. """Verify find_spools_by_location_prefix uses cached spools when provided."""
  349. cached = [
  350. {"id": 1, "location": "Printer1 - AMS A1"},
  351. {"id": 2, "location": "Printer2 - AMS A1"},
  352. {"id": 3, "location": "Printer1 - AMS A2"},
  353. ]
  354. with patch.object(client, "get_spools", AsyncMock()) as mock_get:
  355. result = await client.find_spools_by_location_prefix("Printer1 - ", cached_spools=cached)
  356. assert len(result) == 2
  357. assert result[0]["id"] == 1
  358. assert result[1]["id"] == 3
  359. mock_get.assert_not_called() # Should NOT call get_spools
  360. @pytest.mark.asyncio
  361. async def test_sync_ams_tray_with_cached_spools(self, client, sample_tray, existing_spool):
  362. """Verify sync_ams_tray passes cached_spools to find_spool_by_tag."""
  363. cached = [existing_spool]
  364. with (
  365. patch.object(client, "get_spools", AsyncMock()) as mock_get,
  366. patch.object(client, "update_spool", AsyncMock(return_value={"id": 42})),
  367. ):
  368. await client.sync_ams_tray(sample_tray, "TestPrinter", cached_spools=cached)
  369. mock_get.assert_not_called() # Should NOT call get_spools
  370. @pytest.mark.asyncio
  371. async def test_clear_location_for_removed_spools_with_cached_spools(self, client):
  372. """Verify clear_location_for_removed_spools uses cached spools."""
  373. cached = [
  374. {"id": 1, "location": "Printer1 - AMS A1", "extra": {"tag": '"A1B2C3D4E5F60718293A4B5C6D7E8F90"'}},
  375. {"id": 2, "location": "Printer1 - AMS A2", "extra": {"tag": '"B1C2D3E4F5061728394A5B6C7D8E9F01"'}},
  376. {"id": 3, "location": "Printer1 - AMS A3", "extra": {"tag": '"C1D2E3F40516273849A5B6C7D8E9F012"'}},
  377. ]
  378. # Tag 3 was cleared, so only tags 1 and 2 are current
  379. current_tags = {
  380. "A1B2C3D4E5F60718293A4B5C6D7E8F90",
  381. "B1C2D3E4F5061728394A5B6C7D8E9F01",
  382. }
  383. with (
  384. patch.object(client, "get_spools", AsyncMock()) as mock_get,
  385. patch.object(client, "update_spool", AsyncMock(return_value={"id": 3})) as mock_update,
  386. ):
  387. cleared = await client.clear_location_for_removed_spools("Printer1", current_tags, cached_spools=cached)
  388. assert cleared == 1
  389. mock_get.assert_not_called() # Should NOT call get_spools
  390. mock_update.assert_called_once()
  391. # Verify it cleared TAG3 (not in current_tags)
  392. call_kwargs = mock_update.call_args.kwargs
  393. assert call_kwargs["spool_id"] == 3
  394. assert call_kwargs.get("clear_location") is True
  395. # ========================================================================
  396. # Tests for retry logic in get_spools
  397. # ========================================================================
  398. @pytest.mark.asyncio
  399. async def test_get_spools_succeeds_on_first_attempt(self, client):
  400. """Verify get_spools succeeds immediately when no errors occur."""
  401. mock_spools = [{"id": 1}, {"id": 2}]
  402. with patch.object(client, "_get_client") as mock_get_client:
  403. mock_http_client = AsyncMock()
  404. mock_response = Mock()
  405. mock_response.raise_for_status = Mock()
  406. mock_response.json = Mock(return_value=mock_spools)
  407. mock_http_client.get = AsyncMock(return_value=mock_response)
  408. mock_get_client.return_value = mock_http_client
  409. result = await client.get_spools()
  410. assert result == mock_spools
  411. mock_get_client.assert_called_once()
  412. mock_http_client.get.assert_called_once()
  413. @pytest.mark.asyncio
  414. async def test_get_spools_retries_on_connection_error(self, client):
  415. """Verify get_spools retries up to 3 times on connection errors."""
  416. import httpx
  417. mock_spools = [{"id": 1}]
  418. with (
  419. patch.object(client, "_get_client") as mock_get_client,
  420. patch.object(client, "close", AsyncMock()) as mock_close,
  421. patch("asyncio.sleep", AsyncMock()) as mock_sleep,
  422. ):
  423. mock_http_client = AsyncMock()
  424. mock_get_client.return_value = mock_http_client
  425. # First 2 attempts fail with ReadError, 3rd succeeds
  426. mock_response = Mock()
  427. mock_response.raise_for_status = Mock()
  428. mock_response.json = Mock(return_value=mock_spools)
  429. mock_http_client.get = AsyncMock(
  430. side_effect=[
  431. httpx.ReadError("Connection closed"),
  432. httpx.ReadError("Connection closed"),
  433. mock_response,
  434. ]
  435. )
  436. result = await client.get_spools()
  437. assert result == mock_spools
  438. assert mock_get_client.call_count == 3
  439. assert mock_http_client.get.call_count == 3
  440. # Should close client twice (after each failed attempt)
  441. assert mock_close.call_count == 2
  442. # Should sleep twice (after first 2 attempts)
  443. assert mock_sleep.call_count == 2
  444. mock_sleep.assert_called_with(0.5)
  445. @pytest.mark.asyncio
  446. async def test_get_spools_raises_after_3_failed_attempts(self, client):
  447. """Verify get_spools raises SpoolmanUnavailableError after 3 failed connection attempts."""
  448. import httpx
  449. from backend.app.services.spoolman import SpoolmanUnavailableError
  450. with (
  451. patch.object(client, "_get_client", AsyncMock()) as mock_get_client,
  452. patch.object(client, "close", AsyncMock()) as mock_close,
  453. patch("asyncio.sleep", AsyncMock()) as mock_sleep,
  454. ):
  455. mock_http_client = AsyncMock()
  456. mock_get_client.return_value = mock_http_client
  457. # All 3 attempts fail
  458. mock_http_client.get.side_effect = httpx.ReadError("Connection closed")
  459. with pytest.raises(SpoolmanUnavailableError):
  460. await client.get_spools()
  461. assert mock_get_client.call_count == 3
  462. assert mock_http_client.get.call_count == 3
  463. # Should close client twice (after first 2 failed attempts, not after 3rd)
  464. assert mock_close.call_count == 2
  465. # Should sleep twice (after first 2 attempts, not after 3rd)
  466. assert mock_sleep.call_count == 2
  467. @pytest.mark.asyncio
  468. async def test_get_spools_handles_non_connection_errors(self, client):
  469. """Verify get_spools retries on non-connection errors without recreating client."""
  470. import httpx
  471. mock_spools = [{"id": 1}]
  472. with (
  473. patch.object(client, "_get_client") as mock_get_client,
  474. patch.object(client, "close", AsyncMock()) as mock_close,
  475. patch("asyncio.sleep", AsyncMock()) as mock_sleep,
  476. ):
  477. mock_http_client = AsyncMock()
  478. mock_get_client.return_value = mock_http_client
  479. # First attempt fails with HTTP error, 2nd succeeds
  480. mock_response_error = Mock()
  481. mock_response_error.raise_for_status = Mock(
  482. side_effect=httpx.HTTPStatusError("500 Server Error", request=Mock(), response=Mock())
  483. )
  484. mock_response_success = Mock()
  485. mock_response_success.raise_for_status = Mock()
  486. mock_response_success.json = Mock(return_value=mock_spools)
  487. mock_http_client.get = AsyncMock(side_effect=[mock_response_error, mock_response_success])
  488. result = await client.get_spools()
  489. assert result == mock_spools
  490. assert mock_get_client.call_count == 2
  491. # Should NOT close client for HTTP errors (only connection errors)
  492. mock_close.assert_not_called()
  493. # Should sleep once (after first failed attempt)
  494. assert mock_sleep.call_count == 1
  495. # ---------------------------------------------------------------------------
  496. # init_spoolman_client — SSRF guard (B4 / T3)
  497. # ---------------------------------------------------------------------------
  498. class TestInitSpoolmanClientSSRFGuard:
  499. """init_spoolman_client must reject genuinely unsafe URLs before creating a client.
  500. Scope: cloud metadata endpoints, multicast, unspecified, non-http(s) schemes,
  501. and numeric-encoded IP bypasses. Loopback and RFC-1918 private ranges are
  502. explicitly allowed — Bambuddy's primary deployment is LAN-local Spoolman.
  503. """
  504. @pytest.mark.asyncio
  505. async def test_cloud_metadata_raises_value_error(self):
  506. with pytest.raises(ValueError, match="cloud metadata"):
  507. await init_spoolman_client("http://169.254.169.254/latest/meta-data/")
  508. @pytest.mark.asyncio
  509. async def test_multicast_raises_value_error(self):
  510. with pytest.raises(ValueError, match="multicast|unspecified"):
  511. await init_spoolman_client("http://224.0.0.1/")
  512. @pytest.mark.asyncio
  513. async def test_unspecified_raises_value_error(self):
  514. with pytest.raises(ValueError, match="multicast|unspecified"):
  515. await init_spoolman_client("http://0.0.0.0/")
  516. @pytest.mark.asyncio
  517. async def test_numeric_encoded_ip_raises_value_error(self):
  518. # decimal-encoded 127.0.0.1 — libc resolves these but ipaddress doesn't
  519. with pytest.raises(ValueError, match="numeric-encoded"):
  520. await init_spoolman_client("http://2130706433/")
  521. @pytest.mark.asyncio
  522. async def test_non_http_scheme_raises_value_error(self):
  523. with pytest.raises(ValueError, match="http or https"):
  524. await init_spoolman_client("file:///etc/passwd")
  525. @pytest.mark.asyncio
  526. async def test_private_ip_is_allowed(self):
  527. """Regression: RFC-1918 private addresses are the normal LAN topology."""
  528. mock_instance = AsyncMock()
  529. with (
  530. patch("backend.app.services.spoolman._spoolman_client", None),
  531. patch("backend.app.services.spoolman.SpoolmanClient", return_value=mock_instance) as mock_cls,
  532. ):
  533. client = await init_spoolman_client("http://192.168.1.50:7912/")
  534. mock_cls.assert_called_once_with("http://192.168.1.50:7912/")
  535. assert client is mock_instance
  536. @pytest.mark.asyncio
  537. async def test_loopback_ip_is_allowed(self):
  538. """Regression: same-host Spoolman via loopback is a supported topology."""
  539. mock_instance = AsyncMock()
  540. with (
  541. patch("backend.app.services.spoolman._spoolman_client", None),
  542. patch("backend.app.services.spoolman.SpoolmanClient", return_value=mock_instance) as mock_cls,
  543. ):
  544. client = await init_spoolman_client("http://127.0.0.1:7912/")
  545. mock_cls.assert_called_once_with("http://127.0.0.1:7912/")
  546. assert client is mock_instance
  547. @pytest.mark.asyncio
  548. async def test_localhost_hostname_is_allowed(self):
  549. # localhost (hostname, not bare IP) is a supported topology for same-host Spoolman
  550. mock_instance = AsyncMock()
  551. with (
  552. patch("backend.app.services.spoolman._spoolman_client", None),
  553. patch("backend.app.services.spoolman.SpoolmanClient", return_value=mock_instance) as mock_cls,
  554. ):
  555. client = await init_spoolman_client("http://localhost:7912/")
  556. mock_cls.assert_called_once_with("http://localhost:7912/")
  557. assert client is mock_instance
  558. @pytest.mark.asyncio
  559. async def test_public_url_is_allowed(self):
  560. mock_instance = AsyncMock()
  561. with (
  562. patch("backend.app.services.spoolman._spoolman_client", None),
  563. patch("backend.app.services.spoolman.SpoolmanClient", return_value=mock_instance) as mock_cls,
  564. ):
  565. client = await init_spoolman_client("http://spoolman.example.com:7912/")
  566. mock_cls.assert_called_once_with("http://spoolman.example.com:7912/")
  567. assert client is mock_instance