test_usage_tracker.py 74 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977
  1. """Unit tests for usage_tracker.py — 3MF-primary filament tracking.
  2. Tests the unified tracking logic: 3MF slicer estimates as primary path,
  3. AMS remain% delta as fallback, per-layer gcode for partial prints,
  4. slot-to-tray mapping resolution, and notification variable formatting.
  5. """
  6. from datetime import datetime, timedelta, timezone
  7. from types import SimpleNamespace
  8. from unittest.mock import AsyncMock, MagicMock, patch
  9. import pytest
  10. from backend.app.services.usage_tracker import (
  11. PrintSession,
  12. _active_sessions,
  13. _decode_mqtt_mapping,
  14. _find_3mf_by_filename,
  15. _match_slots_by_color,
  16. _track_from_3mf,
  17. on_print_complete,
  18. on_print_start,
  19. )
  20. def _make_spool(spool_id=1, label_weight=1000, weight_used=0, tag_uid=None, tray_uuid=None):
  21. """Create a mock Spool object."""
  22. spool = MagicMock()
  23. spool.id = spool_id
  24. spool.label_weight = label_weight
  25. spool.weight_used = weight_used
  26. spool.tag_uid = tag_uid
  27. spool.tray_uuid = tray_uuid
  28. spool.last_used = None
  29. spool.cost_per_kg = None
  30. spool.material = "PLA"
  31. return spool
  32. def _make_assignment(spool_id=1, printer_id=1, ams_id=0, tray_id=0):
  33. """Create a mock SpoolAssignment object."""
  34. assignment = MagicMock()
  35. assignment.spool_id = spool_id
  36. assignment.printer_id = printer_id
  37. assignment.ams_id = ams_id
  38. assignment.tray_id = tray_id
  39. return assignment
  40. def _make_archive(archive_id=1, file_path="archives/1/test.3mf", extra_data=None):
  41. """Create a mock PrintArchive object."""
  42. archive = MagicMock()
  43. archive.id = archive_id
  44. archive.file_path = file_path
  45. archive.extra_data = extra_data
  46. return archive
  47. def _make_queue_item(ams_mapping=None, status="printing"):
  48. """Create a mock PrintQueueItem object."""
  49. item = MagicMock()
  50. item.ams_mapping = ams_mapping
  51. item.status = status
  52. return item
  53. def _mock_db_execute(*return_values):
  54. """Create a mock db with execute() that returns values in sequence."""
  55. db = AsyncMock()
  56. results = []
  57. for val in return_values:
  58. result = MagicMock()
  59. result.scalar_one_or_none.return_value = val
  60. results.append(result)
  61. db.execute = AsyncMock(side_effect=results)
  62. return db
  63. def _mock_db_sequential(responses):
  64. """Create mock db that returns responses in order."""
  65. db = AsyncMock()
  66. call_count = [0]
  67. async def mock_execute(*args, **kwargs):
  68. idx = call_count[0]
  69. call_count[0] += 1
  70. result = MagicMock()
  71. if idx < len(responses):
  72. result.scalar_one_or_none.return_value = responses[idx]
  73. else:
  74. result.scalar_one_or_none.return_value = None
  75. # For cost aggregation queries that use .scalar() instead of .scalar_one_or_none()
  76. result.scalar.return_value = None
  77. return result
  78. db.execute = mock_execute
  79. return db
  80. class TestOnPrintStart:
  81. """Tests for on_print_start()."""
  82. @pytest.fixture(autouse=True)
  83. def _clear_sessions(self):
  84. _active_sessions.clear()
  85. yield
  86. _active_sessions.clear()
  87. @pytest.mark.asyncio
  88. async def test_captures_remain_data(self):
  89. """Captures AMS remain% at print start."""
  90. printer_manager = MagicMock()
  91. printer_manager.get_status.return_value = SimpleNamespace(
  92. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 80}, {"id": 1, "remain": 50}]}]},
  93. tray_now=5,
  94. )
  95. await on_print_start(1, {"subtask_name": "Benchy"}, printer_manager)
  96. assert 1 in _active_sessions
  97. session = _active_sessions[1]
  98. assert session.print_name == "Benchy"
  99. assert session.tray_remain_start == {(0, 0): 80, (0, 1): 50}
  100. @pytest.mark.asyncio
  101. async def test_captures_tray_now_at_start(self):
  102. """Captures tray_now at print start for later use in usage tracking."""
  103. printer_manager = MagicMock()
  104. printer_manager.get_status.return_value = SimpleNamespace(
  105. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 80}]}]},
  106. tray_now=9,
  107. )
  108. await on_print_start(1, {"subtask_name": "Test"}, printer_manager)
  109. assert _active_sessions[1].tray_now_at_start == 9
  110. @pytest.mark.asyncio
  111. async def test_tray_now_at_start_255_when_unloaded(self):
  112. """Captures tray_now=255 when printer has no filament loaded at start."""
  113. printer_manager = MagicMock()
  114. printer_manager.get_status.return_value = SimpleNamespace(
  115. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 80}]}]},
  116. tray_now=255,
  117. )
  118. await on_print_start(1, {"subtask_name": "Test"}, printer_manager)
  119. assert _active_sessions[1].tray_now_at_start == 255
  120. @pytest.mark.asyncio
  121. async def test_creates_session_without_remain(self):
  122. """Creates session even without valid remain data (for 3MF tracking)."""
  123. printer_manager = MagicMock()
  124. printer_manager.get_status.return_value = SimpleNamespace(
  125. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": -1}]}]},
  126. tray_now=255,
  127. )
  128. await on_print_start(1, {"subtask_name": "Test"}, printer_manager)
  129. assert 1 in _active_sessions
  130. assert _active_sessions[1].tray_remain_start == {}
  131. class TestOnPrintComplete:
  132. """Tests for on_print_complete() — path ordering and interaction."""
  133. @pytest.fixture(autouse=True)
  134. def _clear_sessions(self):
  135. _active_sessions.clear()
  136. yield
  137. _active_sessions.clear()
  138. @pytest.fixture(autouse=True)
  139. def _mock_get_setting(self):
  140. with patch(
  141. "backend.app.api.routes.settings.get_setting",
  142. new_callable=AsyncMock,
  143. return_value=None,
  144. ):
  145. yield
  146. @pytest.mark.asyncio
  147. async def test_bl_spool_uses_3mf(self):
  148. """BL spool (with tag_uid) is tracked via 3MF, not just AMS delta."""
  149. spool = _make_spool(spool_id=1, tag_uid="AABB1122", label_weight=1000)
  150. assignment = _make_assignment(spool_id=1, printer_id=1, ams_id=0, tray_id=0)
  151. archive = _make_archive(archive_id=10)
  152. # Setup: session with AMS remain data
  153. _active_sessions[1] = PrintSession(
  154. printer_id=1,
  155. print_name="Benchy",
  156. started_at=datetime.now(timezone.utc),
  157. tray_remain_start={(0, 0): 80},
  158. )
  159. # Mock printer state: tray_now=0 (AMS0-T0), single filament
  160. printer_manager = MagicMock()
  161. printer_manager.get_status.return_value = SimpleNamespace(
  162. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
  163. progress=100,
  164. layer_num=50,
  165. tray_now=0,
  166. )
  167. # db returns: archive, queue_item(None), assignment, spool
  168. db = _mock_db_sequential([archive, None, assignment, spool])
  169. filament_usage = [{"slot_id": 1, "used_g": 15.0, "type": "PLA", "color": "#FF0000"}]
  170. with (
  171. patch("backend.app.core.config.settings") as mock_settings,
  172. patch(
  173. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  174. return_value=filament_usage,
  175. ),
  176. ):
  177. mock_settings.base_dir = MagicMock()
  178. mock_path = MagicMock()
  179. mock_path.exists.return_value = True
  180. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  181. results = await on_print_complete(
  182. printer_id=1,
  183. data={"status": "completed"},
  184. printer_manager=printer_manager,
  185. db=db,
  186. archive_id=10,
  187. )
  188. # 3MF path should handle it (BL guard removed)
  189. assert len(results) >= 1
  190. assert results[0]["spool_id"] == 1
  191. assert results[0]["weight_used"] == 15.0
  192. @pytest.mark.asyncio
  193. async def test_ams_delta_fallback_no_archive(self):
  194. """AMS delta tracks consumption when archive_id is None."""
  195. spool = _make_spool(spool_id=2, label_weight=1000)
  196. assignment = _make_assignment(spool_id=2)
  197. _active_sessions[1] = PrintSession(
  198. printer_id=1,
  199. print_name="Test",
  200. started_at=datetime.now(timezone.utc),
  201. tray_remain_start={(0, 0): 80},
  202. )
  203. printer_manager = MagicMock()
  204. printer_manager.get_status.return_value = SimpleNamespace(
  205. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
  206. tray_now=0,
  207. last_loaded_tray=-1,
  208. )
  209. # Pad 2 Nones for _find_3mf_by_filename DB queries (library + archive search),
  210. # then assignment and spool for the AMS fallback path
  211. db = _mock_db_sequential([None, None, assignment, spool])
  212. results = await on_print_complete(
  213. printer_id=1,
  214. data={"status": "completed"},
  215. printer_manager=printer_manager,
  216. db=db,
  217. archive_id=None,
  218. )
  219. assert len(results) == 1
  220. assert results[0]["spool_id"] == 2
  221. # 10% of 1000g = 100g
  222. assert results[0]["weight_used"] == 100.0
  223. assert results[0]["percent_used"] == 10
  224. @pytest.mark.asyncio
  225. async def test_no_double_tracking(self):
  226. """When 3MF handles a tray, AMS delta skips it."""
  227. spool = _make_spool(spool_id=1, label_weight=1000)
  228. assignment = _make_assignment(spool_id=1)
  229. archive = _make_archive(archive_id=10)
  230. _active_sessions[1] = PrintSession(
  231. printer_id=1,
  232. print_name="Benchy",
  233. started_at=datetime.now(timezone.utc),
  234. tray_remain_start={(0, 0): 80},
  235. )
  236. # tray_now=0 matches the single filament slot
  237. printer_manager = MagicMock()
  238. printer_manager.get_status.return_value = SimpleNamespace(
  239. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
  240. progress=100,
  241. layer_num=50,
  242. tray_now=0,
  243. )
  244. # db returns: archive, queue_item(None), assignment, spool
  245. db = _mock_db_sequential([archive, None, assignment, spool])
  246. filament_usage = [{"slot_id": 1, "used_g": 15.0, "type": "PLA", "color": "#FF0000"}]
  247. with (
  248. patch("backend.app.core.config.settings") as mock_settings,
  249. patch(
  250. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  251. return_value=filament_usage,
  252. ),
  253. ):
  254. mock_settings.base_dir = MagicMock()
  255. mock_path = MagicMock()
  256. mock_path.exists.return_value = True
  257. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  258. results = await on_print_complete(
  259. printer_id=1,
  260. data={"status": "completed"},
  261. printer_manager=printer_manager,
  262. db=db,
  263. archive_id=10,
  264. )
  265. # Only 1 result (3MF), NOT 2 (3MF + AMS delta)
  266. assert len(results) == 1
  267. assert results[0]["weight_used"] == 15.0
  268. class TestTrackFrom3mf:
  269. """Tests for _track_from_3mf() — per-layer, linear scaling, and slot mapping."""
  270. @pytest.mark.asyncio
  271. async def test_prefers_live_assignment_when_reassigned_mid_print(self):
  272. """If tray assignment changed during print, track usage on the new spool."""
  273. spool_old = _make_spool(spool_id=1, label_weight=1000)
  274. spool_new = _make_spool(spool_id=2, label_weight=1000)
  275. archive = _make_archive(archive_id=80)
  276. live_assignment = _make_assignment(spool_id=2, ams_id=0, tray_id=0)
  277. started_at = datetime.now(timezone.utc)
  278. live_assignment.created_at = started_at + timedelta(seconds=5)
  279. # db: archive, queue_item(None), live assignment lookup, spool_new lookup
  280. db = _mock_db_sequential([archive, None, live_assignment, spool_new])
  281. printer_manager = MagicMock()
  282. printer_manager.get_status.return_value = SimpleNamespace(
  283. progress=100,
  284. layer_num=50,
  285. tray_now=0,
  286. )
  287. filament_usage = [{"slot_id": 1, "used_g": 10.0, "type": "PLA", "color": ""}]
  288. handled_trays: set[tuple[int, int]] = set()
  289. with (
  290. patch("backend.app.core.config.settings") as mock_settings,
  291. patch(
  292. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  293. return_value=filament_usage,
  294. ),
  295. ):
  296. mock_settings.base_dir = MagicMock()
  297. mock_path = MagicMock()
  298. mock_path.exists.return_value = True
  299. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  300. results = await _track_from_3mf(
  301. printer_id=1,
  302. archive_id=80,
  303. status="completed",
  304. print_name="MidPrintReassign",
  305. handled_trays=handled_trays,
  306. printer_manager=printer_manager,
  307. db=db,
  308. spool_assignments={(0, 0): spool_old.id},
  309. print_started_at=started_at,
  310. )
  311. assert len(results) == 1
  312. assert results[0]["spool_id"] == spool_new.id
  313. @pytest.mark.asyncio
  314. async def test_keeps_snapshot_when_live_assignment_predates_print(self):
  315. """If live assignment predates print start, preserve snapshot spool mapping."""
  316. spool_old = _make_spool(spool_id=1, label_weight=1000)
  317. archive = _make_archive(archive_id=81)
  318. live_assignment = _make_assignment(spool_id=2, ams_id=0, tray_id=0)
  319. started_at = datetime.now(timezone.utc)
  320. live_assignment.created_at = started_at - timedelta(seconds=5)
  321. # db: archive, queue_item(None), live assignment lookup, spool_old lookup
  322. db = _mock_db_sequential([archive, None, live_assignment, spool_old])
  323. printer_manager = MagicMock()
  324. printer_manager.get_status.return_value = SimpleNamespace(
  325. progress=100,
  326. layer_num=50,
  327. tray_now=0,
  328. )
  329. filament_usage = [{"slot_id": 1, "used_g": 10.0, "type": "PLA", "color": ""}]
  330. handled_trays: set[tuple[int, int]] = set()
  331. with (
  332. patch("backend.app.core.config.settings") as mock_settings,
  333. patch(
  334. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  335. return_value=filament_usage,
  336. ),
  337. ):
  338. mock_settings.base_dir = MagicMock()
  339. mock_path = MagicMock()
  340. mock_path.exists.return_value = True
  341. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  342. results = await _track_from_3mf(
  343. printer_id=1,
  344. archive_id=81,
  345. status="completed",
  346. print_name="SnapshotPreserved",
  347. handled_trays=handled_trays,
  348. printer_manager=printer_manager,
  349. db=db,
  350. spool_assignments={(0, 0): spool_old.id},
  351. print_started_at=started_at,
  352. )
  353. assert len(results) == 1
  354. assert results[0]["spool_id"] == spool_old.id
  355. @pytest.mark.asyncio
  356. async def test_linear_fallback_for_partial_print(self):
  357. """Falls back to linear scaling when gcode layer data unavailable."""
  358. spool = _make_spool(spool_id=1, label_weight=1000)
  359. assignment = _make_assignment(spool_id=1)
  360. archive = _make_archive(archive_id=10)
  361. # db: archive, queue_item(None), assignment, spool
  362. db = _mock_db_sequential([archive, None, assignment, spool])
  363. printer_manager = MagicMock()
  364. printer_manager.get_status.return_value = SimpleNamespace(
  365. progress=50,
  366. layer_num=25,
  367. tray_now=0,
  368. )
  369. filament_usage = [{"slot_id": 1, "used_g": 20.0, "type": "PLA", "color": ""}]
  370. handled_trays: set[tuple[int, int]] = set()
  371. with (
  372. patch("backend.app.core.config.settings") as mock_settings,
  373. patch(
  374. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  375. return_value=filament_usage,
  376. ),
  377. patch(
  378. "backend.app.utils.threemf_tools.extract_layer_filament_usage_from_3mf",
  379. return_value=None, # No layer data available
  380. ),
  381. ):
  382. mock_settings.base_dir = MagicMock()
  383. mock_path = MagicMock()
  384. mock_path.exists.return_value = True
  385. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  386. results = await _track_from_3mf(
  387. printer_id=1,
  388. archive_id=10,
  389. status="failed",
  390. print_name="Benchy",
  391. handled_trays=handled_trays,
  392. printer_manager=printer_manager,
  393. db=db,
  394. )
  395. assert len(results) == 1
  396. # 50% of 20g = 10g
  397. assert results[0]["weight_used"] == 10.0
  398. # Tray should be marked as handled
  399. assert (0, 0) in handled_trays
  400. @pytest.mark.asyncio
  401. async def test_per_layer_partial_print(self):
  402. """Failed print at layer N uses gcode cumulative data."""
  403. spool = _make_spool(spool_id=1, label_weight=1000)
  404. assignment = _make_assignment(spool_id=1)
  405. archive = _make_archive(archive_id=10)
  406. # db: archive, queue_item(None), assignment, spool
  407. db = _mock_db_sequential([archive, None, assignment, spool])
  408. printer_manager = MagicMock()
  409. printer_manager.get_status.return_value = SimpleNamespace(
  410. progress=50,
  411. layer_num=25,
  412. tray_now=0,
  413. )
  414. filament_usage = [{"slot_id": 1, "used_g": 20.0, "type": "PLA", "color": ""}]
  415. # Per-layer data: at layer 25, filament 0 used 5000mm
  416. layer_data = {10: {0: 2000.0}, 25: {0: 5000.0}, 50: {0: 10000.0}}
  417. filament_props = {1: {"density": 1.24, "diameter": 1.75}}
  418. handled_trays: set[tuple[int, int]] = set()
  419. with (
  420. patch("backend.app.core.config.settings") as mock_settings,
  421. patch(
  422. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  423. return_value=filament_usage,
  424. ),
  425. patch(
  426. "backend.app.utils.threemf_tools.extract_layer_filament_usage_from_3mf",
  427. return_value=layer_data,
  428. ),
  429. patch(
  430. "backend.app.utils.threemf_tools.get_cumulative_usage_at_layer",
  431. return_value={0: 5000.0},
  432. ),
  433. patch(
  434. "backend.app.utils.threemf_tools.extract_filament_properties_from_3mf",
  435. return_value=filament_props,
  436. ),
  437. patch(
  438. "backend.app.utils.threemf_tools.mm_to_grams",
  439. return_value=12.0, # 5000mm at 1.75mm/1.24g/cm3
  440. ),
  441. ):
  442. mock_settings.base_dir = MagicMock()
  443. mock_path = MagicMock()
  444. mock_path.exists.return_value = True
  445. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  446. results = await _track_from_3mf(
  447. printer_id=1,
  448. archive_id=10,
  449. status="failed",
  450. print_name="Benchy",
  451. handled_trays=handled_trays,
  452. printer_manager=printer_manager,
  453. db=db,
  454. )
  455. assert len(results) == 1
  456. # Should use per-layer grams (12.0g), not linear scale (10.0g)
  457. assert results[0]["weight_used"] == 12.0
  458. @pytest.mark.asyncio
  459. async def test_completed_print_uses_full_weight(self):
  460. """Completed print uses full 3MF weight (scale=1.0)."""
  461. spool = _make_spool(spool_id=1, label_weight=1000)
  462. assignment = _make_assignment(spool_id=1)
  463. archive = _make_archive(archive_id=10)
  464. # db: archive, queue_item(None), assignment, spool
  465. db = _mock_db_sequential([archive, None, assignment, spool])
  466. printer_manager = MagicMock()
  467. printer_manager.get_status.return_value = SimpleNamespace(
  468. progress=100,
  469. layer_num=50,
  470. tray_now=0,
  471. )
  472. filament_usage = [{"slot_id": 1, "used_g": 20.0, "type": "PLA", "color": ""}]
  473. handled_trays: set[tuple[int, int]] = set()
  474. with (
  475. patch("backend.app.core.config.settings") as mock_settings,
  476. patch(
  477. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  478. return_value=filament_usage,
  479. ),
  480. ):
  481. mock_settings.base_dir = MagicMock()
  482. mock_path = MagicMock()
  483. mock_path.exists.return_value = True
  484. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  485. results = await _track_from_3mf(
  486. printer_id=1,
  487. archive_id=10,
  488. status="completed",
  489. print_name="Benchy",
  490. handled_trays=handled_trays,
  491. printer_manager=printer_manager,
  492. db=db,
  493. )
  494. assert len(results) == 1
  495. assert results[0]["weight_used"] == 20.0
  496. @pytest.mark.asyncio
  497. async def test_tray_now_override_for_single_filament(self):
  498. """Single-filament non-queue print uses tray_now instead of slot_id mapping."""
  499. # Spool 2 is at AMS1-T3 (global_tray_id=7)
  500. spool = _make_spool(spool_id=2, label_weight=1000)
  501. assignment = _make_assignment(spool_id=2, ams_id=1, tray_id=3)
  502. archive = _make_archive(archive_id=10)
  503. # db: archive, queue_item(None), assignment, spool
  504. db = _mock_db_sequential([archive, None, assignment, spool])
  505. # tray_now=7 = (ams_id=1, tray_id=3), the ACTUAL tray used
  506. printer_manager = MagicMock()
  507. printer_manager.get_status.return_value = SimpleNamespace(
  508. progress=100,
  509. layer_num=50,
  510. tray_now=7,
  511. )
  512. # 3MF has slot_id=12 (would default-map to ams_id=2, tray_id=3 — WRONG)
  513. filament_usage = [{"slot_id": 12, "used_g": 10.6, "type": "PLA", "color": "#FF0000"}]
  514. handled_trays: set[tuple[int, int]] = set()
  515. with (
  516. patch("backend.app.core.config.settings") as mock_settings,
  517. patch(
  518. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  519. return_value=filament_usage,
  520. ),
  521. ):
  522. mock_settings.base_dir = MagicMock()
  523. mock_path = MagicMock()
  524. mock_path.exists.return_value = True
  525. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  526. results = await _track_from_3mf(
  527. printer_id=1,
  528. archive_id=10,
  529. status="completed",
  530. print_name="Test",
  531. handled_trays=handled_trays,
  532. printer_manager=printer_manager,
  533. db=db,
  534. )
  535. assert len(results) == 1
  536. assert results[0]["spool_id"] == 2
  537. assert results[0]["ams_id"] == 1
  538. assert results[0]["tray_id"] == 3
  539. assert results[0]["weight_used"] == 10.6
  540. assert (1, 3) in handled_trays
  541. @pytest.mark.asyncio
  542. async def test_queue_ams_mapping_overrides_default(self):
  543. """Queue item ams_mapping overrides default slot_id mapping."""
  544. # Spool at AMS1-T3 (global_tray_id=7)
  545. spool = _make_spool(spool_id=5, label_weight=1000)
  546. assignment = _make_assignment(spool_id=5, ams_id=1, tray_id=3)
  547. archive = _make_archive(archive_id=20)
  548. # Queue item maps slot 1 → global tray 7 (ams_id=1, tray_id=3)
  549. queue_item = _make_queue_item(ams_mapping="[7, -1, -1, -1]")
  550. # db: archive, queue_item, assignment, spool
  551. db = _mock_db_sequential([archive, queue_item, assignment, spool])
  552. printer_manager = MagicMock()
  553. printer_manager.get_status.return_value = SimpleNamespace(
  554. progress=100,
  555. layer_num=50,
  556. tray_now=7,
  557. )
  558. filament_usage = [{"slot_id": 1, "used_g": 25.0, "type": "PETG", "color": ""}]
  559. handled_trays: set[tuple[int, int]] = set()
  560. with (
  561. patch("backend.app.core.config.settings") as mock_settings,
  562. patch(
  563. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  564. return_value=filament_usage,
  565. ),
  566. ):
  567. mock_settings.base_dir = MagicMock()
  568. mock_path = MagicMock()
  569. mock_path.exists.return_value = True
  570. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  571. results = await _track_from_3mf(
  572. printer_id=1,
  573. archive_id=20,
  574. status="completed",
  575. print_name="Queue Print",
  576. handled_trays=handled_trays,
  577. printer_manager=printer_manager,
  578. db=db,
  579. )
  580. assert len(results) == 1
  581. assert results[0]["spool_id"] == 5
  582. assert results[0]["ams_id"] == 1
  583. assert results[0]["tray_id"] == 3
  584. assert results[0]["weight_used"] == 25.0
  585. @pytest.mark.asyncio
  586. async def test_multi_filament_uses_queue_mapping(self):
  587. """Multi-filament queue prints use ams_mapping for each slot."""
  588. spool_a = _make_spool(spool_id=1, label_weight=1000)
  589. spool_b = _make_spool(spool_id=2, label_weight=1000)
  590. assign_a = _make_assignment(spool_id=1, ams_id=0, tray_id=0)
  591. assign_b = _make_assignment(spool_id=2, ams_id=1, tray_id=2)
  592. archive = _make_archive(archive_id=30)
  593. # slot 1 → tray 0 (AMS0-T0), slot 2 → tray 6 (AMS1-T2)
  594. queue_item = _make_queue_item(ams_mapping="[0, 6]")
  595. # db: archive, queue_item, assign_a, spool_a, assign_b, spool_b
  596. db = _mock_db_sequential([archive, queue_item, assign_a, spool_a, assign_b, spool_b])
  597. printer_manager = MagicMock()
  598. printer_manager.get_status.return_value = SimpleNamespace(
  599. progress=100,
  600. layer_num=50,
  601. tray_now=6,
  602. )
  603. filament_usage = [
  604. {"slot_id": 1, "used_g": 10.0, "type": "PLA", "color": ""},
  605. {"slot_id": 2, "used_g": 5.0, "type": "PETG", "color": ""},
  606. ]
  607. handled_trays: set[tuple[int, int]] = set()
  608. with (
  609. patch("backend.app.core.config.settings") as mock_settings,
  610. patch(
  611. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  612. return_value=filament_usage,
  613. ),
  614. ):
  615. mock_settings.base_dir = MagicMock()
  616. mock_path = MagicMock()
  617. mock_path.exists.return_value = True
  618. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  619. results = await _track_from_3mf(
  620. printer_id=1,
  621. archive_id=30,
  622. status="completed",
  623. print_name="Multi",
  624. handled_trays=handled_trays,
  625. printer_manager=printer_manager,
  626. db=db,
  627. )
  628. assert len(results) == 2
  629. assert results[0]["spool_id"] == 1
  630. assert results[0]["ams_id"] == 0
  631. assert results[0]["tray_id"] == 0
  632. assert results[0]["weight_used"] == 10.0
  633. assert results[1]["spool_id"] == 2
  634. assert results[1]["ams_id"] == 1
  635. assert results[1]["tray_id"] == 2
  636. assert results[1]["weight_used"] == 5.0
  637. @pytest.mark.asyncio
  638. async def test_no_tray_now_override_for_multi_filament(self):
  639. """Multi-filament non-queue prints fall back to default mapping, not tray_now."""
  640. spool = _make_spool(spool_id=1, label_weight=1000)
  641. assignment = _make_assignment(spool_id=1, ams_id=0, tray_id=0)
  642. archive = _make_archive(archive_id=10)
  643. # db: archive, queue_item(None), assignment, spool (2nd slot has no assignment)
  644. db = _mock_db_sequential([archive, None, assignment, spool, None])
  645. printer_manager = MagicMock()
  646. printer_manager.get_status.return_value = SimpleNamespace(
  647. progress=100,
  648. layer_num=50,
  649. tray_now=4, # tray_now won't be used
  650. )
  651. # Two filament slots with usage
  652. filament_usage = [
  653. {"slot_id": 1, "used_g": 10.0, "type": "PLA", "color": ""},
  654. {"slot_id": 2, "used_g": 5.0, "type": "PETG", "color": ""},
  655. ]
  656. handled_trays: set[tuple[int, int]] = set()
  657. with (
  658. patch("backend.app.core.config.settings") as mock_settings,
  659. patch(
  660. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  661. return_value=filament_usage,
  662. ),
  663. ):
  664. mock_settings.base_dir = MagicMock()
  665. mock_path = MagicMock()
  666. mock_path.exists.return_value = True
  667. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  668. results = await _track_from_3mf(
  669. printer_id=1,
  670. archive_id=10,
  671. status="completed",
  672. print_name="Test",
  673. handled_trays=handled_trays,
  674. printer_manager=printer_manager,
  675. db=db,
  676. )
  677. # Should use default mapping (slot 1 → tray 0, slot 2 → tray 1)
  678. assert len(results) == 1 # Only slot 1 has assignment
  679. assert results[0]["ams_id"] == 0
  680. assert results[0]["tray_id"] == 0
  681. @pytest.mark.asyncio
  682. async def test_stored_ams_mapping_overrides_all(self):
  683. """Stored ams_mapping from print command takes priority over queue and tray_now."""
  684. # Spool at AMS2-T1 (global_tray_id=9)
  685. spool = _make_spool(spool_id=10, label_weight=1000)
  686. assignment = _make_assignment(spool_id=10, ams_id=2, tray_id=1)
  687. archive = _make_archive(archive_id=50)
  688. # db: archive, assignment, spool (no queue lookup when ams_mapping provided)
  689. db = _mock_db_sequential([archive, assignment, spool])
  690. printer_manager = MagicMock()
  691. printer_manager.get_status.return_value = SimpleNamespace(
  692. progress=100,
  693. layer_num=50,
  694. tray_now=0, # Different from mapped tray — should be ignored
  695. last_loaded_tray=0,
  696. )
  697. filament_usage = [{"slot_id": 2, "used_g": 1.57, "type": "PLA", "color": "#FFFFFF"}]
  698. handled_trays: set[tuple[int, int]] = set()
  699. with (
  700. patch("backend.app.core.config.settings") as mock_settings,
  701. patch(
  702. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  703. return_value=filament_usage,
  704. ),
  705. ):
  706. mock_settings.base_dir = MagicMock()
  707. mock_path = MagicMock()
  708. mock_path.exists.return_value = True
  709. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  710. # ams_mapping: slot 2 (index 1) -> tray 9 (AMS2-T1)
  711. results = await _track_from_3mf(
  712. printer_id=1,
  713. archive_id=50,
  714. status="completed",
  715. print_name="Test",
  716. handled_trays=handled_trays,
  717. printer_manager=printer_manager,
  718. db=db,
  719. ams_mapping=[-1, 9],
  720. )
  721. assert len(results) == 1
  722. assert results[0]["spool_id"] == 10
  723. assert results[0]["ams_id"] == 2
  724. assert results[0]["tray_id"] == 1
  725. assert results[0]["weight_used"] == 1.6 # rounded
  726. @pytest.mark.asyncio
  727. async def test_last_loaded_tray_fallback(self):
  728. """Falls back to last_loaded_tray when tray_now_at_start and current tray_now are both 255."""
  729. # Spool at AMS2-T1 (global_tray_id=9)
  730. spool = _make_spool(spool_id=11, label_weight=1000)
  731. assignment = _make_assignment(spool_id=11, ams_id=2, tray_id=1)
  732. archive = _make_archive(archive_id=60)
  733. # db: archive, queue_item(None), assignment, spool
  734. db = _mock_db_sequential([archive, None, assignment, spool])
  735. # H2D scenario: tray_now=255 at completion, but last_loaded_tray=9
  736. printer_manager = MagicMock()
  737. printer_manager.get_status.return_value = SimpleNamespace(
  738. progress=100,
  739. layer_num=50,
  740. tray_now=255,
  741. last_loaded_tray=9,
  742. )
  743. filament_usage = [{"slot_id": 6, "used_g": 1.52, "type": "PLA", "color": "#7CC4D5"}]
  744. handled_trays: set[tuple[int, int]] = set()
  745. with (
  746. patch("backend.app.core.config.settings") as mock_settings,
  747. patch(
  748. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  749. return_value=filament_usage,
  750. ),
  751. ):
  752. mock_settings.base_dir = MagicMock()
  753. mock_path = MagicMock()
  754. mock_path.exists.return_value = True
  755. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  756. results = await _track_from_3mf(
  757. printer_id=1,
  758. archive_id=60,
  759. status="completed",
  760. print_name="Cube",
  761. handled_trays=handled_trays,
  762. printer_manager=printer_manager,
  763. db=db,
  764. tray_now_at_start=255, # H2D: 255 at start too
  765. )
  766. assert len(results) == 1
  767. assert results[0]["spool_id"] == 11
  768. assert results[0]["ams_id"] == 2
  769. assert results[0]["tray_id"] == 1
  770. @pytest.mark.asyncio
  771. async def test_tray_now_at_start_preferred_over_last_loaded(self):
  772. """tray_now_at_start is used before last_loaded_tray fallback."""
  773. spool = _make_spool(spool_id=3, label_weight=1000)
  774. assignment = _make_assignment(spool_id=3, ams_id=1, tray_id=1)
  775. archive = _make_archive(archive_id=70)
  776. db = _mock_db_sequential([archive, None, assignment, spool])
  777. # tray_now_at_start=5 (valid), last_loaded_tray=9 (different) — should use 5
  778. printer_manager = MagicMock()
  779. printer_manager.get_status.return_value = SimpleNamespace(
  780. progress=100,
  781. layer_num=50,
  782. tray_now=255,
  783. last_loaded_tray=9,
  784. )
  785. filament_usage = [{"slot_id": 1, "used_g": 5.0, "type": "PLA", "color": ""}]
  786. handled_trays: set[tuple[int, int]] = set()
  787. with (
  788. patch("backend.app.core.config.settings") as mock_settings,
  789. patch(
  790. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  791. return_value=filament_usage,
  792. ),
  793. ):
  794. mock_settings.base_dir = MagicMock()
  795. mock_path = MagicMock()
  796. mock_path.exists.return_value = True
  797. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  798. results = await _track_from_3mf(
  799. printer_id=1,
  800. archive_id=70,
  801. status="completed",
  802. print_name="Test",
  803. handled_trays=handled_trays,
  804. printer_manager=printer_manager,
  805. db=db,
  806. tray_now_at_start=5, # AMS1-T1
  807. )
  808. assert len(results) == 1
  809. assert results[0]["ams_id"] == 1
  810. assert results[0]["tray_id"] == 1
  811. class TestTrayChangeSplit:
  812. """Tests for mid-print tray switch weight splitting in _track_from_3mf()."""
  813. @pytest.mark.asyncio
  814. async def test_tray_switch_splits_weight_with_gcode(self):
  815. """Two-tray runout: weight split using per-layer gcode data."""
  816. spool_a = _make_spool(spool_id=10, label_weight=1000)
  817. spool_b = _make_spool(spool_id=20, label_weight=1000)
  818. assign_a = _make_assignment(spool_id=10, ams_id=0, tray_id=1)
  819. assign_b = _make_assignment(spool_id=20, ams_id=0, tray_id=0)
  820. archive = _make_archive(archive_id=100)
  821. # db: archive, queue_item(None), then for each segment: assignment, spool
  822. db = _mock_db_sequential([archive, None, assign_a, spool_a, assign_b, spool_b])
  823. # Tray change log: started on tray 1, switched to tray 0 at layer 60
  824. printer_manager = MagicMock()
  825. printer_manager.get_status.return_value = SimpleNamespace(
  826. progress=100,
  827. layer_num=100,
  828. tray_now=0,
  829. last_loaded_tray=0,
  830. total_layers=100,
  831. tray_change_log=[(1, 0), (0, 60)],
  832. )
  833. filament_usage = [{"slot_id": 1, "used_g": 30.0, "type": "PLA", "color": ""}]
  834. handled_trays: set[tuple[int, int]] = set()
  835. with (
  836. patch("backend.app.core.config.settings") as mock_settings,
  837. patch(
  838. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  839. return_value=filament_usage,
  840. ),
  841. patch(
  842. "backend.app.utils.threemf_tools.extract_layer_filament_usage_from_3mf",
  843. return_value={30: {0: 3000.0}, 60: {0: 6000.0}, 100: {0: 10000.0}},
  844. ),
  845. patch(
  846. "backend.app.utils.threemf_tools.get_cumulative_usage_at_layer",
  847. side_effect=lambda data, layer: {0: {0: 0.0, 60: 6000.0, 100: 10000.0}.get(layer, 0.0)},
  848. ),
  849. patch(
  850. "backend.app.utils.threemf_tools.extract_filament_properties_from_3mf",
  851. return_value={1: {"density": 1.24, "diameter": 1.75}},
  852. ),
  853. patch(
  854. "backend.app.utils.threemf_tools.mm_to_grams",
  855. side_effect=lambda mm, d, dens: round(mm * 0.003, 1), # Simple conversion
  856. ),
  857. ):
  858. mock_settings.base_dir = MagicMock()
  859. mock_path = MagicMock()
  860. mock_path.exists.return_value = True
  861. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  862. results = await _track_from_3mf(
  863. printer_id=1,
  864. archive_id=100,
  865. status="completed",
  866. print_name="Runout Test",
  867. handled_trays=handled_trays,
  868. printer_manager=printer_manager,
  869. db=db,
  870. )
  871. # Two results: one per tray segment
  872. assert len(results) == 2
  873. # First segment: tray 1 (AMS0-T1), layers 0→60
  874. assert results[0]["ams_id"] == 0
  875. assert results[0]["tray_id"] == 1
  876. assert results[0]["spool_id"] == 10
  877. assert results[0]["weight_used"] == 18.0 # 6000mm * 0.003
  878. # Second segment: tray 0 (AMS0-T0), layers 60→end = 30.0 - 18.0 = 12.0
  879. assert results[1]["ams_id"] == 0
  880. assert results[1]["tray_id"] == 0
  881. assert results[1]["spool_id"] == 20
  882. assert results[1]["weight_used"] == 12.0
  883. # Both trays handled
  884. assert (0, 1) in handled_trays
  885. assert (0, 0) in handled_trays
  886. @pytest.mark.asyncio
  887. async def test_tray_switch_linear_fallback(self):
  888. """Two-tray runout without per-layer gcode: linear split by layer ratio."""
  889. spool_a = _make_spool(spool_id=10, label_weight=1000)
  890. spool_b = _make_spool(spool_id=20, label_weight=1000)
  891. assign_a = _make_assignment(spool_id=10, ams_id=0, tray_id=2)
  892. assign_b = _make_assignment(spool_id=20, ams_id=0, tray_id=1)
  893. archive = _make_archive(archive_id=101)
  894. db = _mock_db_sequential([archive, None, assign_a, spool_a, assign_b, spool_b])
  895. # Tray 2 from layer 0, switched to tray 1 at layer 40 (of 100 total)
  896. printer_manager = MagicMock()
  897. printer_manager.get_status.return_value = SimpleNamespace(
  898. progress=100,
  899. layer_num=100,
  900. tray_now=1,
  901. last_loaded_tray=1,
  902. total_layers=100,
  903. tray_change_log=[(2, 0), (1, 40)],
  904. )
  905. filament_usage = [{"slot_id": 1, "used_g": 50.0, "type": "PLA", "color": ""}]
  906. handled_trays: set[tuple[int, int]] = set()
  907. with (
  908. patch("backend.app.core.config.settings") as mock_settings,
  909. patch(
  910. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  911. return_value=filament_usage,
  912. ),
  913. patch(
  914. "backend.app.utils.threemf_tools.extract_layer_filament_usage_from_3mf",
  915. return_value=None, # No per-layer gcode available
  916. ),
  917. ):
  918. mock_settings.base_dir = MagicMock()
  919. mock_path = MagicMock()
  920. mock_path.exists.return_value = True
  921. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  922. results = await _track_from_3mf(
  923. printer_id=1,
  924. archive_id=101,
  925. status="completed",
  926. print_name="Linear Fallback",
  927. handled_trays=handled_trays,
  928. printer_manager=printer_manager,
  929. db=db,
  930. )
  931. assert len(results) == 2
  932. # Linear split: tray 2 for 40/100 layers = 20g
  933. assert results[0]["ams_id"] == 0
  934. assert results[0]["tray_id"] == 2
  935. assert results[0]["weight_used"] == 20.0
  936. # Last segment gets remainder: 50 - 20 = 30g
  937. assert results[1]["ams_id"] == 0
  938. assert results[1]["tray_id"] == 1
  939. assert results[1]["weight_used"] == 30.0
  940. @pytest.mark.asyncio
  941. async def test_no_tray_change_uses_normal_path(self):
  942. """Single-entry tray_change_log falls through to normal tray_now_at_start logic."""
  943. spool = _make_spool(spool_id=1, label_weight=1000)
  944. assignment = _make_assignment(spool_id=1, ams_id=0, tray_id=2)
  945. archive = _make_archive(archive_id=102)
  946. db = _mock_db_sequential([archive, None, assignment, spool])
  947. # Only one entry = no switch, should use normal path
  948. printer_manager = MagicMock()
  949. printer_manager.get_status.return_value = SimpleNamespace(
  950. progress=100,
  951. layer_num=100,
  952. tray_now=2,
  953. last_loaded_tray=2,
  954. total_layers=100,
  955. tray_change_log=[(2, 0)],
  956. )
  957. filament_usage = [{"slot_id": 1, "used_g": 15.0, "type": "PLA", "color": ""}]
  958. handled_trays: set[tuple[int, int]] = set()
  959. with (
  960. patch("backend.app.core.config.settings") as mock_settings,
  961. patch(
  962. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  963. return_value=filament_usage,
  964. ),
  965. ):
  966. mock_settings.base_dir = MagicMock()
  967. mock_path = MagicMock()
  968. mock_path.exists.return_value = True
  969. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  970. results = await _track_from_3mf(
  971. printer_id=1,
  972. archive_id=102,
  973. status="completed",
  974. print_name="No Switch",
  975. handled_trays=handled_trays,
  976. printer_manager=printer_manager,
  977. db=db,
  978. tray_now_at_start=2,
  979. )
  980. # Normal path: single result, full weight
  981. assert len(results) == 1
  982. assert results[0]["weight_used"] == 15.0
  983. assert results[0]["ams_id"] == 0
  984. assert results[0]["tray_id"] == 2
  985. @pytest.mark.asyncio
  986. async def test_empty_tray_change_log_uses_normal_path(self):
  987. """Empty tray_change_log (e.g. server restart) falls through to existing logic."""
  988. spool = _make_spool(spool_id=1, label_weight=1000)
  989. assignment = _make_assignment(spool_id=1, ams_id=0, tray_id=0)
  990. archive = _make_archive(archive_id=103)
  991. db = _mock_db_sequential([archive, None, assignment, spool])
  992. # Empty log (server restarted mid-print)
  993. printer_manager = MagicMock()
  994. printer_manager.get_status.return_value = SimpleNamespace(
  995. progress=100,
  996. layer_num=100,
  997. tray_now=0,
  998. last_loaded_tray=0,
  999. total_layers=100,
  1000. tray_change_log=[],
  1001. )
  1002. filament_usage = [{"slot_id": 1, "used_g": 10.0, "type": "PLA", "color": ""}]
  1003. handled_trays: set[tuple[int, int]] = set()
  1004. with (
  1005. patch("backend.app.core.config.settings") as mock_settings,
  1006. patch(
  1007. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  1008. return_value=filament_usage,
  1009. ),
  1010. ):
  1011. mock_settings.base_dir = MagicMock()
  1012. mock_path = MagicMock()
  1013. mock_path.exists.return_value = True
  1014. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  1015. results = await _track_from_3mf(
  1016. printer_id=1,
  1017. archive_id=103,
  1018. status="completed",
  1019. print_name="Restart Recovery",
  1020. handled_trays=handled_trays,
  1021. printer_manager=printer_manager,
  1022. db=db,
  1023. tray_now_at_start=0,
  1024. )
  1025. assert len(results) == 1
  1026. assert results[0]["weight_used"] == 10.0
  1027. @pytest.mark.asyncio
  1028. async def test_tray_switch_segment_no_spool(self):
  1029. """Segment with no spool assignment is skipped; other segments still tracked."""
  1030. spool_b = _make_spool(spool_id=20, label_weight=1000)
  1031. assign_b = _make_assignment(spool_id=20, ams_id=0, tray_id=3)
  1032. archive = _make_archive(archive_id=104)
  1033. # db: archive, queue_item(None), 1st segment: no assignment, 2nd segment: assignment, spool
  1034. db = _mock_db_sequential([archive, None, None, assign_b, spool_b])
  1035. # Tray 5 (no spool) from layer 0, switched to tray 3 at layer 50
  1036. printer_manager = MagicMock()
  1037. printer_manager.get_status.return_value = SimpleNamespace(
  1038. progress=100,
  1039. layer_num=100,
  1040. tray_now=3,
  1041. last_loaded_tray=3,
  1042. total_layers=100,
  1043. tray_change_log=[(5, 0), (3, 50)],
  1044. )
  1045. filament_usage = [{"slot_id": 1, "used_g": 40.0, "type": "PLA", "color": ""}]
  1046. handled_trays: set[tuple[int, int]] = set()
  1047. with (
  1048. patch("backend.app.core.config.settings") as mock_settings,
  1049. patch(
  1050. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  1051. return_value=filament_usage,
  1052. ),
  1053. patch(
  1054. "backend.app.utils.threemf_tools.extract_layer_filament_usage_from_3mf",
  1055. return_value=None, # No per-layer data
  1056. ),
  1057. ):
  1058. mock_settings.base_dir = MagicMock()
  1059. mock_path = MagicMock()
  1060. mock_path.exists.return_value = True
  1061. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  1062. results = await _track_from_3mf(
  1063. printer_id=1,
  1064. archive_id=104,
  1065. status="completed",
  1066. print_name="Missing Spool",
  1067. handled_trays=handled_trays,
  1068. printer_manager=printer_manager,
  1069. db=db,
  1070. )
  1071. # Only the second segment (tray 3) tracked; first segment (tray 5) skipped
  1072. assert len(results) == 1
  1073. assert results[0]["ams_id"] == 0
  1074. assert results[0]["tray_id"] == 3
  1075. assert results[0]["spool_id"] == 20
  1076. @pytest.mark.asyncio
  1077. async def test_tray_switch_three_segments(self):
  1078. """Three-segment switch (rare): A→B→C split by linear fallback."""
  1079. spool_a = _make_spool(spool_id=1, label_weight=1000)
  1080. spool_b = _make_spool(spool_id=2, label_weight=1000)
  1081. spool_c = _make_spool(spool_id=3, label_weight=1000)
  1082. assign_a = _make_assignment(spool_id=1, ams_id=0, tray_id=0)
  1083. assign_b = _make_assignment(spool_id=2, ams_id=0, tray_id=1)
  1084. assign_c = _make_assignment(spool_id=3, ams_id=0, tray_id=2)
  1085. archive = _make_archive(archive_id=105)
  1086. db = _mock_db_sequential(
  1087. [
  1088. archive,
  1089. None,
  1090. assign_a,
  1091. spool_a,
  1092. assign_b,
  1093. spool_b,
  1094. assign_c,
  1095. spool_c,
  1096. ]
  1097. )
  1098. # 3 segments: tray 0 (0-30), tray 1 (30-70), tray 2 (70-end)
  1099. printer_manager = MagicMock()
  1100. printer_manager.get_status.return_value = SimpleNamespace(
  1101. progress=100,
  1102. layer_num=100,
  1103. tray_now=2,
  1104. last_loaded_tray=2,
  1105. total_layers=100,
  1106. tray_change_log=[(0, 0), (1, 30), (2, 70)],
  1107. )
  1108. filament_usage = [{"slot_id": 1, "used_g": 100.0, "type": "PLA", "color": ""}]
  1109. handled_trays: set[tuple[int, int]] = set()
  1110. with (
  1111. patch("backend.app.core.config.settings") as mock_settings,
  1112. patch(
  1113. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  1114. return_value=filament_usage,
  1115. ),
  1116. patch(
  1117. "backend.app.utils.threemf_tools.extract_layer_filament_usage_from_3mf",
  1118. return_value=None,
  1119. ),
  1120. ):
  1121. mock_settings.base_dir = MagicMock()
  1122. mock_path = MagicMock()
  1123. mock_path.exists.return_value = True
  1124. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  1125. results = await _track_from_3mf(
  1126. printer_id=1,
  1127. archive_id=105,
  1128. status="completed",
  1129. print_name="Triple Switch",
  1130. handled_trays=handled_trays,
  1131. printer_manager=printer_manager,
  1132. db=db,
  1133. )
  1134. assert len(results) == 3
  1135. # Tray 0: 30/100 * 100g = 30g
  1136. assert results[0]["weight_used"] == 30.0
  1137. assert results[0]["ams_id"] == 0
  1138. assert results[0]["tray_id"] == 0
  1139. # Tray 1: 40/100 * 100g = 40g
  1140. assert results[1]["weight_used"] == 40.0
  1141. assert results[1]["ams_id"] == 0
  1142. assert results[1]["tray_id"] == 1
  1143. # Tray 2: remainder = 100 - 30 - 40 = 30g
  1144. assert results[2]["weight_used"] == 30.0
  1145. assert results[2]["ams_id"] == 0
  1146. assert results[2]["tray_id"] == 2
  1147. class TestDecodeMqttMapping:
  1148. """Tests for _decode_mqtt_mapping() — snow-encoded MQTT mapping to global tray IDs."""
  1149. def test_none_input(self):
  1150. assert _decode_mqtt_mapping(None) is None
  1151. def test_empty_list(self):
  1152. assert _decode_mqtt_mapping([]) is None
  1153. def test_all_unmapped(self):
  1154. """All 65535 values → None (no valid mappings)."""
  1155. assert _decode_mqtt_mapping([65535, 65535, 65535]) is None
  1156. def test_single_ams_slots(self):
  1157. """AMS 0 slots: snow values 0-3 → global tray IDs 0-3."""
  1158. assert _decode_mqtt_mapping([0, 1, 2, 3]) == [0, 1, 2, 3]
  1159. def test_multi_ams_slots(self):
  1160. """AMS 1 (hw_id=1): snow 256=AMS1-T0, 257=AMS1-T1 → global 4, 5."""
  1161. assert _decode_mqtt_mapping([256, 257]) == [4, 5]
  1162. def test_ams_ht_slot(self):
  1163. """AMS-HT (hw_id=128): snow 32768 → global 128."""
  1164. assert _decode_mqtt_mapping([32768]) == [128]
  1165. def test_external_spool(self):
  1166. """External spool: ams_hw_id=254, slot=0 → global 254."""
  1167. # snow = 254 * 256 + 0 = 65024
  1168. assert _decode_mqtt_mapping([65024]) == [254]
  1169. def test_mixed_with_unmapped(self):
  1170. """Mix of valid and unmapped (65535) values."""
  1171. result = _decode_mqtt_mapping([1, 65535, 0])
  1172. assert result == [1, -1, 0]
  1173. def test_h2c_real_mapping(self):
  1174. """Real H2C mapping from MQTT logs: [1, 0, 65535*4, 32768]."""
  1175. mapping = [1, 0, 65535, 65535, 65535, 65535, 32768]
  1176. result = _decode_mqtt_mapping(mapping)
  1177. assert result == [1, 0, -1, -1, -1, -1, 128]
  1178. def test_non_int_values_treated_as_unmapped(self):
  1179. """Non-integer values in the mapping are treated as unmapped."""
  1180. assert _decode_mqtt_mapping(["foo", 0]) == [-1, 0]
  1181. class TestMatchSlotsByColor:
  1182. """Tests for _match_slots_by_color() — color-based filament slot to AMS tray matching."""
  1183. def _ams(self, trays):
  1184. """Build AMS data from list of (ams_id, tray_id, color_hex, tray_type) tuples."""
  1185. units: dict[int, list] = {}
  1186. for ams_id, tray_id, color, tray_type in trays:
  1187. units.setdefault(ams_id, []).append({"id": tray_id, "tray_color": color, "tray_type": tray_type})
  1188. return [{"id": aid, "tray": t} for aid, t in units.items()]
  1189. def _usage(self, slots):
  1190. """Build filament_usage from list of (slot_id, color_hex) tuples."""
  1191. return [{"slot_id": sid, "used_g": 10.0, "type": "PLA", "color": color} for sid, color in slots]
  1192. def test_none_inputs(self):
  1193. assert _match_slots_by_color(None, None) is None
  1194. assert _match_slots_by_color([], None) is None
  1195. assert _match_slots_by_color(None, {"ams": []}) is None
  1196. def test_empty_ams(self):
  1197. usage = self._usage([(1, "#FF0000")])
  1198. assert _match_slots_by_color(usage, {"ams": []}) is None
  1199. def test_single_slot_single_tray(self):
  1200. """One 3MF slot matches one AMS tray by color."""
  1201. ams = self._ams([(0, 0, "FF0000FF", "PLA")])
  1202. usage = self._usage([(1, "#FF0000")])
  1203. assert _match_slots_by_color(usage, {"ams": ams}) == [0]
  1204. def test_a1_mini_three_colors(self):
  1205. """A1 Mini: 3 slots match 3 distinct AMS trays."""
  1206. ams = self._ams(
  1207. [
  1208. (0, 0, "FF0000FF", "PLA"), # Red
  1209. (0, 1, "00FF00FF", "PLA"), # Green
  1210. (0, 2, "0000FFFF", "PLA"), # Blue
  1211. ]
  1212. )
  1213. usage = self._usage([(1, "#FF0000"), (2, "#00FF00"), (3, "#0000FF")])
  1214. assert _match_slots_by_color(usage, {"ams": ams}) == [0, 1, 2]
  1215. def test_dual_ams_p2s_like(self):
  1216. """P2S with dual AMS: slots from second AMS unit."""
  1217. ams = self._ams(
  1218. [
  1219. (0, 0, "AAAAAAFF", "PLA"),
  1220. (0, 1, "BBBBBBFF", "PLA"),
  1221. (1, 0, "CC0000FF", "PETG"), # global_id=4
  1222. (1, 1, "00CC00FF", "PETG"), # global_id=5
  1223. ]
  1224. )
  1225. usage = self._usage([(1, "#CC0000"), (2, "#00CC00")])
  1226. assert _match_slots_by_color(usage, {"ams": ams}) == [4, 5]
  1227. def test_ams_ht_global_id(self):
  1228. """AMS-HT (ams_id >= 128) uses raw ams_id as global tray ID."""
  1229. ams = self._ams(
  1230. [
  1231. (0, 0, "FF0000FF", "PLA"),
  1232. (128, 0, "0000FFFF", "PLA"), # AMS-HT → global_id=128
  1233. ]
  1234. )
  1235. usage = self._usage([(1, "#FF0000"), (2, "#0000FF")])
  1236. assert _match_slots_by_color(usage, {"ams": ams}) == [0, 128]
  1237. def test_ambiguous_same_color_returns_none(self):
  1238. """Two trays with the same color → ambiguous → None."""
  1239. ams = self._ams(
  1240. [
  1241. (0, 0, "FF0000FF", "PLA"),
  1242. (0, 1, "FF0000FF", "PLA"), # Same red
  1243. ]
  1244. )
  1245. usage = self._usage([(1, "#FF0000")])
  1246. assert _match_slots_by_color(usage, {"ams": ams}) is None
  1247. def test_no_matching_color_returns_none(self):
  1248. """3MF slot color not found in any AMS tray → None."""
  1249. ams = self._ams([(0, 0, "00FF00FF", "PLA")])
  1250. usage = self._usage([(1, "#FF0000")]) # Red, but AMS has green
  1251. assert _match_slots_by_color(usage, {"ams": ams}) is None
  1252. def test_color_normalization_strips_alpha(self):
  1253. """AMS colors (RRGGBBAA) and 3MF colors (#RRGGBB) match after normalization."""
  1254. ams = self._ams([(0, 0, "AABBCC80", "PLA")]) # 8-char with alpha
  1255. usage = self._usage([(1, "#AABBCC")]) # 6-char with #
  1256. assert _match_slots_by_color(usage, {"ams": ams}) == [0]
  1257. def test_case_insensitive(self):
  1258. """Color matching is case-insensitive."""
  1259. ams = self._ams([(0, 0, "aaBBccFF", "PLA")])
  1260. usage = self._usage([(1, "#AAbbCC")])
  1261. assert _match_slots_by_color(usage, {"ams": ams}) == [0]
  1262. def test_empty_tray_color_skipped(self):
  1263. """Trays with empty color are skipped (not matched)."""
  1264. ams = self._ams(
  1265. [
  1266. (0, 0, "", "PLA"),
  1267. (0, 1, "FF0000FF", "PLA"),
  1268. ]
  1269. )
  1270. usage = self._usage([(1, "#FF0000")])
  1271. assert _match_slots_by_color(usage, {"ams": ams}) == [1]
  1272. def test_empty_tray_type_skipped(self):
  1273. """Trays with empty tray_type are skipped (unloaded slot)."""
  1274. ams = self._ams(
  1275. [
  1276. (0, 0, "FF0000FF", ""), # Empty slot
  1277. (0, 1, "FF0000FF", "PLA"), # Loaded slot
  1278. ]
  1279. )
  1280. usage = self._usage([(1, "#FF0000")])
  1281. assert _match_slots_by_color(usage, {"ams": ams}) == [1]
  1282. def test_short_slot_color_returns_none(self):
  1283. """3MF slot with color < 6 chars → can't match → None."""
  1284. ams = self._ams([(0, 0, "FF0000FF", "PLA")])
  1285. usage = [{"slot_id": 1, "used_g": 10.0, "type": "PLA", "color": "#FFF"}]
  1286. assert _match_slots_by_color(usage, {"ams": ams}) is None
  1287. def test_slot_id_zero_skipped(self):
  1288. """Slots with slot_id=0 are skipped."""
  1289. ams = self._ams([(0, 0, "FF0000FF", "PLA")])
  1290. usage = [{"slot_id": 0, "used_g": 10.0, "type": "PLA", "color": "#FF0000"}]
  1291. assert _match_slots_by_color(usage, {"ams": ams}) is None
  1292. def test_ams_data_as_list(self):
  1293. """Handles ams_raw as a plain list (some printer models)."""
  1294. ams_list = [{"id": 0, "tray": [{"id": 0, "tray_color": "FF0000FF", "tray_type": "PLA"}]}]
  1295. usage = self._usage([(1, "#FF0000")])
  1296. assert _match_slots_by_color(usage, ams_list) == [0]
  1297. def test_same_color_two_trays_disambiguated_by_usage(self):
  1298. """Two trays same color, two slots same color → unique assignment via used_trays tracking."""
  1299. ams = self._ams(
  1300. [
  1301. (0, 0, "FF0000FF", "PLA"),
  1302. (0, 1, "FF0000FF", "PLA"),
  1303. ]
  1304. )
  1305. # Two slots both wanting red — first gets tray 0, second gets tray 1? No.
  1306. # When first slot takes the only available, second has 1 left → should work
  1307. usage = self._usage([(1, "#FF0000"), (2, "#FF0000")])
  1308. # First slot: candidates=[0,1], available=[0,1], len!=1 → None
  1309. assert _match_slots_by_color(usage, {"ams": ams}) is None
  1310. def test_dict_wrapper_with_ams_key(self):
  1311. """Standard dict format with 'ams' key."""
  1312. ams_data = {"ams": [{"id": 0, "tray": [{"id": 0, "tray_color": "00FF00FF", "tray_type": "PLA"}]}]}
  1313. usage = self._usage([(1, "#00FF00")])
  1314. assert _match_slots_by_color(usage, ams_data) == [0]
  1315. class TestMqttMappingIntegration:
  1316. """Integration tests: MQTT mapping field used in _track_from_3mf."""
  1317. @pytest.mark.asyncio
  1318. async def test_h2c_multi_filament_uses_mqtt_mapping(self):
  1319. """H2C: 3 filaments resolved via MQTT mapping field (no ams_mapping, no queue)."""
  1320. # AMS0-T1 (White PLA), AMS0-T0 (Black PLA), AMS128-T0 (Red PLA)
  1321. spool_white = _make_spool(spool_id=1, label_weight=1000)
  1322. spool_black = _make_spool(spool_id=2, label_weight=1000)
  1323. spool_red = _make_spool(spool_id=3, label_weight=1000)
  1324. assign_white = _make_assignment(spool_id=1, ams_id=0, tray_id=1)
  1325. assign_black = _make_assignment(spool_id=2, ams_id=0, tray_id=0)
  1326. assign_red = _make_assignment(spool_id=3, ams_id=128, tray_id=0)
  1327. archive = _make_archive(archive_id=12)
  1328. # db: archive, then 3 pairs of (assignment, spool)
  1329. # No queue lookup because MQTT mapping is found first
  1330. db = _mock_db_sequential(
  1331. [
  1332. archive,
  1333. assign_white,
  1334. spool_white,
  1335. assign_black,
  1336. spool_black,
  1337. assign_red,
  1338. spool_red,
  1339. ]
  1340. )
  1341. # MQTT mapping: slot0→AMS0-T1(1), slot1→AMS0-T0(0), slots2-5→unmapped, slot6→AMS128-T0(32768)
  1342. printer_manager = MagicMock()
  1343. printer_manager.get_status.return_value = SimpleNamespace(
  1344. raw_data={"mapping": [1, 0, 65535, 65535, 65535, 65535, 32768]},
  1345. progress=100,
  1346. layer_num=50,
  1347. tray_now=255,
  1348. )
  1349. # 3MF slots 1, 2, 7 (1-based) → indices 0, 1, 6 in mapping
  1350. filament_usage = [
  1351. {"slot_id": 1, "used_g": 21.16, "type": "PLA", "color": "#FFFFFF"},
  1352. {"slot_id": 2, "used_g": 24.22, "type": "PLA", "color": "#000000"},
  1353. {"slot_id": 7, "used_g": 18.47, "type": "PLA", "color": "#F72323"},
  1354. ]
  1355. handled_trays: set[tuple[int, int]] = set()
  1356. with (
  1357. patch("backend.app.core.config.settings") as mock_settings,
  1358. patch(
  1359. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  1360. return_value=filament_usage,
  1361. ),
  1362. ):
  1363. mock_settings.base_dir = MagicMock()
  1364. mock_path = MagicMock()
  1365. mock_path.exists.return_value = True
  1366. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  1367. results = await _track_from_3mf(
  1368. printer_id=1,
  1369. archive_id=12,
  1370. status="completed",
  1371. print_name="Cube + Cube + Cube",
  1372. handled_trays=handled_trays,
  1373. printer_manager=printer_manager,
  1374. db=db,
  1375. )
  1376. assert len(results) == 3
  1377. # slot_id=1 → mapping[0]=1 → AMS0-T1 (White PLA)
  1378. assert results[0]["spool_id"] == 1
  1379. assert results[0]["ams_id"] == 0
  1380. assert results[0]["tray_id"] == 1
  1381. assert results[0]["weight_used"] == 21.2
  1382. # slot_id=2 → mapping[1]=0 → AMS0-T0 (Black PLA)
  1383. assert results[1]["spool_id"] == 2
  1384. assert results[1]["ams_id"] == 0
  1385. assert results[1]["tray_id"] == 0
  1386. assert results[1]["weight_used"] == 24.2
  1387. # slot_id=7 → mapping[6]=32768 → AMS128-T0 (Red PLA)
  1388. assert results[2]["spool_id"] == 3
  1389. assert results[2]["ams_id"] == 128
  1390. assert results[2]["tray_id"] == 0
  1391. assert results[2]["weight_used"] == 18.5
  1392. @pytest.mark.asyncio
  1393. async def test_print_cmd_mapping_takes_priority_over_mqtt(self):
  1394. """ams_mapping from print command is used even when MQTT mapping exists."""
  1395. spool = _make_spool(spool_id=1, label_weight=1000)
  1396. assignment = _make_assignment(spool_id=1, ams_id=0, tray_id=2)
  1397. archive = _make_archive(archive_id=10)
  1398. # db: archive, assignment, spool (no queue lookup when ams_mapping provided)
  1399. db = _mock_db_sequential([archive, assignment, spool])
  1400. printer_manager = MagicMock()
  1401. printer_manager.get_status.return_value = SimpleNamespace(
  1402. raw_data={"mapping": [0, 65535]}, # MQTT says slot 0 → AMS0-T0
  1403. progress=100,
  1404. layer_num=50,
  1405. tray_now=255,
  1406. )
  1407. filament_usage = [{"slot_id": 1, "used_g": 10.0, "type": "PLA", "color": ""}]
  1408. handled_trays: set[tuple[int, int]] = set()
  1409. with (
  1410. patch("backend.app.core.config.settings") as mock_settings,
  1411. patch(
  1412. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  1413. return_value=filament_usage,
  1414. ),
  1415. ):
  1416. mock_settings.base_dir = MagicMock()
  1417. mock_path = MagicMock()
  1418. mock_path.exists.return_value = True
  1419. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  1420. results = await _track_from_3mf(
  1421. printer_id=1,
  1422. archive_id=10,
  1423. status="completed",
  1424. print_name="Test",
  1425. handled_trays=handled_trays,
  1426. printer_manager=printer_manager,
  1427. db=db,
  1428. ams_mapping=[2], # Print cmd says slot 0 → AMS0-T2 (overrides MQTT)
  1429. )
  1430. assert len(results) == 1
  1431. assert results[0]["ams_id"] == 0
  1432. assert results[0]["tray_id"] == 2 # From print_cmd mapping, not MQTT
  1433. class TestNotificationVariables:
  1434. """Tests for filament_details formatting in notifications."""
  1435. def test_filament_details_single_slot(self):
  1436. """Single slot produces 'PLA: 15.2g' format."""
  1437. slots = [{"type": "PLA", "used_g": 15.2, "slot_id": 1, "color": "#FF0000"}]
  1438. parts = []
  1439. for slot in slots:
  1440. ftype = slot.get("type", "Unknown") or "Unknown"
  1441. used = slot.get("used_g", 0)
  1442. parts.append(f"{ftype}: {used:.1f}g")
  1443. result = " | ".join(parts)
  1444. assert result == "PLA: 15.2g"
  1445. def test_filament_details_multi_slot(self):
  1446. """Multiple slots produce 'PLA: 10.0g | PETG: 5.0g' format."""
  1447. slots = [
  1448. {"type": "PLA", "used_g": 10.0, "slot_id": 1, "color": ""},
  1449. {"type": "PETG", "used_g": 5.0, "slot_id": 2, "color": ""},
  1450. ]
  1451. parts = []
  1452. for slot in slots:
  1453. ftype = slot.get("type", "Unknown") or "Unknown"
  1454. used = slot.get("used_g", 0)
  1455. parts.append(f"{ftype}: {used:.1f}g")
  1456. result = " | ".join(parts)
  1457. assert result == "PLA: 10.0g | PETG: 5.0g"
  1458. def test_filament_details_empty_type(self):
  1459. """Empty type defaults to 'Unknown'."""
  1460. slots = [{"type": "", "used_g": 5.0, "slot_id": 1, "color": ""}]
  1461. parts = []
  1462. for slot in slots:
  1463. ftype = slot.get("type", "Unknown") or "Unknown"
  1464. used = slot.get("used_g", 0)
  1465. parts.append(f"{ftype}: {used:.1f}g")
  1466. result = " | ".join(parts)
  1467. assert result == "Unknown: 5.0g"
  1468. def test_filament_grams_scaled_for_partial(self):
  1469. """filament_grams is scaled by progress for partial prints."""
  1470. filament_used_grams = 20.0
  1471. progress = 50
  1472. scale = max(0.0, min(progress / 100.0, 1.0))
  1473. scaled = round(filament_used_grams * scale, 1)
  1474. assert scaled == 10.0
  1475. def test_filament_grams_zero_progress(self):
  1476. """Progress=0 at cancellation gives 0.0g."""
  1477. filament_used_grams = 20.0
  1478. progress = 0
  1479. scale = max(0.0, min(progress / 100.0, 1.0))
  1480. scaled = round(filament_used_grams * scale, 1)
  1481. assert scaled == 0.0
  1482. def test_slot_scaling_for_partial(self):
  1483. """Per-slot usage is scaled linearly for partial prints."""
  1484. slots = [
  1485. {"type": "PLA", "used_g": 20.0, "slot_id": 1, "color": ""},
  1486. {"type": "PETG", "used_g": 10.0, "slot_id": 2, "color": ""},
  1487. ]
  1488. progress = 30
  1489. scale = max(0.0, min(progress / 100.0, 1.0))
  1490. scaled_slots = [{**s, "used_g": round(s["used_g"] * scale, 1)} for s in slots]
  1491. assert scaled_slots[0]["used_g"] == 6.0
  1492. assert scaled_slots[1]["used_g"] == 3.0
  1493. class TestOnPrintStartAmsMapping:
  1494. """Tests for ams_mapping capture in on_print_start()."""
  1495. @pytest.fixture(autouse=True)
  1496. def _clear_sessions(self):
  1497. _active_sessions.clear()
  1498. yield
  1499. _active_sessions.clear()
  1500. @pytest.mark.asyncio
  1501. async def test_captures_ams_mapping_from_data(self):
  1502. """on_print_start captures ams_mapping from the data dict into the session."""
  1503. printer_manager = MagicMock()
  1504. printer_manager.get_status.return_value = SimpleNamespace(
  1505. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 80}]}]},
  1506. tray_now=0,
  1507. )
  1508. await on_print_start(1, {"subtask_name": "Test", "ams_mapping": [3, -1, -1, 2]}, printer_manager)
  1509. assert _active_sessions[1].ams_mapping == [3, -1, -1, 2]
  1510. @pytest.mark.asyncio
  1511. async def test_ams_mapping_none_when_not_in_data(self):
  1512. """Session ams_mapping is None when data dict has no ams_mapping."""
  1513. printer_manager = MagicMock()
  1514. printer_manager.get_status.return_value = SimpleNamespace(
  1515. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 80}]}]},
  1516. tray_now=0,
  1517. )
  1518. await on_print_start(1, {"subtask_name": "Test"}, printer_manager)
  1519. assert _active_sessions[1].ams_mapping is None
  1520. class TestFindThreemfByFilename:
  1521. """Tests for _find_3mf_by_filename() — library/archive search without archive_id."""
  1522. @pytest.mark.asyncio
  1523. async def test_finds_library_file(self):
  1524. """Finds a 3MF from library files matching filename."""
  1525. from pathlib import Path
  1526. from unittest.mock import MagicMock
  1527. lib_file = MagicMock()
  1528. lib_file.file_path = "library/BMCU-BADGE.3mf"
  1529. mock_result = MagicMock()
  1530. mock_result.scalars.return_value.all.return_value = [lib_file]
  1531. db = AsyncMock()
  1532. db.execute = AsyncMock(return_value=mock_result)
  1533. base_dir = MagicMock(spec=Path)
  1534. candidate = MagicMock(spec=Path)
  1535. candidate.exists.return_value = True
  1536. candidate.suffix = ".3mf"
  1537. base_dir.__truediv__ = MagicMock(return_value=candidate)
  1538. result = await _find_3mf_by_filename(1, "BMCU-BADGE.3mf", db, base_dir)
  1539. assert result == candidate
  1540. @pytest.mark.asyncio
  1541. async def test_returns_none_for_empty_filename(self):
  1542. """Returns None when filename is empty or just extensions."""
  1543. db = AsyncMock()
  1544. base_dir = MagicMock()
  1545. result = await _find_3mf_by_filename(1, ".3mf", db, base_dir)
  1546. assert result is None
  1547. result = await _find_3mf_by_filename(1, "", db, base_dir)
  1548. assert result is None
  1549. @pytest.mark.asyncio
  1550. async def test_falls_through_to_archive_search(self):
  1551. """Falls back to previous archives when library search returns no results."""
  1552. from pathlib import Path
  1553. # Library returns nothing
  1554. empty_result = MagicMock()
  1555. empty_result.scalars.return_value.all.return_value = []
  1556. # Archive returns a match
  1557. archive = MagicMock()
  1558. archive.id = 35
  1559. archive.file_path = "archives/35/BMCU-BADGE.3mf"
  1560. archive_result = MagicMock()
  1561. archive_result.scalars.return_value.all.return_value = [archive]
  1562. db = AsyncMock()
  1563. db.execute = AsyncMock(side_effect=[empty_result, archive_result])
  1564. base_dir = MagicMock(spec=Path)
  1565. candidate = MagicMock(spec=Path)
  1566. candidate.exists.return_value = True
  1567. candidate.suffix = ".3mf"
  1568. base_dir.__truediv__ = MagicMock(return_value=candidate)
  1569. result = await _find_3mf_by_filename(1, "BMCU-BADGE.3mf", db, base_dir)
  1570. assert result == candidate
  1571. assert db.execute.call_count == 2
  1572. @pytest.mark.asyncio
  1573. async def test_returns_none_when_nothing_found(self):
  1574. """Returns None when neither library nor archives have a matching 3MF."""
  1575. empty_result = MagicMock()
  1576. empty_result.scalars.return_value.all.return_value = []
  1577. db = AsyncMock()
  1578. db.execute = AsyncMock(return_value=empty_result)
  1579. base_dir = MagicMock()
  1580. result = await _find_3mf_by_filename(1, "nonexistent.3mf", db, base_dir)
  1581. assert result is None
  1582. @pytest.mark.asyncio
  1583. async def test_strips_path_and_extensions(self):
  1584. """Correctly strips path components and extensions for search."""
  1585. empty_result = MagicMock()
  1586. empty_result.scalars.return_value.all.return_value = []
  1587. db = AsyncMock()
  1588. db.execute = AsyncMock(return_value=empty_result)
  1589. base_dir = MagicMock()
  1590. # Should search for "BMCU-BADGE" base name even with path and .gcode.3mf
  1591. await _find_3mf_by_filename(1, "/sdcard/BMCU-BADGE.gcode.3mf", db, base_dir)
  1592. # Verify the execute was called (search was attempted with stripped name)
  1593. assert db.execute.call_count == 2 # library + archive search
  1594. class TestTrackFrom3mfWithPreresolvedPath:
  1595. """Tests for _track_from_3mf() with threemf_path (no archive needed)."""
  1596. @pytest.mark.asyncio
  1597. async def test_uses_preresolved_path_without_archive(self):
  1598. """When threemf_path is provided with archive_id=None, uses the path directly."""
  1599. spool = _make_spool(spool_id=1, label_weight=1000)
  1600. assignment = _make_assignment(spool_id=1, ams_id=0, tray_id=3)
  1601. # DB: 1st call = assignment lookup (live), 2nd = spool lookup
  1602. db = _mock_db_sequential([assignment, spool])
  1603. printer_manager = MagicMock()
  1604. printer_manager.get_status.return_value = SimpleNamespace(
  1605. raw_data={"ams": [{"id": 0, "tray": []}]},
  1606. tray_now=255,
  1607. last_loaded_tray=3,
  1608. tray_change_log=[],
  1609. )
  1610. filament_usage = [{"slot_id": 1, "used_g": 5.0, "type": "PETG", "color": "#FFFFFF"}]
  1611. with (
  1612. patch(
  1613. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  1614. return_value=filament_usage,
  1615. ),
  1616. patch("backend.app.core.config.settings") as mock_settings,
  1617. ):
  1618. mock_settings.base_dir = MagicMock()
  1619. mock_path = MagicMock()
  1620. mock_path.exists.return_value = True
  1621. results = await _track_from_3mf(
  1622. printer_id=1,
  1623. archive_id=None,
  1624. status="completed",
  1625. print_name="BMCU-BADGE",
  1626. handled_trays=set(),
  1627. printer_manager=printer_manager,
  1628. db=db,
  1629. ams_mapping=[3, -1, -1, -1],
  1630. threemf_path=mock_path,
  1631. )
  1632. assert len(results) == 1
  1633. assert results[0]["spool_id"] == 1
  1634. assert results[0]["weight_used"] == 5.0
  1635. @pytest.mark.asyncio
  1636. async def test_skips_queue_lookup_without_archive_id(self):
  1637. """When archive_id is None, queue item lookup is skipped."""
  1638. spool = _make_spool(spool_id=1, label_weight=1000)
  1639. assignment = _make_assignment(spool_id=1, ams_id=0, tray_id=0)
  1640. db = _mock_db_sequential([assignment, spool])
  1641. printer_manager = MagicMock()
  1642. printer_manager.get_status.return_value = SimpleNamespace(
  1643. raw_data={"ams": [{"id": 0, "tray": []}]},
  1644. tray_now=0,
  1645. last_loaded_tray=0,
  1646. tray_change_log=[],
  1647. )
  1648. filament_usage = [{"slot_id": 1, "used_g": 2.0, "type": "PLA", "color": "#FF0000"}]
  1649. with (
  1650. patch(
  1651. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  1652. return_value=filament_usage,
  1653. ),
  1654. patch("backend.app.core.config.settings") as mock_settings,
  1655. ):
  1656. mock_settings.base_dir = MagicMock()
  1657. mock_path = MagicMock()
  1658. mock_path.exists.return_value = True
  1659. # Should NOT fail even though there's no archive_id for queue lookup
  1660. results = await _track_from_3mf(
  1661. printer_id=1,
  1662. archive_id=None,
  1663. status="completed",
  1664. print_name="Test",
  1665. handled_trays=set(),
  1666. printer_manager=printer_manager,
  1667. db=db,
  1668. tray_now_at_start=0,
  1669. threemf_path=mock_path,
  1670. )
  1671. assert len(results) == 1
  1672. assert results[0]["weight_used"] == 2.0