test_spoolman_tracking.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. """Unit tests for Spoolman tracking service helpers."""
  2. from types import SimpleNamespace
  3. from unittest.mock import AsyncMock, MagicMock, patch
  4. import pytest
  5. from backend.app.services.spoolman_tracking import (
  6. _apply_spool_colors_to_archive,
  7. _get_fallback_spool_tag,
  8. _global_tray_id_to_ams_slot,
  9. _hash_serial_to_hex32,
  10. _resolve_global_tray_id,
  11. _resolve_spool_tag,
  12. build_ams_tray_lookup,
  13. store_print_data,
  14. )
  15. class TestResolveSpoolTag:
  16. """Tests for _resolve_spool_tag()."""
  17. def test_prefers_tray_uuid_over_tag_uid(self):
  18. tray = {"tray_uuid": "A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4", "tag_uid": "DEADBEEF"}
  19. assert _resolve_spool_tag(tray) == "A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4"
  20. def test_falls_back_to_tag_uid_when_no_uuid(self):
  21. tray = {"tray_uuid": "", "tag_uid": "DEADBEEF"}
  22. assert _resolve_spool_tag(tray) == "DEADBEEF"
  23. def test_falls_back_to_tag_uid_when_uuid_zero(self):
  24. tray = {"tray_uuid": "00000000000000000000000000000000", "tag_uid": "DEADBEEF"}
  25. assert _resolve_spool_tag(tray) == "DEADBEEF"
  26. def test_rejects_zero_tag_uid(self):
  27. tray = {"tray_uuid": "", "tag_uid": "0000000000000000"}
  28. assert _resolve_spool_tag(tray) == ""
  29. def test_uses_fallback_tag_when_ids_missing(self):
  30. tray = {"tray_uuid": "", "tag_uid": ""}
  31. # global_tray_id 0 -> ams_id 0, tray_id 0
  32. assert _resolve_spool_tag(tray, "01P00A000000000", 0) == "ABA7845700000000"
  33. def test_uses_fallback_tag_when_ids_zero(self):
  34. tray = {"tray_uuid": "00000000000000000000000000000000", "tag_uid": "0000000000000000"}
  35. # global_tray_id 5 -> ams_id 1, tray_id 1
  36. assert _resolve_spool_tag(tray, "01P00A000000000", 5) == "ABA7845700010001"
  37. def test_prefers_tray_uuid_over_fallback_when_non_zero(self):
  38. tray = {"tray_uuid": "A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4", "tag_uid": ""}
  39. assert _resolve_spool_tag(tray, "01P00A000000000", 0) == "A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4"
  40. def test_empty_both(self):
  41. tray = {"tray_uuid": "", "tag_uid": ""}
  42. assert _resolve_spool_tag(tray) == ""
  43. def test_missing_keys(self):
  44. assert _resolve_spool_tag({}) == ""
  45. def test_zero_uuid_no_tag(self):
  46. tray = {"tray_uuid": "00000000000000000000000000000000", "tag_uid": ""}
  47. assert _resolve_spool_tag(tray) == ""
  48. class TestResolveGlobalTrayId:
  49. """Tests for _resolve_global_tray_id()."""
  50. def test_default_mapping(self):
  51. """slot 1 -> tray 0, slot 2 -> tray 1, etc."""
  52. assert _resolve_global_tray_id(1, None) == 0
  53. assert _resolve_global_tray_id(2, None) == 1
  54. assert _resolve_global_tray_id(4, None) == 3
  55. def test_custom_mapping(self):
  56. """Custom slot_to_tray overrides default."""
  57. mapping = [5, 2, -1, 0]
  58. assert _resolve_global_tray_id(1, mapping) == 5
  59. assert _resolve_global_tray_id(2, mapping) == 2
  60. assert _resolve_global_tray_id(4, mapping) == 0
  61. def test_unmapped_slot(self):
  62. """Slot with -1 in mapping uses default."""
  63. mapping = [5, -1, 2, 0]
  64. assert _resolve_global_tray_id(2, mapping) == 1 # default: slot 2 -> tray 1
  65. def test_slot_beyond_mapping(self):
  66. """Slot beyond mapping length uses default."""
  67. mapping = [5, 2]
  68. assert _resolve_global_tray_id(3, mapping) == 2 # default: slot 3 -> tray 2
  69. def test_empty_mapping(self):
  70. mapping = []
  71. assert _resolve_global_tray_id(1, mapping) == 0
  72. def test_minus_one_resolves_to_external_spool_when_present(self):
  73. """#1276 (regression of #853): -1 in slot_to_tray is BambuStudio's
  74. encoding for "external spool used" — look up the external spool in
  75. ams_trays rather than falling through to the position-based default
  76. (which would credit an unrelated AMS tray). Reporter ojimpo's H2S
  77. had AMS slot 0 occupied with PLA and ran a TPU external-spool print;
  78. the bug credited the TPU usage to the PLA spool.
  79. """
  80. # Single external spool (most common: H2S/X1C/P1S + external)
  81. assert _resolve_global_tray_id(1, [-1], ams_trays={254: {}}) == 254
  82. # AMS occupied with material AND external in use — fix prevents
  83. # crediting AMS slot 0 (the actual bug from #1276)
  84. assert _resolve_global_tray_id(1, [-1], ams_trays={0: {}, 1: {}, 2: {}, 3: {}, 254: {}}) == 254
  85. # H2D-style deputy nozzle at 255
  86. assert _resolve_global_tray_id(1, [-1], ams_trays={0: {}, 255: {}}) == 255
  87. # Both external slots present (multi-nozzle) — prefer 254 (main on
  88. # single-nozzle, deputy on H2D — matches tray_now reporting)
  89. assert _resolve_global_tray_id(1, [-1], ams_trays={254: {}, 255: {}}) == 254
  90. def test_minus_one_falls_through_when_no_external_in_ams_trays(self):
  91. """If -1 is seen but ams_trays has no external spool (254/255),
  92. fall through to position-based default (legacy behavior preserved
  93. for callers that don't pass ams_trays or pre-fix call sites).
  94. """
  95. # ams_trays without external — fall through to legacy behavior
  96. assert _resolve_global_tray_id(1, [-1], ams_trays={0: {}, 1: {}}) == 0
  97. # No ams_trays passed at all — legacy fallback
  98. assert _resolve_global_tray_id(1, [-1]) == 0
  99. class TestFallbackTagHelpers:
  100. """Tests for frontend-mirrored fallback tag helpers."""
  101. def test_hash_serial_matches_frontend_algorithm(self):
  102. assert _hash_serial_to_hex32("01P00A000000000") == "ABA78457"
  103. # Frontend trims and uppercases before hashing
  104. assert _hash_serial_to_hex32(" 01p00a000000000 ") == "ABA78457"
  105. def test_global_tray_to_ams_slot_standard_ams(self):
  106. assert _global_tray_id_to_ams_slot(0) == (0, 0)
  107. assert _global_tray_id_to_ams_slot(7) == (1, 3)
  108. def test_global_tray_to_ams_slot_ams_ht(self):
  109. assert _global_tray_id_to_ams_slot(128) == (128, 0)
  110. assert _global_tray_id_to_ams_slot(135) == (135, 0)
  111. def test_global_tray_to_ams_slot_external(self):
  112. assert _global_tray_id_to_ams_slot(254) == (255, 0)
  113. assert _global_tray_id_to_ams_slot(255) == (255, 1)
  114. def test_get_fallback_spool_tag_standard(self):
  115. assert _get_fallback_spool_tag("01P00A000000000", 5) == "ABA7845700010001"
  116. def test_get_fallback_spool_tag_ams_ht(self):
  117. assert _get_fallback_spool_tag("01P00A000000000", 128) == "ABA7845700800000"
  118. def test_get_fallback_spool_tag_external(self):
  119. assert _get_fallback_spool_tag("01P00A000000000", 255) == "ABA7845700FF0001"
  120. class TestBuildAmsTrayLookup:
  121. """Tests for build_ams_tray_lookup()."""
  122. def test_single_ams_unit(self):
  123. raw = {
  124. "ams": [
  125. {
  126. "id": 0,
  127. "tray": [
  128. {"id": 0, "tray_uuid": "AAA", "tag_uid": "111", "tray_type": "PLA"},
  129. {"id": 1, "tray_uuid": "BBB", "tag_uid": "222", "tray_type": "ABS"},
  130. ],
  131. }
  132. ]
  133. }
  134. lookup = build_ams_tray_lookup(raw)
  135. assert lookup[0] == {"tray_uuid": "AAA", "tag_uid": "111", "tray_type": "PLA"}
  136. assert lookup[1] == {"tray_uuid": "BBB", "tag_uid": "222", "tray_type": "ABS"}
  137. def test_multiple_ams_units(self):
  138. raw = {
  139. "ams": [
  140. {"id": 0, "tray": [{"id": 0, "tray_uuid": "A", "tag_uid": "", "tray_type": "PLA"}]},
  141. {"id": 1, "tray": [{"id": 0, "tray_uuid": "B", "tag_uid": "", "tray_type": "PETG"}]},
  142. ]
  143. }
  144. lookup = build_ams_tray_lookup(raw)
  145. assert 0 in lookup # AMS 0, tray 0
  146. assert 4 in lookup # AMS 1, tray 0 (1*4+0)
  147. assert lookup[4]["tray_uuid"] == "B"
  148. def test_external_spool(self):
  149. raw = {
  150. "ams": [],
  151. "vt_tray": [{"tray_uuid": "EXT", "tag_uid": "X", "tray_type": "TPU"}],
  152. }
  153. lookup = build_ams_tray_lookup(raw)
  154. assert 254 in lookup
  155. assert lookup[254]["tray_type"] == "TPU"
  156. def test_empty_external_spool_skipped(self):
  157. raw = {"ams": [], "vt_tray": [{"tray_type": ""}]}
  158. lookup = build_ams_tray_lookup(raw)
  159. assert 254 not in lookup
  160. def test_no_ams_data(self):
  161. assert build_ams_tray_lookup({}) == {}
  162. assert build_ams_tray_lookup({"ams": []}) == {}
  163. def test_missing_fields_default(self):
  164. raw = {"ams": [{"id": 0, "tray": [{"id": 0}]}]}
  165. lookup = build_ams_tray_lookup(raw)
  166. assert lookup[0] == {"tray_uuid": "", "tag_uid": "", "tray_type": ""}
  167. class TestStorePrintData:
  168. """Tests for store_print_data()."""
  169. @pytest.mark.asyncio
  170. async def test_prefers_explicit_ams_mapping_over_queue_mapping(self):
  171. db = AsyncMock()
  172. delete_result = MagicMock()
  173. db.execute = AsyncMock(side_effect=[delete_result])
  174. db.add = MagicMock()
  175. db.commit = AsyncMock()
  176. printer_manager = MagicMock()
  177. printer_manager.get_status.return_value = SimpleNamespace(
  178. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "tray_type": "PLA"}, {"id": 1, "tray_type": "PLA"}]}]}
  179. )
  180. mock_settings = MagicMock()
  181. mock_path = MagicMock()
  182. mock_path.exists.return_value = True
  183. mock_settings.base_dir.__truediv__.return_value = mock_path
  184. with (
  185. patch("backend.app.services.spoolman_tracking.app_settings", mock_settings),
  186. patch("backend.app.api.routes.settings.get_setting", AsyncMock(side_effect=["true", "true"])),
  187. patch(
  188. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  189. return_value=[{"slot_id": 1, "used_g": 3.83, "type": "PLA", "color": "#FF0000"}],
  190. ),
  191. patch("backend.app.utils.threemf_tools.extract_layer_filament_usage_from_3mf", return_value=None),
  192. patch("backend.app.utils.threemf_tools.extract_filament_properties_from_3mf", return_value={}),
  193. ):
  194. await store_print_data(
  195. printer_id=1,
  196. archive_id=15,
  197. file_path="archives/test.3mf",
  198. db=db,
  199. printer_manager=printer_manager,
  200. ams_mapping=[1, -1, -1, -1],
  201. )
  202. db.add.assert_called_once()
  203. tracking = db.add.call_args.args[0]
  204. assert tracking.slot_to_tray == [1, -1, -1, -1]
  205. db.execute.assert_called_once()
  206. @pytest.mark.asyncio
  207. async def test_stores_tracking_when_disable_weight_sync_is_false(self):
  208. """#1119: per-print tracking must run regardless of disable_weight_sync.
  209. Previously store_print_data short-circuited when the deprecated
  210. `spoolman_disable_weight_sync` flag was off, leaving non-BL spools
  211. with no weight-update path at all. Per-print tracking is now the
  212. only weight writer for Spoolman, so it must run whenever Spoolman
  213. is enabled.
  214. """
  215. db = AsyncMock()
  216. db.execute = AsyncMock(return_value=MagicMock())
  217. db.add = MagicMock()
  218. db.commit = AsyncMock()
  219. printer_manager = MagicMock()
  220. printer_manager.get_status.return_value = SimpleNamespace(
  221. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "tray_type": "PLA"}]}]}
  222. )
  223. mock_settings = MagicMock()
  224. mock_path = MagicMock()
  225. mock_path.exists.return_value = True
  226. mock_settings.base_dir.__truediv__.return_value = mock_path
  227. # Only spoolman_enabled is consulted now (disable_weight_sync is no
  228. # longer read). The single side_effect entry proves no extra
  229. # get_setting calls slip back in.
  230. with (
  231. patch("backend.app.services.spoolman_tracking.app_settings", mock_settings),
  232. patch("backend.app.api.routes.settings.get_setting", AsyncMock(side_effect=["true"])),
  233. patch(
  234. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  235. return_value=[{"slot_id": 1, "used_g": 5.0, "type": "PLA", "color": "#FF0000"}],
  236. ),
  237. patch("backend.app.utils.threemf_tools.extract_layer_filament_usage_from_3mf", return_value=None),
  238. patch("backend.app.utils.threemf_tools.extract_filament_properties_from_3mf", return_value={}),
  239. ):
  240. await store_print_data(
  241. printer_id=1,
  242. archive_id=20,
  243. file_path="archives/test.3mf",
  244. db=db,
  245. printer_manager=printer_manager,
  246. ams_mapping=[0],
  247. )
  248. # Tracking row was inserted — the fix is working.
  249. db.add.assert_called_once()
  250. class TestApplySpoolColorsToArchive:
  251. """`_apply_spool_colors_to_archive` stamps the archive's filament_color
  252. from the matched Spoolman spools (#1494) — the Spoolman-mode mirror of
  253. the built-in inventory rewrite in usage_tracker."""
  254. def _make_db(self, archive):
  255. db = AsyncMock()
  256. db.execute = AsyncMock(return_value=MagicMock(scalar_one_or_none=MagicMock(return_value=archive)))
  257. return db
  258. @pytest.mark.asyncio
  259. async def test_rewrites_color_from_spoolman_spool(self):
  260. """The #1494 case: 3MF said #161616, the Spoolman spool is 000000."""
  261. archive = MagicMock()
  262. archive.filament_color = "#161616"
  263. db = self._make_db(archive)
  264. await _apply_spool_colors_to_archive(
  265. db,
  266. archive_id=10,
  267. filament_usage=[{"slot_id": 1, "used_g": 15.9}],
  268. slot_colors={1: "000000"},
  269. )
  270. assert archive.filament_color == "#000000"
  271. db.commit.assert_awaited()
  272. @pytest.mark.asyncio
  273. async def test_empty_slot_colors_is_noop(self):
  274. """No resolved spool colours — never touches the DB."""
  275. db = self._make_db(MagicMock())
  276. await _apply_spool_colors_to_archive(
  277. db, archive_id=10, filament_usage=[{"slot_id": 1, "used_g": 15.9}], slot_colors={}
  278. )
  279. db.execute.assert_not_awaited()
  280. db.commit.assert_not_awaited()
  281. @pytest.mark.asyncio
  282. async def test_partial_match_leaves_archive_untouched(self):
  283. """Slot 2 used but unresolved — keep the 3MF colour, don't load the archive."""
  284. db = self._make_db(MagicMock())
  285. await _apply_spool_colors_to_archive(
  286. db,
  287. archive_id=10,
  288. filament_usage=[
  289. {"slot_id": 1, "used_g": 10.0},
  290. {"slot_id": 2, "used_g": 20.0},
  291. ],
  292. slot_colors={1: "000000"},
  293. )
  294. db.execute.assert_not_awaited()
  295. db.commit.assert_not_awaited()
  296. @pytest.mark.asyncio
  297. async def test_missing_archive_does_not_crash(self):
  298. """Archive row gone (deleted between completion and reporting)."""
  299. db = self._make_db(None)
  300. await _apply_spool_colors_to_archive(
  301. db,
  302. archive_id=10,
  303. filament_usage=[{"slot_id": 1, "used_g": 15.9}],
  304. slot_colors={1: "000000"},
  305. )
  306. db.commit.assert_not_awaited()