# test_spoolman_tracking.py (12 KB)
  1. """Unit tests for Spoolman tracking service helpers."""
  2. from types import SimpleNamespace
  3. from unittest.mock import AsyncMock, MagicMock, patch
  4. import pytest
  5. from backend.app.services.spoolman_tracking import (
  6. _get_fallback_spool_tag,
  7. _global_tray_id_to_ams_slot,
  8. _hash_serial_to_hex32,
  9. _resolve_global_tray_id,
  10. _resolve_spool_tag,
  11. build_ams_tray_lookup,
  12. store_print_data,
  13. )
  14. class TestResolveSpoolTag:
  15. """Tests for _resolve_spool_tag()."""
  16. def test_prefers_tray_uuid_over_tag_uid(self):
  17. tray = {"tray_uuid": "A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4", "tag_uid": "DEADBEEF"}
  18. assert _resolve_spool_tag(tray) == "A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4"
  19. def test_falls_back_to_tag_uid_when_no_uuid(self):
  20. tray = {"tray_uuid": "", "tag_uid": "DEADBEEF"}
  21. assert _resolve_spool_tag(tray) == "DEADBEEF"
  22. def test_falls_back_to_tag_uid_when_uuid_zero(self):
  23. tray = {"tray_uuid": "00000000000000000000000000000000", "tag_uid": "DEADBEEF"}
  24. assert _resolve_spool_tag(tray) == "DEADBEEF"
  25. def test_rejects_zero_tag_uid(self):
  26. tray = {"tray_uuid": "", "tag_uid": "0000000000000000"}
  27. assert _resolve_spool_tag(tray) == ""
  28. def test_uses_fallback_tag_when_ids_missing(self):
  29. tray = {"tray_uuid": "", "tag_uid": ""}
  30. # global_tray_id 0 -> ams_id 0, tray_id 0
  31. assert _resolve_spool_tag(tray, "01P00A000000000", 0) == "ABA7845700000000"
  32. def test_uses_fallback_tag_when_ids_zero(self):
  33. tray = {"tray_uuid": "00000000000000000000000000000000", "tag_uid": "0000000000000000"}
  34. # global_tray_id 5 -> ams_id 1, tray_id 1
  35. assert _resolve_spool_tag(tray, "01P00A000000000", 5) == "ABA7845700010001"
  36. def test_prefers_tray_uuid_over_fallback_when_non_zero(self):
  37. tray = {"tray_uuid": "A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4", "tag_uid": ""}
  38. assert _resolve_spool_tag(tray, "01P00A000000000", 0) == "A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4"
  39. def test_empty_both(self):
  40. tray = {"tray_uuid": "", "tag_uid": ""}
  41. assert _resolve_spool_tag(tray) == ""
  42. def test_missing_keys(self):
  43. assert _resolve_spool_tag({}) == ""
  44. def test_zero_uuid_no_tag(self):
  45. tray = {"tray_uuid": "00000000000000000000000000000000", "tag_uid": ""}
  46. assert _resolve_spool_tag(tray) == ""
  47. class TestResolveGlobalTrayId:
  48. """Tests for _resolve_global_tray_id()."""
  49. def test_default_mapping(self):
  50. """slot 1 -> tray 0, slot 2 -> tray 1, etc."""
  51. assert _resolve_global_tray_id(1, None) == 0
  52. assert _resolve_global_tray_id(2, None) == 1
  53. assert _resolve_global_tray_id(4, None) == 3
  54. def test_custom_mapping(self):
  55. """Custom slot_to_tray overrides default."""
  56. mapping = [5, 2, -1, 0]
  57. assert _resolve_global_tray_id(1, mapping) == 5
  58. assert _resolve_global_tray_id(2, mapping) == 2
  59. assert _resolve_global_tray_id(4, mapping) == 0
  60. def test_unmapped_slot(self):
  61. """Slot with -1 in mapping uses default."""
  62. mapping = [5, -1, 2, 0]
  63. assert _resolve_global_tray_id(2, mapping) == 1 # default: slot 2 -> tray 1
  64. def test_slot_beyond_mapping(self):
  65. """Slot beyond mapping length uses default."""
  66. mapping = [5, 2]
  67. assert _resolve_global_tray_id(3, mapping) == 2 # default: slot 3 -> tray 2
  68. def test_empty_mapping(self):
  69. mapping = []
  70. assert _resolve_global_tray_id(1, mapping) == 0
  71. def test_minus_one_resolves_to_external_spool_when_present(self):
  72. """#1276 (regression of #853): -1 in slot_to_tray is BambuStudio's
  73. encoding for "external spool used" — look up the external spool in
  74. ams_trays rather than falling through to the position-based default
  75. (which would credit an unrelated AMS tray). Reporter ojimpo's H2S
  76. had AMS slot 0 occupied with PLA and ran a TPU external-spool print;
  77. the bug credited the TPU usage to the PLA spool.
  78. """
  79. # Single external spool (most common: H2S/X1C/P1S + external)
  80. assert _resolve_global_tray_id(1, [-1], ams_trays={254: {}}) == 254
  81. # AMS occupied with material AND external in use — fix prevents
  82. # crediting AMS slot 0 (the actual bug from #1276)
  83. assert _resolve_global_tray_id(1, [-1], ams_trays={0: {}, 1: {}, 2: {}, 3: {}, 254: {}}) == 254
  84. # H2D-style deputy nozzle at 255
  85. assert _resolve_global_tray_id(1, [-1], ams_trays={0: {}, 255: {}}) == 255
  86. # Both external slots present (multi-nozzle) — prefer 254 (main on
  87. # single-nozzle, deputy on H2D — matches tray_now reporting)
  88. assert _resolve_global_tray_id(1, [-1], ams_trays={254: {}, 255: {}}) == 254
  89. def test_minus_one_falls_through_when_no_external_in_ams_trays(self):
  90. """If -1 is seen but ams_trays has no external spool (254/255),
  91. fall through to position-based default (legacy behavior preserved
  92. for callers that don't pass ams_trays or pre-fix call sites).
  93. """
  94. # ams_trays without external — fall through to legacy behavior
  95. assert _resolve_global_tray_id(1, [-1], ams_trays={0: {}, 1: {}}) == 0
  96. # No ams_trays passed at all — legacy fallback
  97. assert _resolve_global_tray_id(1, [-1]) == 0
  98. class TestFallbackTagHelpers:
  99. """Tests for frontend-mirrored fallback tag helpers."""
  100. def test_hash_serial_matches_frontend_algorithm(self):
  101. assert _hash_serial_to_hex32("01P00A000000000") == "ABA78457"
  102. # Frontend trims and uppercases before hashing
  103. assert _hash_serial_to_hex32(" 01p00a000000000 ") == "ABA78457"
  104. def test_global_tray_to_ams_slot_standard_ams(self):
  105. assert _global_tray_id_to_ams_slot(0) == (0, 0)
  106. assert _global_tray_id_to_ams_slot(7) == (1, 3)
  107. def test_global_tray_to_ams_slot_ams_ht(self):
  108. assert _global_tray_id_to_ams_slot(128) == (128, 0)
  109. assert _global_tray_id_to_ams_slot(135) == (135, 0)
  110. def test_global_tray_to_ams_slot_external(self):
  111. assert _global_tray_id_to_ams_slot(254) == (255, 0)
  112. assert _global_tray_id_to_ams_slot(255) == (255, 1)
  113. def test_get_fallback_spool_tag_standard(self):
  114. assert _get_fallback_spool_tag("01P00A000000000", 5) == "ABA7845700010001"
  115. def test_get_fallback_spool_tag_ams_ht(self):
  116. assert _get_fallback_spool_tag("01P00A000000000", 128) == "ABA7845700800000"
  117. def test_get_fallback_spool_tag_external(self):
  118. assert _get_fallback_spool_tag("01P00A000000000", 255) == "ABA7845700FF0001"
  119. class TestBuildAmsTrayLookup:
  120. """Tests for build_ams_tray_lookup()."""
  121. def test_single_ams_unit(self):
  122. raw = {
  123. "ams": [
  124. {
  125. "id": 0,
  126. "tray": [
  127. {"id": 0, "tray_uuid": "AAA", "tag_uid": "111", "tray_type": "PLA"},
  128. {"id": 1, "tray_uuid": "BBB", "tag_uid": "222", "tray_type": "ABS"},
  129. ],
  130. }
  131. ]
  132. }
  133. lookup = build_ams_tray_lookup(raw)
  134. assert lookup[0] == {"tray_uuid": "AAA", "tag_uid": "111", "tray_type": "PLA"}
  135. assert lookup[1] == {"tray_uuid": "BBB", "tag_uid": "222", "tray_type": "ABS"}
  136. def test_multiple_ams_units(self):
  137. raw = {
  138. "ams": [
  139. {"id": 0, "tray": [{"id": 0, "tray_uuid": "A", "tag_uid": "", "tray_type": "PLA"}]},
  140. {"id": 1, "tray": [{"id": 0, "tray_uuid": "B", "tag_uid": "", "tray_type": "PETG"}]},
  141. ]
  142. }
  143. lookup = build_ams_tray_lookup(raw)
  144. assert 0 in lookup # AMS 0, tray 0
  145. assert 4 in lookup # AMS 1, tray 0 (1*4+0)
  146. assert lookup[4]["tray_uuid"] == "B"
  147. def test_external_spool(self):
  148. raw = {
  149. "ams": [],
  150. "vt_tray": [{"tray_uuid": "EXT", "tag_uid": "X", "tray_type": "TPU"}],
  151. }
  152. lookup = build_ams_tray_lookup(raw)
  153. assert 254 in lookup
  154. assert lookup[254]["tray_type"] == "TPU"
  155. def test_empty_external_spool_skipped(self):
  156. raw = {"ams": [], "vt_tray": [{"tray_type": ""}]}
  157. lookup = build_ams_tray_lookup(raw)
  158. assert 254 not in lookup
  159. def test_no_ams_data(self):
  160. assert build_ams_tray_lookup({}) == {}
  161. assert build_ams_tray_lookup({"ams": []}) == {}
  162. def test_missing_fields_default(self):
  163. raw = {"ams": [{"id": 0, "tray": [{"id": 0}]}]}
  164. lookup = build_ams_tray_lookup(raw)
  165. assert lookup[0] == {"tray_uuid": "", "tag_uid": "", "tray_type": ""}
  166. class TestStorePrintData:
  167. """Tests for store_print_data()."""
  168. @pytest.mark.asyncio
  169. async def test_prefers_explicit_ams_mapping_over_queue_mapping(self):
  170. db = AsyncMock()
  171. delete_result = MagicMock()
  172. db.execute = AsyncMock(side_effect=[delete_result])
  173. db.add = MagicMock()
  174. db.commit = AsyncMock()
  175. printer_manager = MagicMock()
  176. printer_manager.get_status.return_value = SimpleNamespace(
  177. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "tray_type": "PLA"}, {"id": 1, "tray_type": "PLA"}]}]}
  178. )
  179. mock_settings = MagicMock()
  180. mock_path = MagicMock()
  181. mock_path.exists.return_value = True
  182. mock_settings.base_dir.__truediv__.return_value = mock_path
  183. with (
  184. patch("backend.app.services.spoolman_tracking.app_settings", mock_settings),
  185. patch("backend.app.api.routes.settings.get_setting", AsyncMock(side_effect=["true", "true"])),
  186. patch(
  187. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  188. return_value=[{"slot_id": 1, "used_g": 3.83, "type": "PLA", "color": "#FF0000"}],
  189. ),
  190. patch("backend.app.utils.threemf_tools.extract_layer_filament_usage_from_3mf", return_value=None),
  191. patch("backend.app.utils.threemf_tools.extract_filament_properties_from_3mf", return_value={}),
  192. ):
  193. await store_print_data(
  194. printer_id=1,
  195. archive_id=15,
  196. file_path="archives/test.3mf",
  197. db=db,
  198. printer_manager=printer_manager,
  199. ams_mapping=[1, -1, -1, -1],
  200. )
  201. db.add.assert_called_once()
  202. tracking = db.add.call_args.args[0]
  203. assert tracking.slot_to_tray == [1, -1, -1, -1]
  204. db.execute.assert_called_once()
  205. @pytest.mark.asyncio
  206. async def test_stores_tracking_when_disable_weight_sync_is_false(self):
  207. """#1119: per-print tracking must run regardless of disable_weight_sync.
  208. Previously store_print_data short-circuited when the deprecated
  209. `spoolman_disable_weight_sync` flag was off, leaving non-BL spools
  210. with no weight-update path at all. Per-print tracking is now the
  211. only weight writer for Spoolman, so it must run whenever Spoolman
  212. is enabled.
  213. """
  214. db = AsyncMock()
  215. db.execute = AsyncMock(return_value=MagicMock())
  216. db.add = MagicMock()
  217. db.commit = AsyncMock()
  218. printer_manager = MagicMock()
  219. printer_manager.get_status.return_value = SimpleNamespace(
  220. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "tray_type": "PLA"}]}]}
  221. )
  222. mock_settings = MagicMock()
  223. mock_path = MagicMock()
  224. mock_path.exists.return_value = True
  225. mock_settings.base_dir.__truediv__.return_value = mock_path
  226. # Only spoolman_enabled is consulted now (disable_weight_sync is no
  227. # longer read). The single side_effect entry proves no extra
  228. # get_setting calls slip back in.
  229. with (
  230. patch("backend.app.services.spoolman_tracking.app_settings", mock_settings),
  231. patch("backend.app.api.routes.settings.get_setting", AsyncMock(side_effect=["true"])),
  232. patch(
  233. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  234. return_value=[{"slot_id": 1, "used_g": 5.0, "type": "PLA", "color": "#FF0000"}],
  235. ),
  236. patch("backend.app.utils.threemf_tools.extract_layer_filament_usage_from_3mf", return_value=None),
  237. patch("backend.app.utils.threemf_tools.extract_filament_properties_from_3mf", return_value={}),
  238. ):
  239. await store_print_data(
  240. printer_id=1,
  241. archive_id=20,
  242. file_path="archives/test.3mf",
  243. db=db,
  244. printer_manager=printer_manager,
  245. ams_mapping=[0],
  246. )
  247. # Tracking row was inserted — the fix is working.
  248. db.add.assert_called_once()