test_spoolman_tracking.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. """Unit tests for Spoolman tracking service helpers."""
  2. from types import SimpleNamespace
  3. from unittest.mock import AsyncMock, MagicMock, patch
  4. import pytest
  5. from backend.app.services.spoolman_tracking import (
  6. _get_fallback_spool_tag,
  7. _global_tray_id_to_ams_slot,
  8. _hash_serial_to_hex32,
  9. _resolve_global_tray_id,
  10. _resolve_spool_tag,
  11. build_ams_tray_lookup,
  12. store_print_data,
  13. )
  14. class TestResolveSpoolTag:
  15. """Tests for _resolve_spool_tag()."""
  16. def test_prefers_tray_uuid(self):
  17. tray = {"tray_uuid": "A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4", "tag_uid": "DEADBEEF"}
  18. assert _resolve_spool_tag(tray) == "A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4"
  19. def test_falls_back_to_tag_uid(self):
  20. tray = {"tray_uuid": "", "tag_uid": "DEADBEEF"}
  21. assert _resolve_spool_tag(tray) == "DEADBEEF"
  22. def test_skips_zero_uuid(self):
  23. tray = {"tray_uuid": "00000000000000000000000000000000", "tag_uid": "DEADBEEF"}
  24. assert _resolve_spool_tag(tray) == "DEADBEEF"
  25. def test_rejects_zero_tag_uid(self):
  26. tray = {"tray_uuid": "", "tag_uid": "0000000000000000"}
  27. assert _resolve_spool_tag(tray) == ""
  28. def test_uses_fallback_tag_when_ids_missing(self):
  29. tray = {"tray_uuid": "", "tag_uid": ""}
  30. # global_tray_id 0 -> ams_id 0, tray_id 0
  31. assert _resolve_spool_tag(tray, "01P00A000000000", 0) == "ABA7845700000000"
  32. def test_uses_fallback_tag_when_ids_zero(self):
  33. tray = {"tray_uuid": "00000000000000000000000000000000", "tag_uid": "0000000000000000"}
  34. # global_tray_id 5 -> ams_id 1, tray_id 1
  35. assert _resolve_spool_tag(tray, "01P00A000000000", 5) == "ABA7845700010001"
  36. def test_empty_both(self):
  37. tray = {"tray_uuid": "", "tag_uid": ""}
  38. assert _resolve_spool_tag(tray) == ""
  39. def test_missing_keys(self):
  40. assert _resolve_spool_tag({}) == ""
  41. def test_zero_uuid_no_tag(self):
  42. tray = {"tray_uuid": "00000000000000000000000000000000", "tag_uid": ""}
  43. assert _resolve_spool_tag(tray) == ""
  44. class TestResolveGlobalTrayId:
  45. """Tests for _resolve_global_tray_id()."""
  46. def test_default_mapping(self):
  47. """slot 1 -> tray 0, slot 2 -> tray 1, etc."""
  48. assert _resolve_global_tray_id(1, None) == 0
  49. assert _resolve_global_tray_id(2, None) == 1
  50. assert _resolve_global_tray_id(4, None) == 3
  51. def test_custom_mapping(self):
  52. """Custom slot_to_tray overrides default."""
  53. mapping = [5, 2, -1, 0]
  54. assert _resolve_global_tray_id(1, mapping) == 5
  55. assert _resolve_global_tray_id(2, mapping) == 2
  56. assert _resolve_global_tray_id(4, mapping) == 0
  57. def test_unmapped_slot(self):
  58. """Slot with -1 in mapping uses default."""
  59. mapping = [5, -1, 2, 0]
  60. assert _resolve_global_tray_id(2, mapping) == 1 # default: slot 2 -> tray 1
  61. def test_slot_beyond_mapping(self):
  62. """Slot beyond mapping length uses default."""
  63. mapping = [5, 2]
  64. assert _resolve_global_tray_id(3, mapping) == 2 # default: slot 3 -> tray 2
  65. def test_empty_mapping(self):
  66. mapping = []
  67. assert _resolve_global_tray_id(1, mapping) == 0
  68. class TestFallbackTagHelpers:
  69. """Tests for frontend-mirrored fallback tag helpers."""
  70. def test_hash_serial_matches_frontend_algorithm(self):
  71. assert _hash_serial_to_hex32("01P00A000000000") == "ABA78457"
  72. # Frontend trims and uppercases before hashing
  73. assert _hash_serial_to_hex32(" 01p00a000000000 ") == "ABA78457"
  74. def test_global_tray_to_ams_slot_standard_ams(self):
  75. assert _global_tray_id_to_ams_slot(0) == (0, 0)
  76. assert _global_tray_id_to_ams_slot(7) == (1, 3)
  77. def test_global_tray_to_ams_slot_ams_ht(self):
  78. assert _global_tray_id_to_ams_slot(128) == (128, 0)
  79. assert _global_tray_id_to_ams_slot(135) == (135, 0)
  80. def test_global_tray_to_ams_slot_external(self):
  81. assert _global_tray_id_to_ams_slot(254) == (255, 0)
  82. assert _global_tray_id_to_ams_slot(255) == (255, 1)
  83. def test_get_fallback_spool_tag_standard(self):
  84. assert _get_fallback_spool_tag("01P00A000000000", 5) == "ABA7845700010001"
  85. def test_get_fallback_spool_tag_ams_ht(self):
  86. assert _get_fallback_spool_tag("01P00A000000000", 128) == "ABA7845700800000"
  87. def test_get_fallback_spool_tag_external(self):
  88. assert _get_fallback_spool_tag("01P00A000000000", 255) == "ABA7845700FF0001"
  89. class TestBuildAmsTrayLookup:
  90. """Tests for build_ams_tray_lookup()."""
  91. def test_single_ams_unit(self):
  92. raw = {
  93. "ams": [
  94. {
  95. "id": 0,
  96. "tray": [
  97. {"id": 0, "tray_uuid": "AAA", "tag_uid": "111", "tray_type": "PLA"},
  98. {"id": 1, "tray_uuid": "BBB", "tag_uid": "222", "tray_type": "ABS"},
  99. ],
  100. }
  101. ]
  102. }
  103. lookup = build_ams_tray_lookup(raw)
  104. assert lookup[0] == {"tray_uuid": "AAA", "tag_uid": "111", "tray_type": "PLA"}
  105. assert lookup[1] == {"tray_uuid": "BBB", "tag_uid": "222", "tray_type": "ABS"}
  106. def test_multiple_ams_units(self):
  107. raw = {
  108. "ams": [
  109. {"id": 0, "tray": [{"id": 0, "tray_uuid": "A", "tag_uid": "", "tray_type": "PLA"}]},
  110. {"id": 1, "tray": [{"id": 0, "tray_uuid": "B", "tag_uid": "", "tray_type": "PETG"}]},
  111. ]
  112. }
  113. lookup = build_ams_tray_lookup(raw)
  114. assert 0 in lookup # AMS 0, tray 0
  115. assert 4 in lookup # AMS 1, tray 0 (1*4+0)
  116. assert lookup[4]["tray_uuid"] == "B"
  117. def test_external_spool(self):
  118. raw = {
  119. "ams": [],
  120. "vt_tray": [{"tray_uuid": "EXT", "tag_uid": "X", "tray_type": "TPU"}],
  121. }
  122. lookup = build_ams_tray_lookup(raw)
  123. assert 254 in lookup
  124. assert lookup[254]["tray_type"] == "TPU"
  125. def test_empty_external_spool_skipped(self):
  126. raw = {"ams": [], "vt_tray": [{"tray_type": ""}]}
  127. lookup = build_ams_tray_lookup(raw)
  128. assert 254 not in lookup
  129. def test_no_ams_data(self):
  130. assert build_ams_tray_lookup({}) == {}
  131. assert build_ams_tray_lookup({"ams": []}) == {}
  132. def test_missing_fields_default(self):
  133. raw = {"ams": [{"id": 0, "tray": [{"id": 0}]}]}
  134. lookup = build_ams_tray_lookup(raw)
  135. assert lookup[0] == {"tray_uuid": "", "tag_uid": "", "tray_type": ""}
  136. class TestStorePrintData:
  137. """Tests for store_print_data()."""
  138. @pytest.mark.asyncio
  139. async def test_prefers_explicit_ams_mapping_over_queue_mapping(self):
  140. db = AsyncMock()
  141. delete_result = MagicMock()
  142. db.execute = AsyncMock(side_effect=[delete_result])
  143. db.add = MagicMock()
  144. db.commit = AsyncMock()
  145. printer_manager = MagicMock()
  146. printer_manager.get_status.return_value = SimpleNamespace(
  147. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "tray_type": "PLA"}, {"id": 1, "tray_type": "PLA"}]}]}
  148. )
  149. mock_settings = MagicMock()
  150. mock_path = MagicMock()
  151. mock_path.exists.return_value = True
  152. mock_settings.base_dir.__truediv__.return_value = mock_path
  153. with (
  154. patch("backend.app.services.spoolman_tracking.app_settings", mock_settings),
  155. patch("backend.app.api.routes.settings.get_setting", AsyncMock(side_effect=["true", "true"])),
  156. patch(
  157. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  158. return_value=[{"slot_id": 1, "used_g": 3.83, "type": "PLA", "color": "#FF0000"}],
  159. ),
  160. patch("backend.app.utils.threemf_tools.extract_layer_filament_usage_from_3mf", return_value=None),
  161. patch("backend.app.utils.threemf_tools.extract_filament_properties_from_3mf", return_value={}),
  162. ):
  163. await store_print_data(
  164. printer_id=1,
  165. archive_id=15,
  166. file_path="archives/test.3mf",
  167. db=db,
  168. printer_manager=printer_manager,
  169. ams_mapping=[1, -1, -1, -1],
  170. )
  171. db.add.assert_called_once()
  172. tracking = db.add.call_args.args[0]
  173. assert tracking.slot_to_tray == [1, -1, -1, -1]
  174. db.execute.assert_called_once()