test_spoolman_service.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446
  1. """Unit tests for Spoolman service.
  2. These tests specifically target the sync_ams_tray method's disable_weight_sync
  3. functionality that controls whether remaining_weight is updated.
  4. Also includes tests for is_bambu_lab_spool RFID detection.
  5. """
  6. from unittest.mock import AsyncMock, Mock, patch
  7. import pytest
  8. from backend.app.services.spoolman import AMSTray, SpoolmanClient
  9. class TestIsBambuLabSpool:
  10. """Tests for is_bambu_lab_spool — detects BL spools via RFID hardware identifiers only."""
  11. @pytest.fixture
  12. def client(self):
  13. return SpoolmanClient("http://localhost:7912")
  14. def test_valid_tray_uuid_returns_true(self, client):
  15. """A non-zero 32-char hex tray_uuid identifies a BL spool."""
  16. assert client.is_bambu_lab_spool("A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4") is True
  17. def test_valid_tag_uid_returns_true(self, client):
  18. """A non-zero 16-char hex tag_uid identifies a BL spool (fallback)."""
  19. assert client.is_bambu_lab_spool("", tag_uid="A1B2C3D4E5F6A1B2") is True
  20. def test_zero_tray_uuid_returns_false(self, client):
  21. """All-zero tray_uuid means no RFID tag read."""
  22. assert client.is_bambu_lab_spool("00000000000000000000000000000000") is False
  23. def test_zero_tag_uid_returns_false(self, client):
  24. """All-zero tag_uid means no RFID tag read."""
  25. assert client.is_bambu_lab_spool("", tag_uid="0000000000000000") is False
  26. def test_empty_identifiers_returns_false(self, client):
  27. """No identifiers means no BL spool."""
  28. assert client.is_bambu_lab_spool("") is False
  29. assert client.is_bambu_lab_spool("", tag_uid="") is False
  30. def test_tray_info_idx_ignored(self, client):
  31. """tray_info_idx is NOT a reliable BL indicator — third-party spools
  32. using Bambu generic presets also have GF-prefixed tray_info_idx values."""
  33. # Third-party spool with Bambu preset but no RFID identifiers
  34. assert client.is_bambu_lab_spool("", tray_info_idx="GFA00") is False
  35. assert client.is_bambu_lab_spool("", tray_info_idx="GFB00") is False
  36. assert client.is_bambu_lab_spool("", tray_info_idx="GFSA02_04") is False
  37. def test_tray_info_idx_with_valid_uuid_returns_true(self, client):
  38. """BL spool with both RFID UUID and preset ID — detected by UUID."""
  39. assert (
  40. client.is_bambu_lab_spool(
  41. "A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4",
  42. tray_info_idx="GFA00",
  43. )
  44. is True
  45. )
  46. def test_tray_uuid_preferred_over_tag_uid(self, client):
  47. """tray_uuid is checked before tag_uid (both valid)."""
  48. assert (
  49. client.is_bambu_lab_spool(
  50. "A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4",
  51. tag_uid="A1B2C3D4E5F6A1B2",
  52. )
  53. is True
  54. )
  55. def test_short_tray_uuid_returns_false(self, client):
  56. """UUID must be exactly 32 hex chars."""
  57. assert client.is_bambu_lab_spool("A1B2C3D4") is False
  58. def test_non_hex_tray_uuid_returns_false(self, client):
  59. """UUID must be valid hex."""
  60. assert client.is_bambu_lab_spool("ZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ") is False
  61. class TestSpoolmanClient:
  62. """Tests for SpoolmanClient class."""
  63. @pytest.fixture
  64. def client(self):
  65. """Create a SpoolmanClient instance."""
  66. return SpoolmanClient("http://localhost:7912")
  67. @pytest.fixture
  68. def sample_tray(self):
  69. """Create a sample AMSTray for testing."""
  70. return AMSTray(
  71. ams_id=0,
  72. tray_id=0,
  73. tray_type="PLA",
  74. tray_sub_brands="PLA Basic",
  75. tray_color="FF0000FF",
  76. remain=50,
  77. tag_uid="",
  78. tray_uuid="A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4",
  79. tray_info_idx="GFA00",
  80. tray_weight=1000,
  81. )
  82. @pytest.fixture
  83. def existing_spool(self):
  84. """Create a mock existing spool response."""
  85. return {
  86. "id": 42,
  87. "remaining_weight": 800,
  88. "extra": {"tag": '"A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4"'},
  89. "filament": {"id": 1, "name": "PLA Red", "material": "PLA"},
  90. }
  91. @pytest.fixture
  92. def mock_filament(self):
  93. """Create a mock filament response."""
  94. return {"id": 1, "name": "PLA Basic", "material": "PLA"}
  95. # ========================================================================
  96. # Tests for sync_ams_tray with disable_weight_sync
  97. # ========================================================================
  98. @pytest.mark.asyncio
  99. async def test_sync_ams_tray_updates_weight_by_default(self, client, sample_tray, existing_spool):
  100. """Verify sync_ams_tray updates remaining_weight by default."""
  101. with (
  102. patch.object(client, "find_spool_by_tag", AsyncMock(return_value=existing_spool)),
  103. patch.object(client, "update_spool", AsyncMock(return_value={"id": 42})) as mock_update,
  104. ):
  105. await client.sync_ams_tray(sample_tray, "TestPrinter")
  106. mock_update.assert_called_once()
  107. call_kwargs = mock_update.call_args.kwargs
  108. assert "remaining_weight" in call_kwargs
  109. assert call_kwargs["remaining_weight"] == 500.0 # 50% of 1000g
  110. assert "location" in call_kwargs
  111. @pytest.mark.asyncio
  112. async def test_sync_ams_tray_skips_weight_when_disabled(self, client, sample_tray, existing_spool):
  113. """Verify sync_ams_tray skips remaining_weight when disable_weight_sync=True."""
  114. with (
  115. patch.object(client, "find_spool_by_tag", AsyncMock(return_value=existing_spool)),
  116. patch.object(client, "update_spool", AsyncMock(return_value={"id": 42})) as mock_update,
  117. ):
  118. await client.sync_ams_tray(sample_tray, "TestPrinter", disable_weight_sync=True)
  119. mock_update.assert_called_once()
  120. call_kwargs = mock_update.call_args.kwargs
  121. # remaining_weight should be None (not updated)
  122. assert call_kwargs.get("remaining_weight") is None
  123. # location should still be updated
  124. assert "location" in call_kwargs
  125. assert "TestPrinter" in call_kwargs["location"]
  126. @pytest.mark.asyncio
  127. async def test_sync_ams_tray_new_spool_always_includes_weight(self, client, sample_tray, mock_filament):
  128. """Verify new spool creation always includes remaining_weight even when disabled."""
  129. with (
  130. patch.object(client, "find_spool_by_tag", AsyncMock(return_value=None)),
  131. patch.object(client, "_find_or_create_filament", AsyncMock(return_value=mock_filament)),
  132. patch.object(client, "create_spool", AsyncMock(return_value={"id": 99})) as mock_create,
  133. ):
  134. await client.sync_ams_tray(sample_tray, "TestPrinter", disable_weight_sync=True)
  135. mock_create.assert_called_once()
  136. call_kwargs = mock_create.call_args.kwargs
  137. # New spools should ALWAYS include remaining_weight
  138. assert "remaining_weight" in call_kwargs
  139. assert call_kwargs["remaining_weight"] == 500.0 # 50% of 1000g
  140. @pytest.mark.asyncio
  141. async def test_sync_ams_tray_location_format(self, client, sample_tray, existing_spool):
  142. """Verify location format is correct when updating spool."""
  143. with (
  144. patch.object(client, "find_spool_by_tag", AsyncMock(return_value=existing_spool)),
  145. patch.object(client, "update_spool", AsyncMock(return_value={"id": 42})) as mock_update,
  146. ):
  147. await client.sync_ams_tray(sample_tray, "My Printer", disable_weight_sync=True)
  148. call_kwargs = mock_update.call_args.kwargs
  149. # Location should follow pattern: "PrinterName - AMS A1"
  150. assert "location" in call_kwargs
  151. assert "My Printer" in call_kwargs["location"]
  152. assert "AMS" in call_kwargs["location"]
  153. @pytest.mark.asyncio
  154. async def test_sync_ams_tray_skips_non_bambu_spool(self, client):
  155. """Verify non-Bambu Lab spools are skipped."""
  156. # Third-party spool without proper identifiers
  157. tray = AMSTray(
  158. ams_id=0,
  159. tray_id=0,
  160. tray_type="PLA",
  161. tray_sub_brands="Third Party PLA",
  162. tray_color="FF0000FF",
  163. remain=50,
  164. tag_uid="",
  165. tray_uuid="",
  166. tray_info_idx="", # No Bambu Lab preset ID
  167. tray_weight=1000,
  168. )
  169. result = await client.sync_ams_tray(tray, "TestPrinter")
  170. assert result is None
  171. @pytest.mark.asyncio
  172. async def test_sync_ams_tray_weight_calculation(self, client, existing_spool):
  173. """Verify remaining weight is calculated correctly for various percentages."""
  174. test_cases = [
  175. (100, 1000, 1000.0), # Full spool
  176. (50, 1000, 500.0), # Half spool
  177. (25, 1000, 250.0), # Quarter spool
  178. (0, 1000, 0.0), # Empty spool
  179. (75, 500, 375.0), # Different spool weight
  180. ]
  181. for remain, weight, expected in test_cases:
  182. tray = AMSTray(
  183. ams_id=0,
  184. tray_id=0,
  185. tray_type="PLA",
  186. tray_sub_brands="PLA Basic",
  187. tray_color="FF0000FF",
  188. remain=remain,
  189. tag_uid="",
  190. tray_uuid="A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4",
  191. tray_info_idx="GFA00",
  192. tray_weight=weight,
  193. )
  194. with (
  195. patch.object(client, "find_spool_by_tag", AsyncMock(return_value=existing_spool)),
  196. patch.object(client, "update_spool", AsyncMock(return_value={"id": 42})) as mock_update,
  197. ):
  198. await client.sync_ams_tray(tray, "TestPrinter", disable_weight_sync=False)
  199. call_kwargs = mock_update.call_args.kwargs
  200. assert call_kwargs["remaining_weight"] == expected, (
  201. f"Expected {expected}g for {remain}% of {weight}g, got {call_kwargs['remaining_weight']}"
  202. )
  203. # ========================================================================
  204. # Tests for caching functionality
  205. # ========================================================================
  206. @pytest.mark.asyncio
  207. async def test_find_spool_by_tag_with_cached_spools(self, client):
  208. """Verify find_spool_by_tag uses cached spools when provided (no API call)."""
  209. cached = [
  210. {"id": 1, "extra": {"tag": '"ABC123"'}},
  211. {"id": 2, "extra": {"tag": '"XYZ789"'}},
  212. ]
  213. with patch.object(client, "get_spools", AsyncMock()) as mock_get:
  214. result = await client.find_spool_by_tag("ABC123", cached_spools=cached)
  215. assert result["id"] == 1
  216. mock_get.assert_not_called() # Should NOT call get_spools
  217. @pytest.mark.asyncio
  218. async def test_find_spool_by_tag_without_cached_spools(self, client):
  219. """Verify find_spool_by_tag fetches spools when cache not provided."""
  220. mock_spools = [{"id": 1, "extra": {"tag": '"ABC123"'}}]
  221. with patch.object(client, "get_spools", AsyncMock(return_value=mock_spools)) as mock_get:
  222. result = await client.find_spool_by_tag("ABC123")
  223. assert result["id"] == 1
  224. mock_get.assert_called_once() # Should call get_spools
  225. @pytest.mark.asyncio
  226. async def test_find_spools_by_location_prefix_with_cached_spools(self, client):
  227. """Verify find_spools_by_location_prefix uses cached spools when provided."""
  228. cached = [
  229. {"id": 1, "location": "Printer1 - AMS A1"},
  230. {"id": 2, "location": "Printer2 - AMS A1"},
  231. {"id": 3, "location": "Printer1 - AMS A2"},
  232. ]
  233. with patch.object(client, "get_spools", AsyncMock()) as mock_get:
  234. result = await client.find_spools_by_location_prefix("Printer1 - ", cached_spools=cached)
  235. assert len(result) == 2
  236. assert result[0]["id"] == 1
  237. assert result[1]["id"] == 3
  238. mock_get.assert_not_called() # Should NOT call get_spools
  239. @pytest.mark.asyncio
  240. async def test_sync_ams_tray_with_cached_spools(self, client, sample_tray, existing_spool):
  241. """Verify sync_ams_tray passes cached_spools to find_spool_by_tag."""
  242. cached = [existing_spool]
  243. with (
  244. patch.object(client, "get_spools", AsyncMock()) as mock_get,
  245. patch.object(client, "update_spool", AsyncMock(return_value={"id": 42})),
  246. ):
  247. await client.sync_ams_tray(sample_tray, "TestPrinter", cached_spools=cached)
  248. mock_get.assert_not_called() # Should NOT call get_spools
  249. @pytest.mark.asyncio
  250. async def test_clear_location_for_removed_spools_with_cached_spools(self, client):
  251. """Verify clear_location_for_removed_spools uses cached spools."""
  252. cached = [
  253. {"id": 1, "location": "Printer1 - AMS A1", "extra": {"tag": '"A1B2C3D4E5F60718293A4B5C6D7E8F90"'}},
  254. {"id": 2, "location": "Printer1 - AMS A2", "extra": {"tag": '"B1C2D3E4F5061728394A5B6C7D8E9F01"'}},
  255. {"id": 3, "location": "Printer1 - AMS A3", "extra": {"tag": '"C1D2E3F40516273849A5B6C7D8E9F012"'}},
  256. ]
  257. # Tag 3 was cleared, so only tags 1 and 2 are current
  258. current_tags = {
  259. "A1B2C3D4E5F60718293A4B5C6D7E8F90",
  260. "B1C2D3E4F5061728394A5B6C7D8E9F01",
  261. }
  262. with (
  263. patch.object(client, "get_spools", AsyncMock()) as mock_get,
  264. patch.object(client, "update_spool", AsyncMock(return_value={"id": 3})) as mock_update,
  265. ):
  266. cleared = await client.clear_location_for_removed_spools("Printer1", current_tags, cached_spools=cached)
  267. assert cleared == 1
  268. mock_get.assert_not_called() # Should NOT call get_spools
  269. mock_update.assert_called_once()
  270. # Verify it cleared TAG3 (not in current_tags)
  271. call_kwargs = mock_update.call_args.kwargs
  272. assert call_kwargs["spool_id"] == 3
  273. assert call_kwargs.get("clear_location") is True
  274. # ========================================================================
  275. # Tests for retry logic in get_spools
  276. # ========================================================================
  277. @pytest.mark.asyncio
  278. async def test_get_spools_succeeds_on_first_attempt(self, client):
  279. """Verify get_spools succeeds immediately when no errors occur."""
  280. mock_spools = [{"id": 1}, {"id": 2}]
  281. with patch.object(client, "_get_client") as mock_get_client:
  282. mock_http_client = AsyncMock()
  283. mock_response = Mock()
  284. mock_response.raise_for_status = Mock()
  285. mock_response.json = Mock(return_value=mock_spools)
  286. mock_http_client.get = AsyncMock(return_value=mock_response)
  287. mock_get_client.return_value = mock_http_client
  288. result = await client.get_spools()
  289. assert result == mock_spools
  290. mock_get_client.assert_called_once()
  291. mock_http_client.get.assert_called_once()
  292. @pytest.mark.asyncio
  293. async def test_get_spools_retries_on_connection_error(self, client):
  294. """Verify get_spools retries up to 3 times on connection errors."""
  295. import httpx
  296. mock_spools = [{"id": 1}]
  297. with (
  298. patch.object(client, "_get_client") as mock_get_client,
  299. patch.object(client, "close", AsyncMock()) as mock_close,
  300. patch("asyncio.sleep", AsyncMock()) as mock_sleep,
  301. ):
  302. mock_http_client = AsyncMock()
  303. mock_get_client.return_value = mock_http_client
  304. # First 2 attempts fail with ReadError, 3rd succeeds
  305. mock_response = Mock()
  306. mock_response.raise_for_status = Mock()
  307. mock_response.json = Mock(return_value=mock_spools)
  308. mock_http_client.get = AsyncMock(
  309. side_effect=[
  310. httpx.ReadError("Connection closed"),
  311. httpx.ReadError("Connection closed"),
  312. mock_response,
  313. ]
  314. )
  315. result = await client.get_spools()
  316. assert result == mock_spools
  317. assert mock_get_client.call_count == 3
  318. assert mock_http_client.get.call_count == 3
  319. # Should close client twice (after each failed attempt)
  320. assert mock_close.call_count == 2
  321. # Should sleep twice (after first 2 attempts)
  322. assert mock_sleep.call_count == 2
  323. mock_sleep.assert_called_with(0.5)
  324. @pytest.mark.asyncio
  325. async def test_get_spools_raises_after_3_failed_attempts(self, client):
  326. """Verify get_spools raises exception after 3 failed attempts."""
  327. import httpx
  328. with (
  329. patch.object(client, "_get_client", AsyncMock()) as mock_get_client,
  330. patch.object(client, "close", AsyncMock()) as mock_close,
  331. patch("asyncio.sleep", AsyncMock()) as mock_sleep,
  332. ):
  333. mock_http_client = AsyncMock()
  334. mock_get_client.return_value = mock_http_client
  335. # All 3 attempts fail
  336. mock_http_client.get.side_effect = httpx.ReadError("Connection closed")
  337. with pytest.raises(httpx.ReadError):
  338. await client.get_spools()
  339. assert mock_get_client.call_count == 3
  340. assert mock_http_client.get.call_count == 3
  341. # Should close client twice (after first 2 failed attempts, not after 3rd)
  342. assert mock_close.call_count == 2
  343. # Should sleep twice (after first 2 attempts, not after 3rd)
  344. assert mock_sleep.call_count == 2
  345. @pytest.mark.asyncio
  346. async def test_get_spools_handles_non_connection_errors(self, client):
  347. """Verify get_spools retries on non-connection errors without recreating client."""
  348. import httpx
  349. mock_spools = [{"id": 1}]
  350. with (
  351. patch.object(client, "_get_client") as mock_get_client,
  352. patch.object(client, "close", AsyncMock()) as mock_close,
  353. patch("asyncio.sleep", AsyncMock()) as mock_sleep,
  354. ):
  355. mock_http_client = AsyncMock()
  356. mock_get_client.return_value = mock_http_client
  357. # First attempt fails with HTTP error, 2nd succeeds
  358. mock_response_error = Mock()
  359. mock_response_error.raise_for_status = Mock(
  360. side_effect=httpx.HTTPStatusError("500 Server Error", request=Mock(), response=Mock())
  361. )
  362. mock_response_success = Mock()
  363. mock_response_success.raise_for_status = Mock()
  364. mock_response_success.json = Mock(return_value=mock_spools)
  365. mock_http_client.get = AsyncMock(side_effect=[mock_response_error, mock_response_success])
  366. result = await client.get_spools()
  367. assert result == mock_spools
  368. assert mock_get_client.call_count == 2
  369. # Should NOT close client for HTTP errors (only connection errors)
  370. mock_close.assert_not_called()
  371. # Should sleep once (after first failed attempt)
  372. assert mock_sleep.call_count == 1