test_print_start_expected_promotion.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. """Tests for expected print promotion when auto_archive is disabled (#839).
  2. When auto_archive=False but a print was dispatched by BamBuddy (queue/reprint),
  3. the on_print_start callback must still promote the expected print to _active_prints
  4. so that at print completion the archive_id and ams_mapping are available for
  5. filament usage tracking.
  6. These are pure unit tests that verify the module-level dict manipulation logic
  7. directly, NOT by calling the full on_print_start callback.
  8. """
  9. import time
  10. import pytest
  11. from backend.app.main import (
  12. _active_prints,
  13. _expected_print_creators,
  14. _expected_print_registered_at,
  15. _expected_prints,
  16. _print_ams_mappings,
  17. register_expected_print,
  18. )
  19. @pytest.fixture(autouse=True)
  20. def _clear_dicts():
  21. """Clear module-level tracking dicts before and after each test."""
  22. _expected_prints.clear()
  23. _expected_print_registered_at.clear()
  24. _expected_print_creators.clear()
  25. _print_ams_mappings.clear()
  26. _active_prints.clear()
  27. yield
  28. _expected_prints.clear()
  29. _expected_print_registered_at.clear()
  30. _expected_print_creators.clear()
  31. _print_ams_mappings.clear()
  32. _active_prints.clear()
  33. class TestRegisterExpectedPrint:
  34. """Verify register_expected_print populates all tracking dicts."""
  35. def test_registers_filename_and_variants(self):
  36. register_expected_print(1, "Box.3mf", archive_id=54, ams_mapping=[1])
  37. assert _expected_prints[(1, "Box.3mf")] == 54
  38. assert _expected_prints[(1, "Box")] == 54
  39. assert _expected_prints[(1, "Box.gcode")] == 54
  40. def test_stores_ams_mapping(self):
  41. register_expected_print(1, "test.3mf", archive_id=10, ams_mapping=[2, -1, 3])
  42. assert _print_ams_mappings[10] == [2, -1, 3]
  43. def test_no_ams_mapping_when_none(self):
  44. register_expected_print(1, "test.3mf", archive_id=10, ams_mapping=None)
  45. assert 10 not in _print_ams_mappings
  46. def test_stores_creator(self):
  47. register_expected_print(1, "test.3mf", archive_id=10, created_by_id=5)
  48. assert _expected_print_creators[(1, "test.3mf")] == 5
  49. def test_stores_registered_at(self):
  50. before = time.monotonic()
  51. register_expected_print(1, "test.3mf", archive_id=10)
  52. after = time.monotonic()
  53. ts = _expected_print_registered_at[(1, "test.3mf")]
  54. assert before <= ts <= after
  55. class TestExpectedPrintDetection:
  56. """Verify the expected-print detection logic used in on_print_start.
  57. Reproduces the key-building and lookup logic from the auto_archive=False
  58. block in on_print_start to verify that expected prints are correctly
  59. detected across all filename variations.
  60. """
  61. @staticmethod
  62. def _build_check_keys(printer_id: int, filename: str, subtask_name: str):
  63. """Reproduce the key-building logic from on_print_start."""
  64. check_keys = []
  65. if subtask_name:
  66. check_keys += [
  67. (printer_id, subtask_name),
  68. (printer_id, f"{subtask_name}.3mf"),
  69. (printer_id, f"{subtask_name}.gcode.3mf"),
  70. ]
  71. if filename:
  72. base_fn = filename.split("/")[-1] if "/" in filename else filename
  73. check_keys.append((printer_id, base_fn))
  74. no_archive_base = base_fn.replace(".gcode", "").replace(".3mf", "")
  75. check_keys += [
  76. (printer_id, no_archive_base),
  77. (printer_id, f"{no_archive_base}.3mf"),
  78. ]
  79. return check_keys
  80. def test_detects_expected_print_by_subtask(self):
  81. """Expected print is found when subtask_name matches."""
  82. register_expected_print(1, "Box.3mf", archive_id=54, ams_mapping=[1])
  83. keys = self._build_check_keys(1, filename="", subtask_name="Box")
  84. assert any(k in _expected_prints for k in keys)
  85. def test_detects_expected_print_by_filename(self):
  86. """Expected print is found when filename matches."""
  87. register_expected_print(1, "Box.3mf", archive_id=54, ams_mapping=[1])
  88. keys = self._build_check_keys(1, filename="Box.3mf", subtask_name="")
  89. assert any(k in _expected_prints for k in keys)
  90. def test_detects_expected_print_by_gcode_filename(self):
  91. """Expected print is found when MQTT reports .gcode filename."""
  92. register_expected_print(1, "Box.3mf", archive_id=54, ams_mapping=[1])
  93. # MQTT sometimes reports gcode filename
  94. keys = self._build_check_keys(1, filename="Box.gcode", subtask_name="Box")
  95. assert any(k in _expected_prints for k in keys)
  96. def test_no_false_positive_for_different_file(self):
  97. """Expected print NOT found for a different filename."""
  98. register_expected_print(1, "Box.3mf", archive_id=54, ams_mapping=[1])
  99. keys = self._build_check_keys(1, filename="Benchy.3mf", subtask_name="Benchy")
  100. assert not any(k in _expected_prints for k in keys)
  101. def test_no_false_positive_for_different_printer(self):
  102. """Expected print NOT found when printer_id doesn't match."""
  103. register_expected_print(1, "Box.3mf", archive_id=54, ams_mapping=[1])
  104. keys = self._build_check_keys(2, filename="Box.3mf", subtask_name="Box")
  105. assert not any(k in _expected_prints for k in keys)
  106. def test_empty_expected_prints_returns_false(self):
  107. """No detection when _expected_prints is empty."""
  108. keys = self._build_check_keys(1, filename="test.3mf", subtask_name="test")
  109. assert not any(k in _expected_prints for k in keys)
  110. def test_filename_with_spaces_and_parens(self):
  111. """Handles filenames with spaces and parentheses (e.g. 'Box3.0_(2)_plate_5.3mf')."""
  112. register_expected_print(1, "Box3.0_(2)_plate_5.3mf", archive_id=54, ams_mapping=[1])
  113. keys = self._build_check_keys(
  114. 1,
  115. filename="Box3.0_(2)_plate_5.gcode",
  116. subtask_name="Box3.0_(2)_plate_5",
  117. )
  118. assert any(k in _expected_prints for k in keys)
  119. class TestExpectedPrintPromotion:
  120. """Verify that expected prints are correctly promoted to _active_prints.
  121. Reproduces the expected-print pop + promotion logic from on_print_start
  122. (lines 1468-1496) to verify that _active_prints is populated and
  123. _expected_prints is cleaned up.
  124. """
  125. @staticmethod
  126. def _simulate_expected_print_promotion(printer_id: int, subtask_name: str, filename: str, archive_filename: str):
  127. """Simulate the expected-print lookup and promotion from on_print_start."""
  128. expected_keys = []
  129. if subtask_name:
  130. expected_keys.append((printer_id, subtask_name))
  131. expected_keys.append((printer_id, f"{subtask_name}.3mf"))
  132. expected_keys.append((printer_id, f"{subtask_name}.gcode.3mf"))
  133. if filename:
  134. fname = filename.split("/")[-1] if "/" in filename else filename
  135. expected_keys.append((printer_id, fname))
  136. base = fname.replace(".gcode", "").replace(".3mf", "")
  137. expected_keys.append((printer_id, base))
  138. expected_keys.append((printer_id, f"{base}.3mf"))
  139. expected_archive_id = None
  140. for key in expected_keys:
  141. expected_archive_id = _expected_prints.pop(key, None)
  142. _expected_print_registered_at.pop(key, None)
  143. if expected_archive_id:
  144. for other_key in expected_keys:
  145. _expected_prints.pop(other_key, None)
  146. _expected_print_registered_at.pop(other_key, None)
  147. break
  148. if expected_archive_id:
  149. _active_prints[(printer_id, archive_filename)] = expected_archive_id
  150. if subtask_name:
  151. _active_prints[(printer_id, f"{subtask_name}.3mf")] = expected_archive_id
  152. return expected_archive_id
  153. def test_promotion_populates_active_prints(self):
  154. """After promotion, archive is in _active_prints."""
  155. register_expected_print(1, "Box.3mf", archive_id=54, ams_mapping=[1])
  156. archive_id = self._simulate_expected_print_promotion(
  157. printer_id=1,
  158. subtask_name="Box",
  159. filename="Box.gcode",
  160. archive_filename="Box.3mf",
  161. )
  162. assert archive_id == 54
  163. assert _active_prints[(1, "Box.3mf")] == 54
  164. def test_promotion_cleans_up_expected_prints(self):
  165. """After promotion, _expected_prints is empty for this print."""
  166. register_expected_print(1, "Box.3mf", archive_id=54, ams_mapping=[1])
  167. self._simulate_expected_print_promotion(
  168. printer_id=1,
  169. subtask_name="Box",
  170. filename="Box.gcode",
  171. archive_filename="Box.3mf",
  172. )
  173. # All variants should be cleaned up
  174. assert (1, "Box.3mf") not in _expected_prints
  175. assert (1, "Box") not in _expected_prints
  176. assert (1, "Box.gcode") not in _expected_prints
  177. def test_ams_mapping_survives_promotion(self):
  178. """_print_ams_mappings is NOT consumed during promotion — it's needed at completion."""
  179. register_expected_print(1, "Box.3mf", archive_id=54, ams_mapping=[1])
  180. self._simulate_expected_print_promotion(
  181. printer_id=1,
  182. subtask_name="Box",
  183. filename="Box.gcode",
  184. archive_filename="Box.3mf",
  185. )
  186. # ams_mapping should still be available for on_print_complete
  187. assert _print_ams_mappings[54] == [1]
  188. def test_completion_lookup_finds_promoted_archive(self):
  189. """Simulate on_print_complete finding the archive in _active_prints."""
  190. register_expected_print(1, "Box.3mf", archive_id=54, ams_mapping=[1])
  191. self._simulate_expected_print_promotion(
  192. printer_id=1,
  193. subtask_name="Box",
  194. filename="Box.gcode",
  195. archive_filename="Box.3mf",
  196. )
  197. # Simulate on_print_complete key building
  198. completion_keys = [
  199. (1, "Box.3mf"),
  200. (1, "Box.gcode.3mf"),
  201. (1, "Box"),
  202. ]
  203. found_id = None
  204. for key in completion_keys:
  205. found_id = _active_prints.pop(key, None)
  206. if found_id:
  207. break
  208. assert found_id == 54
  209. # And ams_mapping is retrievable
  210. assert _print_ams_mappings.pop(54, None) == [1]
  211. def test_no_promotion_for_external_print(self):
  212. """When no expected print exists, nothing is promoted."""
  213. archive_id = self._simulate_expected_print_promotion(
  214. printer_id=1,
  215. subtask_name="Benchy",
  216. filename="Benchy.gcode",
  217. archive_filename="Benchy.3mf",
  218. )
  219. assert archive_id is None
  220. assert len(_active_prints) == 0
  221. class TestAMSMappingInjection:
  222. """Verify ams_mapping injection into usage tracker session."""
  223. def test_injection_into_session(self):
  224. """ams_mapping from _print_ams_mappings is injectable into a session."""
  225. from datetime import datetime, timezone
  226. from backend.app.services.usage_tracker import PrintSession, _active_sessions
  227. _active_sessions.clear()
  228. # Create a session without ams_mapping (simulates MQTT not providing it)
  229. session = PrintSession(
  230. printer_id=1,
  231. print_name="Box",
  232. started_at=datetime.now(timezone.utc),
  233. tray_remain_start={},
  234. tray_now_at_start=-1,
  235. spool_assignments={},
  236. ams_mapping=None,
  237. )
  238. _active_sessions[1] = session
  239. # Register expected print with ams_mapping
  240. register_expected_print(1, "Box.3mf", archive_id=54, ams_mapping=[1])
  241. # Simulate the injection logic from on_print_start
  242. _stored_map = _print_ams_mappings.get(54)
  243. assert _stored_map == [1]
  244. ut_session = _active_sessions.get(1)
  245. assert ut_session is not None
  246. assert ut_session.ams_mapping is None # before injection
  247. ut_session.ams_mapping = _stored_map # injection
  248. assert ut_session.ams_mapping == [1]
  249. _active_sessions.clear()
  250. def test_no_injection_when_session_already_has_mapping(self):
  251. """Don't overwrite existing ams_mapping in session."""
  252. from datetime import datetime, timezone
  253. from backend.app.services.usage_tracker import PrintSession, _active_sessions
  254. _active_sessions.clear()
  255. session = PrintSession(
  256. printer_id=1,
  257. print_name="Box",
  258. started_at=datetime.now(timezone.utc),
  259. tray_remain_start={},
  260. tray_now_at_start=-1,
  261. spool_assignments={},
  262. ams_mapping=[5, 6], # already has mapping from MQTT
  263. )
  264. _active_sessions[1] = session
  265. register_expected_print(1, "Box.3mf", archive_id=54, ams_mapping=[1])
  266. _stored_map = _print_ams_mappings.get(54)
  267. ut_session = _active_sessions.get(1)
  268. # Guard: don't overwrite if session already has a mapping
  269. if ut_session and not ut_session.ams_mapping:
  270. ut_session.ams_mapping = _stored_map
  271. assert ut_session.ams_mapping == [5, 6] # unchanged
  272. _active_sessions.clear()