# test_usage_tracker.py (78 KB)
"""Unit tests for usage_tracker.py — 3MF-primary filament tracking.

Tests the unified tracking logic: 3MF slicer estimates as the primary path,
AMS remain% delta as the fallback, per-layer gcode for partial prints,
slot-to-tray mapping resolution, and notification variable formatting.
"""
  6. from datetime import datetime, timedelta, timezone
  7. from types import SimpleNamespace
  8. from unittest.mock import AsyncMock, MagicMock, patch
  9. import pytest
  10. from backend.app.services.usage_tracker import (
  11. PrintSession,
  12. _active_sessions,
  13. _decode_mqtt_mapping,
  14. _find_3mf_by_filename,
  15. _match_slots_by_color,
  16. _track_from_3mf,
  17. on_print_complete,
  18. on_print_start,
  19. )
  20. def _make_spool(spool_id=1, label_weight=1000, weight_used=0, tag_uid=None, tray_uuid=None):
  21. """Create a mock Spool object."""
  22. spool = MagicMock()
  23. spool.id = spool_id
  24. spool.label_weight = label_weight
  25. spool.weight_used = weight_used
  26. spool.tag_uid = tag_uid
  27. spool.tray_uuid = tray_uuid
  28. spool.last_used = None
  29. spool.cost_per_kg = None
  30. spool.material = "PLA"
  31. return spool
  32. def _make_assignment(spool_id=1, printer_id=1, ams_id=0, tray_id=0):
  33. """Create a mock SpoolAssignment object."""
  34. assignment = MagicMock()
  35. assignment.spool_id = spool_id
  36. assignment.printer_id = printer_id
  37. assignment.ams_id = ams_id
  38. assignment.tray_id = tray_id
  39. return assignment
  40. def _make_archive(archive_id=1, file_path="archives/1/test.3mf", extra_data=None):
  41. """Create a mock PrintArchive object."""
  42. archive = MagicMock()
  43. archive.id = archive_id
  44. archive.file_path = file_path
  45. archive.extra_data = extra_data
  46. return archive
  47. def _make_queue_item(ams_mapping=None, status="printing"):
  48. """Create a mock PrintQueueItem object."""
  49. item = MagicMock()
  50. item.ams_mapping = ams_mapping
  51. item.status = status
  52. return item
  53. def _mock_db_execute(*return_values):
  54. """Create a mock db with execute() that returns values in sequence."""
  55. db = AsyncMock()
  56. results = []
  57. for val in return_values:
  58. result = MagicMock()
  59. result.scalar_one_or_none.return_value = val
  60. results.append(result)
  61. db.execute = AsyncMock(side_effect=results)
  62. return db
  63. def _mock_db_sequential(responses):
  64. """Create mock db that returns responses in order."""
  65. db = AsyncMock()
  66. call_count = [0]
  67. async def mock_execute(*args, **kwargs):
  68. idx = call_count[0]
  69. call_count[0] += 1
  70. result = MagicMock()
  71. if idx < len(responses):
  72. result.scalar_one_or_none.return_value = responses[idx]
  73. else:
  74. result.scalar_one_or_none.return_value = None
  75. # For cost aggregation queries that use .scalar() instead of .scalar_one_or_none()
  76. result.scalar.return_value = None
  77. return result
  78. db.execute = mock_execute
  79. return db
  80. class TestOnPrintStart:
  81. """Tests for on_print_start()."""
  82. @pytest.fixture(autouse=True)
  83. def _clear_sessions(self):
  84. _active_sessions.clear()
  85. yield
  86. _active_sessions.clear()
  87. @pytest.mark.asyncio
  88. async def test_captures_remain_data(self):
  89. """Captures AMS remain% at print start."""
  90. printer_manager = MagicMock()
  91. printer_manager.get_status.return_value = SimpleNamespace(
  92. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 80}, {"id": 1, "remain": 50}]}]},
  93. tray_now=5,
  94. )
  95. await on_print_start(1, {"subtask_name": "Benchy"}, printer_manager)
  96. assert 1 in _active_sessions
  97. session = _active_sessions[1]
  98. assert session.print_name == "Benchy"
  99. assert session.tray_remain_start == {(0, 0): 80, (0, 1): 50}
  100. @pytest.mark.asyncio
  101. async def test_captures_tray_now_at_start(self):
  102. """Captures tray_now at print start for later use in usage tracking."""
  103. printer_manager = MagicMock()
  104. printer_manager.get_status.return_value = SimpleNamespace(
  105. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 80}]}]},
  106. tray_now=9,
  107. )
  108. await on_print_start(1, {"subtask_name": "Test"}, printer_manager)
  109. assert _active_sessions[1].tray_now_at_start == 9
  110. @pytest.mark.asyncio
  111. async def test_tray_now_at_start_255_when_unloaded(self):
  112. """Captures tray_now=255 when printer has no filament loaded at start."""
  113. printer_manager = MagicMock()
  114. printer_manager.get_status.return_value = SimpleNamespace(
  115. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 80}]}]},
  116. tray_now=255,
  117. )
  118. await on_print_start(1, {"subtask_name": "Test"}, printer_manager)
  119. assert _active_sessions[1].tray_now_at_start == 255
  120. @pytest.mark.asyncio
  121. async def test_creates_session_without_remain(self):
  122. """Creates session even without valid remain data (for 3MF tracking)."""
  123. printer_manager = MagicMock()
  124. printer_manager.get_status.return_value = SimpleNamespace(
  125. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": -1}]}]},
  126. tray_now=255,
  127. )
  128. await on_print_start(1, {"subtask_name": "Test"}, printer_manager)
  129. assert 1 in _active_sessions
  130. assert _active_sessions[1].tray_remain_start == {}
  131. class TestOnPrintComplete:
  132. """Tests for on_print_complete() — path ordering and interaction."""
  133. @pytest.fixture(autouse=True)
  134. def _clear_sessions(self):
  135. _active_sessions.clear()
  136. yield
  137. _active_sessions.clear()
  138. @pytest.fixture(autouse=True)
  139. def _mock_get_setting(self):
  140. with patch(
  141. "backend.app.api.routes.settings.get_setting",
  142. new_callable=AsyncMock,
  143. return_value=None,
  144. ):
  145. yield
  146. @pytest.mark.asyncio
  147. async def test_bl_spool_uses_3mf(self):
  148. """BL spool (with tag_uid) is tracked via 3MF, not just AMS delta."""
  149. spool = _make_spool(spool_id=1, tag_uid="AABB1122", label_weight=1000)
  150. assignment = _make_assignment(spool_id=1, printer_id=1, ams_id=0, tray_id=0)
  151. archive = _make_archive(archive_id=10)
  152. # Setup: session with AMS remain data
  153. _active_sessions[1] = PrintSession(
  154. printer_id=1,
  155. print_name="Benchy",
  156. started_at=datetime.now(timezone.utc),
  157. tray_remain_start={(0, 0): 80},
  158. )
  159. # Mock printer state: tray_now=0 (AMS0-T0), single filament
  160. printer_manager = MagicMock()
  161. printer_manager.get_status.return_value = SimpleNamespace(
  162. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
  163. progress=100,
  164. layer_num=50,
  165. tray_now=0,
  166. )
  167. # db returns: archive, queue_item(None), assignment, spool
  168. db = _mock_db_sequential([archive, None, assignment, spool])
  169. filament_usage = [{"slot_id": 1, "used_g": 15.0, "type": "PLA", "color": "#FF0000"}]
  170. with (
  171. patch("backend.app.core.config.settings") as mock_settings,
  172. patch(
  173. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  174. return_value=filament_usage,
  175. ),
  176. ):
  177. mock_settings.base_dir = MagicMock()
  178. mock_path = MagicMock()
  179. mock_path.exists.return_value = True
  180. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  181. results = await on_print_complete(
  182. printer_id=1,
  183. data={"status": "completed"},
  184. printer_manager=printer_manager,
  185. db=db,
  186. archive_id=10,
  187. )
  188. # 3MF path should handle it (BL guard removed)
  189. assert len(results) >= 1
  190. assert results[0]["spool_id"] == 1
  191. assert results[0]["weight_used"] == 15.0
  192. @pytest.mark.asyncio
  193. async def test_ams_delta_fallback_no_archive(self):
  194. """AMS delta tracks consumption when archive_id is None."""
  195. spool = _make_spool(spool_id=2, label_weight=1000)
  196. assignment = _make_assignment(spool_id=2)
  197. _active_sessions[1] = PrintSession(
  198. printer_id=1,
  199. print_name="Test",
  200. started_at=datetime.now(timezone.utc),
  201. tray_remain_start={(0, 0): 80},
  202. )
  203. printer_manager = MagicMock()
  204. printer_manager.get_status.return_value = SimpleNamespace(
  205. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
  206. tray_now=0,
  207. last_loaded_tray=-1,
  208. )
  209. # Pad 2 Nones for _find_3mf_by_filename DB queries (library + archive search),
  210. # then assignment and spool for the AMS fallback path
  211. db = _mock_db_sequential([None, None, assignment, spool])
  212. results = await on_print_complete(
  213. printer_id=1,
  214. data={"status": "completed"},
  215. printer_manager=printer_manager,
  216. db=db,
  217. archive_id=None,
  218. )
  219. assert len(results) == 1
  220. assert results[0]["spool_id"] == 2
  221. # 10% of 1000g = 100g
  222. assert results[0]["weight_used"] == 100.0
  223. assert results[0]["percent_used"] == 10
  224. @pytest.mark.asyncio
  225. async def test_no_double_tracking(self):
  226. """When 3MF handles a tray, AMS delta skips it."""
  227. spool = _make_spool(spool_id=1, label_weight=1000)
  228. assignment = _make_assignment(spool_id=1)
  229. archive = _make_archive(archive_id=10)
  230. _active_sessions[1] = PrintSession(
  231. printer_id=1,
  232. print_name="Benchy",
  233. started_at=datetime.now(timezone.utc),
  234. tray_remain_start={(0, 0): 80},
  235. )
  236. # tray_now=0 matches the single filament slot
  237. printer_manager = MagicMock()
  238. printer_manager.get_status.return_value = SimpleNamespace(
  239. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
  240. progress=100,
  241. layer_num=50,
  242. tray_now=0,
  243. )
  244. # db returns: archive, queue_item(None), assignment, spool
  245. db = _mock_db_sequential([archive, None, assignment, spool])
  246. filament_usage = [{"slot_id": 1, "used_g": 15.0, "type": "PLA", "color": "#FF0000"}]
  247. with (
  248. patch("backend.app.core.config.settings") as mock_settings,
  249. patch(
  250. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  251. return_value=filament_usage,
  252. ),
  253. ):
  254. mock_settings.base_dir = MagicMock()
  255. mock_path = MagicMock()
  256. mock_path.exists.return_value = True
  257. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  258. results = await on_print_complete(
  259. printer_id=1,
  260. data={"status": "completed"},
  261. printer_manager=printer_manager,
  262. db=db,
  263. archive_id=10,
  264. )
  265. # Only 1 result (3MF), NOT 2 (3MF + AMS delta)
  266. assert len(results) == 1
  267. assert results[0]["weight_used"] == 15.0
  268. class TestTrackFrom3mf:
  269. """Tests for _track_from_3mf() — per-layer, linear scaling, and slot mapping."""
  270. @pytest.mark.asyncio
  271. async def test_prefers_live_assignment_when_reassigned_mid_print(self):
  272. """If tray assignment changed during print, track usage on the new spool."""
  273. spool_old = _make_spool(spool_id=1, label_weight=1000)
  274. spool_new = _make_spool(spool_id=2, label_weight=1000)
  275. archive = _make_archive(archive_id=80)
  276. live_assignment = _make_assignment(spool_id=2, ams_id=0, tray_id=0)
  277. started_at = datetime.now(timezone.utc)
  278. live_assignment.created_at = started_at + timedelta(seconds=5)
  279. # db: archive, queue_item(None), live assignment lookup, spool_new lookup
  280. db = _mock_db_sequential([archive, None, live_assignment, spool_new])
  281. printer_manager = MagicMock()
  282. printer_manager.get_status.return_value = SimpleNamespace(
  283. progress=100,
  284. layer_num=50,
  285. tray_now=0,
  286. )
  287. filament_usage = [{"slot_id": 1, "used_g": 10.0, "type": "PLA", "color": ""}]
  288. handled_trays: set[tuple[int, int]] = set()
  289. with (
  290. patch("backend.app.core.config.settings") as mock_settings,
  291. patch(
  292. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  293. return_value=filament_usage,
  294. ),
  295. ):
  296. mock_settings.base_dir = MagicMock()
  297. mock_path = MagicMock()
  298. mock_path.exists.return_value = True
  299. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  300. results = await _track_from_3mf(
  301. printer_id=1,
  302. archive_id=80,
  303. status="completed",
  304. print_name="MidPrintReassign",
  305. handled_trays=handled_trays,
  306. printer_manager=printer_manager,
  307. db=db,
  308. spool_assignments={(0, 0): spool_old.id},
  309. print_started_at=started_at,
  310. )
  311. assert len(results) == 1
  312. assert results[0]["spool_id"] == spool_new.id
  313. @pytest.mark.asyncio
  314. async def test_keeps_snapshot_when_live_assignment_predates_print(self):
  315. """If live assignment predates print start, preserve snapshot spool mapping."""
  316. spool_old = _make_spool(spool_id=1, label_weight=1000)
  317. archive = _make_archive(archive_id=81)
  318. live_assignment = _make_assignment(spool_id=2, ams_id=0, tray_id=0)
  319. started_at = datetime.now(timezone.utc)
  320. live_assignment.created_at = started_at - timedelta(seconds=5)
  321. # db: archive, queue_item(None), live assignment lookup, spool_old lookup
  322. db = _mock_db_sequential([archive, None, live_assignment, spool_old])
  323. printer_manager = MagicMock()
  324. printer_manager.get_status.return_value = SimpleNamespace(
  325. progress=100,
  326. layer_num=50,
  327. tray_now=0,
  328. )
  329. filament_usage = [{"slot_id": 1, "used_g": 10.0, "type": "PLA", "color": ""}]
  330. handled_trays: set[tuple[int, int]] = set()
  331. with (
  332. patch("backend.app.core.config.settings") as mock_settings,
  333. patch(
  334. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  335. return_value=filament_usage,
  336. ),
  337. ):
  338. mock_settings.base_dir = MagicMock()
  339. mock_path = MagicMock()
  340. mock_path.exists.return_value = True
  341. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  342. results = await _track_from_3mf(
  343. printer_id=1,
  344. archive_id=81,
  345. status="completed",
  346. print_name="SnapshotPreserved",
  347. handled_trays=handled_trays,
  348. printer_manager=printer_manager,
  349. db=db,
  350. spool_assignments={(0, 0): spool_old.id},
  351. print_started_at=started_at,
  352. )
  353. assert len(results) == 1
  354. assert results[0]["spool_id"] == spool_old.id
  355. @pytest.mark.asyncio
  356. async def test_linear_fallback_for_partial_print(self):
  357. """Falls back to linear scaling when gcode layer data unavailable."""
  358. spool = _make_spool(spool_id=1, label_weight=1000)
  359. assignment = _make_assignment(spool_id=1)
  360. archive = _make_archive(archive_id=10)
  361. # db: archive, queue_item(None), assignment, spool
  362. db = _mock_db_sequential([archive, None, assignment, spool])
  363. printer_manager = MagicMock()
  364. printer_manager.get_status.return_value = SimpleNamespace(
  365. progress=50,
  366. layer_num=25,
  367. tray_now=0,
  368. )
  369. filament_usage = [{"slot_id": 1, "used_g": 20.0, "type": "PLA", "color": ""}]
  370. handled_trays: set[tuple[int, int]] = set()
  371. with (
  372. patch("backend.app.core.config.settings") as mock_settings,
  373. patch(
  374. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  375. return_value=filament_usage,
  376. ),
  377. patch(
  378. "backend.app.utils.threemf_tools.extract_layer_filament_usage_from_3mf",
  379. return_value=None, # No layer data available
  380. ),
  381. ):
  382. mock_settings.base_dir = MagicMock()
  383. mock_path = MagicMock()
  384. mock_path.exists.return_value = True
  385. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  386. results = await _track_from_3mf(
  387. printer_id=1,
  388. archive_id=10,
  389. status="failed",
  390. print_name="Benchy",
  391. handled_trays=handled_trays,
  392. printer_manager=printer_manager,
  393. db=db,
  394. )
  395. assert len(results) == 1
  396. # 50% of 20g = 10g
  397. assert results[0]["weight_used"] == 10.0
  398. # Tray should be marked as handled
  399. assert (0, 0) in handled_trays
  400. @pytest.mark.asyncio
  401. async def test_per_layer_partial_print(self):
  402. """Failed print at layer N uses gcode cumulative data."""
  403. spool = _make_spool(spool_id=1, label_weight=1000)
  404. assignment = _make_assignment(spool_id=1)
  405. archive = _make_archive(archive_id=10)
  406. # db: archive, queue_item(None), assignment, spool
  407. db = _mock_db_sequential([archive, None, assignment, spool])
  408. printer_manager = MagicMock()
  409. printer_manager.get_status.return_value = SimpleNamespace(
  410. progress=50,
  411. layer_num=25,
  412. tray_now=0,
  413. )
  414. filament_usage = [{"slot_id": 1, "used_g": 20.0, "type": "PLA", "color": ""}]
  415. # Per-layer data: at layer 25, filament 0 used 5000mm
  416. layer_data = {10: {0: 2000.0}, 25: {0: 5000.0}, 50: {0: 10000.0}}
  417. filament_props = {1: {"density": 1.24, "diameter": 1.75}}
  418. handled_trays: set[tuple[int, int]] = set()
  419. with (
  420. patch("backend.app.core.config.settings") as mock_settings,
  421. patch(
  422. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  423. return_value=filament_usage,
  424. ),
  425. patch(
  426. "backend.app.utils.threemf_tools.extract_layer_filament_usage_from_3mf",
  427. return_value=layer_data,
  428. ),
  429. patch(
  430. "backend.app.utils.threemf_tools.get_cumulative_usage_at_layer",
  431. return_value={0: 5000.0},
  432. ),
  433. patch(
  434. "backend.app.utils.threemf_tools.extract_filament_properties_from_3mf",
  435. return_value=filament_props,
  436. ),
  437. patch(
  438. "backend.app.utils.threemf_tools.mm_to_grams",
  439. return_value=12.0, # 5000mm at 1.75mm/1.24g/cm3
  440. ),
  441. ):
  442. mock_settings.base_dir = MagicMock()
  443. mock_path = MagicMock()
  444. mock_path.exists.return_value = True
  445. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  446. results = await _track_from_3mf(
  447. printer_id=1,
  448. archive_id=10,
  449. status="failed",
  450. print_name="Benchy",
  451. handled_trays=handled_trays,
  452. printer_manager=printer_manager,
  453. db=db,
  454. )
  455. assert len(results) == 1
  456. # Should use per-layer grams (12.0g), not linear scale (10.0g)
  457. assert results[0]["weight_used"] == 12.0
  458. @pytest.mark.asyncio
  459. async def test_completed_print_uses_full_weight(self):
  460. """Completed print uses full 3MF weight (scale=1.0)."""
  461. spool = _make_spool(spool_id=1, label_weight=1000)
  462. assignment = _make_assignment(spool_id=1)
  463. archive = _make_archive(archive_id=10)
  464. # db: archive, queue_item(None), assignment, spool
  465. db = _mock_db_sequential([archive, None, assignment, spool])
  466. printer_manager = MagicMock()
  467. printer_manager.get_status.return_value = SimpleNamespace(
  468. progress=100,
  469. layer_num=50,
  470. tray_now=0,
  471. )
  472. filament_usage = [{"slot_id": 1, "used_g": 20.0, "type": "PLA", "color": ""}]
  473. handled_trays: set[tuple[int, int]] = set()
  474. with (
  475. patch("backend.app.core.config.settings") as mock_settings,
  476. patch(
  477. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  478. return_value=filament_usage,
  479. ),
  480. ):
  481. mock_settings.base_dir = MagicMock()
  482. mock_path = MagicMock()
  483. mock_path.exists.return_value = True
  484. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  485. results = await _track_from_3mf(
  486. printer_id=1,
  487. archive_id=10,
  488. status="completed",
  489. print_name="Benchy",
  490. handled_trays=handled_trays,
  491. printer_manager=printer_manager,
  492. db=db,
  493. )
  494. assert len(results) == 1
  495. assert results[0]["weight_used"] == 20.0
  496. @pytest.mark.asyncio
  497. async def test_tray_now_override_for_single_filament(self):
  498. """Single-filament non-queue print uses tray_now instead of slot_id mapping."""
  499. # Spool 2 is at AMS1-T3 (global_tray_id=7)
  500. spool = _make_spool(spool_id=2, label_weight=1000)
  501. assignment = _make_assignment(spool_id=2, ams_id=1, tray_id=3)
  502. archive = _make_archive(archive_id=10)
  503. # db: archive, queue_item(None), assignment, spool
  504. db = _mock_db_sequential([archive, None, assignment, spool])
  505. # tray_now=7 = (ams_id=1, tray_id=3), the ACTUAL tray used
  506. printer_manager = MagicMock()
  507. printer_manager.get_status.return_value = SimpleNamespace(
  508. progress=100,
  509. layer_num=50,
  510. tray_now=7,
  511. )
  512. # 3MF has slot_id=12 (would default-map to ams_id=2, tray_id=3 — WRONG)
  513. filament_usage = [{"slot_id": 12, "used_g": 10.6, "type": "PLA", "color": "#FF0000"}]
  514. handled_trays: set[tuple[int, int]] = set()
  515. with (
  516. patch("backend.app.core.config.settings") as mock_settings,
  517. patch(
  518. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  519. return_value=filament_usage,
  520. ),
  521. ):
  522. mock_settings.base_dir = MagicMock()
  523. mock_path = MagicMock()
  524. mock_path.exists.return_value = True
  525. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  526. results = await _track_from_3mf(
  527. printer_id=1,
  528. archive_id=10,
  529. status="completed",
  530. print_name="Test",
  531. handled_trays=handled_trays,
  532. printer_manager=printer_manager,
  533. db=db,
  534. )
  535. assert len(results) == 1
  536. assert results[0]["spool_id"] == 2
  537. assert results[0]["ams_id"] == 1
  538. assert results[0]["tray_id"] == 3
  539. assert results[0]["weight_used"] == 10.6
  540. assert (1, 3) in handled_trays
  541. @pytest.mark.asyncio
  542. async def test_queue_ams_mapping_overrides_default(self):
  543. """Queue item ams_mapping overrides default slot_id mapping."""
  544. # Spool at AMS1-T3 (global_tray_id=7)
  545. spool = _make_spool(spool_id=5, label_weight=1000)
  546. assignment = _make_assignment(spool_id=5, ams_id=1, tray_id=3)
  547. archive = _make_archive(archive_id=20)
  548. # Queue item maps slot 1 → global tray 7 (ams_id=1, tray_id=3)
  549. queue_item = _make_queue_item(ams_mapping="[7, -1, -1, -1]")
  550. # db: archive, queue_item, assignment, spool
  551. db = _mock_db_sequential([archive, queue_item, assignment, spool])
  552. printer_manager = MagicMock()
  553. printer_manager.get_status.return_value = SimpleNamespace(
  554. progress=100,
  555. layer_num=50,
  556. tray_now=7,
  557. )
  558. filament_usage = [{"slot_id": 1, "used_g": 25.0, "type": "PETG", "color": ""}]
  559. handled_trays: set[tuple[int, int]] = set()
  560. with (
  561. patch("backend.app.core.config.settings") as mock_settings,
  562. patch(
  563. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  564. return_value=filament_usage,
  565. ),
  566. ):
  567. mock_settings.base_dir = MagicMock()
  568. mock_path = MagicMock()
  569. mock_path.exists.return_value = True
  570. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  571. results = await _track_from_3mf(
  572. printer_id=1,
  573. archive_id=20,
  574. status="completed",
  575. print_name="Queue Print",
  576. handled_trays=handled_trays,
  577. printer_manager=printer_manager,
  578. db=db,
  579. )
  580. assert len(results) == 1
  581. assert results[0]["spool_id"] == 5
  582. assert results[0]["ams_id"] == 1
  583. assert results[0]["tray_id"] == 3
  584. assert results[0]["weight_used"] == 25.0
  585. @pytest.mark.asyncio
  586. async def test_multi_filament_uses_queue_mapping(self):
  587. """Multi-filament queue prints use ams_mapping for each slot."""
  588. spool_a = _make_spool(spool_id=1, label_weight=1000)
  589. spool_b = _make_spool(spool_id=2, label_weight=1000)
  590. assign_a = _make_assignment(spool_id=1, ams_id=0, tray_id=0)
  591. assign_b = _make_assignment(spool_id=2, ams_id=1, tray_id=2)
  592. archive = _make_archive(archive_id=30)
  593. # slot 1 → tray 0 (AMS0-T0), slot 2 → tray 6 (AMS1-T2)
  594. queue_item = _make_queue_item(ams_mapping="[0, 6]")
  595. # db: archive, queue_item, assign_a, spool_a, assign_b, spool_b
  596. db = _mock_db_sequential([archive, queue_item, assign_a, spool_a, assign_b, spool_b])
  597. printer_manager = MagicMock()
  598. printer_manager.get_status.return_value = SimpleNamespace(
  599. progress=100,
  600. layer_num=50,
  601. tray_now=6,
  602. )
  603. filament_usage = [
  604. {"slot_id": 1, "used_g": 10.0, "type": "PLA", "color": ""},
  605. {"slot_id": 2, "used_g": 5.0, "type": "PETG", "color": ""},
  606. ]
  607. handled_trays: set[tuple[int, int]] = set()
  608. with (
  609. patch("backend.app.core.config.settings") as mock_settings,
  610. patch(
  611. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  612. return_value=filament_usage,
  613. ),
  614. ):
  615. mock_settings.base_dir = MagicMock()
  616. mock_path = MagicMock()
  617. mock_path.exists.return_value = True
  618. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  619. results = await _track_from_3mf(
  620. printer_id=1,
  621. archive_id=30,
  622. status="completed",
  623. print_name="Multi",
  624. handled_trays=handled_trays,
  625. printer_manager=printer_manager,
  626. db=db,
  627. )
  628. assert len(results) == 2
  629. assert results[0]["spool_id"] == 1
  630. assert results[0]["ams_id"] == 0
  631. assert results[0]["tray_id"] == 0
  632. assert results[0]["weight_used"] == 10.0
  633. assert results[1]["spool_id"] == 2
  634. assert results[1]["ams_id"] == 1
  635. assert results[1]["tray_id"] == 2
  636. assert results[1]["weight_used"] == 5.0
  637. @pytest.mark.asyncio
  638. async def test_no_tray_now_override_for_multi_filament(self):
  639. """Multi-filament non-queue prints fall back to default mapping, not tray_now."""
  640. spool = _make_spool(spool_id=1, label_weight=1000)
  641. assignment = _make_assignment(spool_id=1, ams_id=0, tray_id=0)
  642. archive = _make_archive(archive_id=10)
  643. # db: archive, queue_item(None), assignment, spool (2nd slot has no assignment)
  644. db = _mock_db_sequential([archive, None, assignment, spool, None])
  645. printer_manager = MagicMock()
  646. printer_manager.get_status.return_value = SimpleNamespace(
  647. progress=100,
  648. layer_num=50,
  649. tray_now=4, # tray_now won't be used
  650. )
  651. # Two filament slots with usage
  652. filament_usage = [
  653. {"slot_id": 1, "used_g": 10.0, "type": "PLA", "color": ""},
  654. {"slot_id": 2, "used_g": 5.0, "type": "PETG", "color": ""},
  655. ]
  656. handled_trays: set[tuple[int, int]] = set()
  657. with (
  658. patch("backend.app.core.config.settings") as mock_settings,
  659. patch(
  660. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  661. return_value=filament_usage,
  662. ),
  663. ):
  664. mock_settings.base_dir = MagicMock()
  665. mock_path = MagicMock()
  666. mock_path.exists.return_value = True
  667. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  668. results = await _track_from_3mf(
  669. printer_id=1,
  670. archive_id=10,
  671. status="completed",
  672. print_name="Test",
  673. handled_trays=handled_trays,
  674. printer_manager=printer_manager,
  675. db=db,
  676. )
  677. # Should use default mapping (slot 1 → tray 0, slot 2 → tray 1)
  678. assert len(results) == 1 # Only slot 1 has assignment
  679. assert results[0]["ams_id"] == 0
  680. assert results[0]["tray_id"] == 0
  681. @pytest.mark.asyncio
  682. async def test_stored_ams_mapping_overrides_all(self):
  683. """Stored ams_mapping from print command takes priority over queue and tray_now."""
  684. # Spool at AMS2-T1 (global_tray_id=9)
  685. spool = _make_spool(spool_id=10, label_weight=1000)
  686. assignment = _make_assignment(spool_id=10, ams_id=2, tray_id=1)
  687. archive = _make_archive(archive_id=50)
  688. # db: archive, assignment, spool (no queue lookup when ams_mapping provided)
  689. db = _mock_db_sequential([archive, assignment, spool])
  690. printer_manager = MagicMock()
  691. printer_manager.get_status.return_value = SimpleNamespace(
  692. progress=100,
  693. layer_num=50,
  694. tray_now=0, # Different from mapped tray — should be ignored
  695. last_loaded_tray=0,
  696. )
  697. filament_usage = [{"slot_id": 2, "used_g": 1.57, "type": "PLA", "color": "#FFFFFF"}]
  698. handled_trays: set[tuple[int, int]] = set()
  699. with (
  700. patch("backend.app.core.config.settings") as mock_settings,
  701. patch(
  702. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  703. return_value=filament_usage,
  704. ),
  705. ):
  706. mock_settings.base_dir = MagicMock()
  707. mock_path = MagicMock()
  708. mock_path.exists.return_value = True
  709. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  710. # ams_mapping: slot 2 (index 1) -> tray 9 (AMS2-T1)
  711. results = await _track_from_3mf(
  712. printer_id=1,
  713. archive_id=50,
  714. status="completed",
  715. print_name="Test",
  716. handled_trays=handled_trays,
  717. printer_manager=printer_manager,
  718. db=db,
  719. ams_mapping=[-1, 9],
  720. )
  721. assert len(results) == 1
  722. assert results[0]["spool_id"] == 10
  723. assert results[0]["ams_id"] == 2
  724. assert results[0]["tray_id"] == 1
  725. assert results[0]["weight_used"] == 1.6 # rounded
  726. @pytest.mark.asyncio
  727. async def test_last_loaded_tray_fallback(self):
  728. """Falls back to last_loaded_tray when tray_now_at_start and current tray_now are both 255."""
  729. # Spool at AMS2-T1 (global_tray_id=9)
  730. spool = _make_spool(spool_id=11, label_weight=1000)
  731. assignment = _make_assignment(spool_id=11, ams_id=2, tray_id=1)
  732. archive = _make_archive(archive_id=60)
  733. # db: archive, queue_item(None), assignment, spool
  734. db = _mock_db_sequential([archive, None, assignment, spool])
  735. # H2D scenario: tray_now=255 at completion, but last_loaded_tray=9
  736. printer_manager = MagicMock()
  737. printer_manager.get_status.return_value = SimpleNamespace(
  738. progress=100,
  739. layer_num=50,
  740. tray_now=255,
  741. last_loaded_tray=9,
  742. )
  743. filament_usage = [{"slot_id": 6, "used_g": 1.52, "type": "PLA", "color": "#7CC4D5"}]
  744. handled_trays: set[tuple[int, int]] = set()
  745. with (
  746. patch("backend.app.core.config.settings") as mock_settings,
  747. patch(
  748. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  749. return_value=filament_usage,
  750. ),
  751. ):
  752. mock_settings.base_dir = MagicMock()
  753. mock_path = MagicMock()
  754. mock_path.exists.return_value = True
  755. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  756. results = await _track_from_3mf(
  757. printer_id=1,
  758. archive_id=60,
  759. status="completed",
  760. print_name="Cube",
  761. handled_trays=handled_trays,
  762. printer_manager=printer_manager,
  763. db=db,
  764. tray_now_at_start=255, # H2D: 255 at start too
  765. )
  766. assert len(results) == 1
  767. assert results[0]["spool_id"] == 11
  768. assert results[0]["ams_id"] == 2
  769. assert results[0]["tray_id"] == 1
  770. @pytest.mark.asyncio
  771. async def test_tray_now_at_start_preferred_over_last_loaded(self):
  772. """tray_now_at_start is used before last_loaded_tray fallback."""
  773. spool = _make_spool(spool_id=3, label_weight=1000)
  774. assignment = _make_assignment(spool_id=3, ams_id=1, tray_id=1)
  775. archive = _make_archive(archive_id=70)
  776. db = _mock_db_sequential([archive, None, assignment, spool])
  777. # tray_now_at_start=5 (valid), last_loaded_tray=9 (different) — should use 5
  778. printer_manager = MagicMock()
  779. printer_manager.get_status.return_value = SimpleNamespace(
  780. progress=100,
  781. layer_num=50,
  782. tray_now=255,
  783. last_loaded_tray=9,
  784. )
  785. filament_usage = [{"slot_id": 1, "used_g": 5.0, "type": "PLA", "color": ""}]
  786. handled_trays: set[tuple[int, int]] = set()
  787. with (
  788. patch("backend.app.core.config.settings") as mock_settings,
  789. patch(
  790. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  791. return_value=filament_usage,
  792. ),
  793. ):
  794. mock_settings.base_dir = MagicMock()
  795. mock_path = MagicMock()
  796. mock_path.exists.return_value = True
  797. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  798. results = await _track_from_3mf(
  799. printer_id=1,
  800. archive_id=70,
  801. status="completed",
  802. print_name="Test",
  803. handled_trays=handled_trays,
  804. printer_manager=printer_manager,
  805. db=db,
  806. tray_now_at_start=5, # AMS1-T1
  807. )
  808. assert len(results) == 1
  809. assert results[0]["ams_id"] == 1
  810. assert results[0]["tray_id"] == 1
  811. class TestTrayChangeSplit:
  812. """Tests for mid-print tray switch weight splitting in _track_from_3mf()."""
  813. @pytest.mark.asyncio
  814. async def test_tray_switch_splits_weight_with_gcode(self):
  815. """Two-tray runout: weight split using per-layer gcode data."""
  816. spool_a = _make_spool(spool_id=10, label_weight=1000)
  817. spool_b = _make_spool(spool_id=20, label_weight=1000)
  818. assign_a = _make_assignment(spool_id=10, ams_id=0, tray_id=1)
  819. assign_b = _make_assignment(spool_id=20, ams_id=0, tray_id=0)
  820. archive = _make_archive(archive_id=100)
  821. # db: archive, queue_item(None), then for each segment: assignment, spool
  822. db = _mock_db_sequential([archive, None, assign_a, spool_a, assign_b, spool_b])
  823. # Tray change log: started on tray 1, switched to tray 0 at layer 60
  824. printer_manager = MagicMock()
  825. printer_manager.get_status.return_value = SimpleNamespace(
  826. progress=100,
  827. layer_num=100,
  828. tray_now=0,
  829. last_loaded_tray=0,
  830. total_layers=100,
  831. tray_change_log=[(1, 0), (0, 60)],
  832. )
  833. filament_usage = [{"slot_id": 1, "used_g": 30.0, "type": "PLA", "color": ""}]
  834. handled_trays: set[tuple[int, int]] = set()
  835. with (
  836. patch("backend.app.core.config.settings") as mock_settings,
  837. patch(
  838. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  839. return_value=filament_usage,
  840. ),
  841. patch(
  842. "backend.app.utils.threemf_tools.extract_layer_filament_usage_from_3mf",
  843. return_value={30: {0: 3000.0}, 60: {0: 6000.0}, 100: {0: 10000.0}},
  844. ),
  845. patch(
  846. "backend.app.utils.threemf_tools.get_cumulative_usage_at_layer",
  847. side_effect=lambda data, layer: {0: {0: 0.0, 60: 6000.0, 100: 10000.0}.get(layer, 0.0)},
  848. ),
  849. patch(
  850. "backend.app.utils.threemf_tools.extract_filament_properties_from_3mf",
  851. return_value={1: {"density": 1.24, "diameter": 1.75}},
  852. ),
  853. patch(
  854. "backend.app.utils.threemf_tools.mm_to_grams",
  855. side_effect=lambda mm, d, dens: round(mm * 0.003, 1), # Simple conversion
  856. ),
  857. ):
  858. mock_settings.base_dir = MagicMock()
  859. mock_path = MagicMock()
  860. mock_path.exists.return_value = True
  861. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  862. results = await _track_from_3mf(
  863. printer_id=1,
  864. archive_id=100,
  865. status="completed",
  866. print_name="Runout Test",
  867. handled_trays=handled_trays,
  868. printer_manager=printer_manager,
  869. db=db,
  870. )
  871. # Two results: one per tray segment
  872. assert len(results) == 2
  873. # First segment: tray 1 (AMS0-T1), layers 0→60
  874. assert results[0]["ams_id"] == 0
  875. assert results[0]["tray_id"] == 1
  876. assert results[0]["spool_id"] == 10
  877. assert results[0]["weight_used"] == 18.0 # 6000mm * 0.003
  878. # Second segment: tray 0 (AMS0-T0), layers 60→end = 30.0 - 18.0 = 12.0
  879. assert results[1]["ams_id"] == 0
  880. assert results[1]["tray_id"] == 0
  881. assert results[1]["spool_id"] == 20
  882. assert results[1]["weight_used"] == 12.0
  883. # Both trays handled
  884. assert (0, 1) in handled_trays
  885. assert (0, 0) in handled_trays
  886. @pytest.mark.asyncio
  887. async def test_tray_switch_linear_fallback(self):
  888. """Two-tray runout without per-layer gcode: linear split by layer ratio."""
  889. spool_a = _make_spool(spool_id=10, label_weight=1000)
  890. spool_b = _make_spool(spool_id=20, label_weight=1000)
  891. assign_a = _make_assignment(spool_id=10, ams_id=0, tray_id=2)
  892. assign_b = _make_assignment(spool_id=20, ams_id=0, tray_id=1)
  893. archive = _make_archive(archive_id=101)
  894. db = _mock_db_sequential([archive, None, assign_a, spool_a, assign_b, spool_b])
  895. # Tray 2 from layer 0, switched to tray 1 at layer 40 (of 100 total)
  896. printer_manager = MagicMock()
  897. printer_manager.get_status.return_value = SimpleNamespace(
  898. progress=100,
  899. layer_num=100,
  900. tray_now=1,
  901. last_loaded_tray=1,
  902. total_layers=100,
  903. tray_change_log=[(2, 0), (1, 40)],
  904. )
  905. filament_usage = [{"slot_id": 1, "used_g": 50.0, "type": "PLA", "color": ""}]
  906. handled_trays: set[tuple[int, int]] = set()
  907. with (
  908. patch("backend.app.core.config.settings") as mock_settings,
  909. patch(
  910. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  911. return_value=filament_usage,
  912. ),
  913. patch(
  914. "backend.app.utils.threemf_tools.extract_layer_filament_usage_from_3mf",
  915. return_value=None, # No per-layer gcode available
  916. ),
  917. ):
  918. mock_settings.base_dir = MagicMock()
  919. mock_path = MagicMock()
  920. mock_path.exists.return_value = True
  921. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  922. results = await _track_from_3mf(
  923. printer_id=1,
  924. archive_id=101,
  925. status="completed",
  926. print_name="Linear Fallback",
  927. handled_trays=handled_trays,
  928. printer_manager=printer_manager,
  929. db=db,
  930. )
  931. assert len(results) == 2
  932. # Linear split: tray 2 for 40/100 layers = 20g
  933. assert results[0]["ams_id"] == 0
  934. assert results[0]["tray_id"] == 2
  935. assert results[0]["weight_used"] == 20.0
  936. # Last segment gets remainder: 50 - 20 = 30g
  937. assert results[1]["ams_id"] == 0
  938. assert results[1]["tray_id"] == 1
  939. assert results[1]["weight_used"] == 30.0
  940. @pytest.mark.asyncio
  941. async def test_tray_switch_overrides_print_cmd_mapping(self):
  942. """tray_change_log evidence overrides slot_to_tray captured at print start.
  943. Regression for #957: when AMS auto-falls-back from an empty spool to a
  944. same-material sibling, the print_cmd's mapping (which named the
  945. original tray) is stale by the time the print finishes. Before this
  946. fix, the splitting branch was gated on ``not slot_to_tray`` so the
  947. slicer mapping was preferred even when the printer actually fed from
  948. a different tray — Path 1 credited the original tray with the full
  949. 3MF estimate and Path 2 layered the AMS-fallback delta on top, so
  950. spool consumption double-counted (e.g. 78 g print credited as 78 g
  951. + 60 g = 138 g). This test pins the new behavior: when
  952. tray_change_log has > 1 entries, splitting takes over regardless of
  953. whether ams_mapping was provided.
  954. """
  955. spool_a = _make_spool(spool_id=10, label_weight=1000)
  956. spool_b = _make_spool(spool_id=20, label_weight=1000)
  957. assign_a = _make_assignment(spool_id=10, ams_id=0, tray_id=0)
  958. assign_b = _make_assignment(spool_id=20, ams_id=0, tray_id=1)
  959. archive = _make_archive(archive_id=200)
  960. # No queue_item placeholder: passing ams_mapping bypasses the queue lookup
  961. # at usage_tracker.py:816 (`if not slot_to_tray and archive_id`).
  962. db = _mock_db_sequential([archive, assign_a, spool_a, assign_b, spool_b])
  963. # Slicer mapping says slot 1 -> tray 0; printer actually swapped to tray 1 at layer 30
  964. printer_manager = MagicMock()
  965. printer_manager.get_status.return_value = SimpleNamespace(
  966. progress=100,
  967. layer_num=100,
  968. tray_now=1,
  969. last_loaded_tray=1,
  970. total_layers=100,
  971. tray_change_log=[(0, 0), (1, 30)],
  972. )
  973. filament_usage = [{"slot_id": 1, "used_g": 78.0, "type": "PLA", "color": ""}]
  974. handled_trays: set[tuple[int, int]] = set()
  975. with (
  976. patch("backend.app.core.config.settings") as mock_settings,
  977. patch(
  978. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  979. return_value=filament_usage,
  980. ),
  981. patch(
  982. "backend.app.utils.threemf_tools.extract_layer_filament_usage_from_3mf",
  983. return_value=None, # No per-layer data — exercises linear fallback
  984. ),
  985. ):
  986. mock_settings.base_dir = MagicMock()
  987. mock_path = MagicMock()
  988. mock_path.exists.return_value = True
  989. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  990. results = await _track_from_3mf(
  991. printer_id=1,
  992. archive_id=200,
  993. status="completed",
  994. print_name="Runout w/ slicer mapping",
  995. handled_trays=handled_trays,
  996. printer_manager=printer_manager,
  997. db=db,
  998. # ams_mapping captured at print start — slicer told us slot 0 -> tray 0
  999. # (1-based slot_id=1 -> 0-based slot index 0).
  1000. ams_mapping=[0],
  1001. )
  1002. # Splitting branch ran despite ams_mapping being set: two segments,
  1003. # one per tray, total weight matches the 3MF estimate (no double-count).
  1004. assert len(results) == 2
  1005. total = sum(r["weight_used"] for r in results)
  1006. assert total == pytest.approx(78.0, abs=0.1)
  1007. # Both trays now in handled_trays so Path 2 (remain%-delta) skips them.
  1008. assert (0, 0) in handled_trays
  1009. assert (0, 1) in handled_trays
  1010. @pytest.mark.asyncio
  1011. async def test_no_tray_change_uses_normal_path(self):
  1012. """Single-entry tray_change_log falls through to normal tray_now_at_start logic."""
  1013. spool = _make_spool(spool_id=1, label_weight=1000)
  1014. assignment = _make_assignment(spool_id=1, ams_id=0, tray_id=2)
  1015. archive = _make_archive(archive_id=102)
  1016. db = _mock_db_sequential([archive, None, assignment, spool])
  1017. # Only one entry = no switch, should use normal path
  1018. printer_manager = MagicMock()
  1019. printer_manager.get_status.return_value = SimpleNamespace(
  1020. progress=100,
  1021. layer_num=100,
  1022. tray_now=2,
  1023. last_loaded_tray=2,
  1024. total_layers=100,
  1025. tray_change_log=[(2, 0)],
  1026. )
  1027. filament_usage = [{"slot_id": 1, "used_g": 15.0, "type": "PLA", "color": ""}]
  1028. handled_trays: set[tuple[int, int]] = set()
  1029. with (
  1030. patch("backend.app.core.config.settings") as mock_settings,
  1031. patch(
  1032. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  1033. return_value=filament_usage,
  1034. ),
  1035. ):
  1036. mock_settings.base_dir = MagicMock()
  1037. mock_path = MagicMock()
  1038. mock_path.exists.return_value = True
  1039. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  1040. results = await _track_from_3mf(
  1041. printer_id=1,
  1042. archive_id=102,
  1043. status="completed",
  1044. print_name="No Switch",
  1045. handled_trays=handled_trays,
  1046. printer_manager=printer_manager,
  1047. db=db,
  1048. tray_now_at_start=2,
  1049. )
  1050. # Normal path: single result, full weight
  1051. assert len(results) == 1
  1052. assert results[0]["weight_used"] == 15.0
  1053. assert results[0]["ams_id"] == 0
  1054. assert results[0]["tray_id"] == 2
  1055. @pytest.mark.asyncio
  1056. async def test_empty_tray_change_log_uses_normal_path(self):
  1057. """Empty tray_change_log (e.g. server restart) falls through to existing logic."""
  1058. spool = _make_spool(spool_id=1, label_weight=1000)
  1059. assignment = _make_assignment(spool_id=1, ams_id=0, tray_id=0)
  1060. archive = _make_archive(archive_id=103)
  1061. db = _mock_db_sequential([archive, None, assignment, spool])
  1062. # Empty log (server restarted mid-print)
  1063. printer_manager = MagicMock()
  1064. printer_manager.get_status.return_value = SimpleNamespace(
  1065. progress=100,
  1066. layer_num=100,
  1067. tray_now=0,
  1068. last_loaded_tray=0,
  1069. total_layers=100,
  1070. tray_change_log=[],
  1071. )
  1072. filament_usage = [{"slot_id": 1, "used_g": 10.0, "type": "PLA", "color": ""}]
  1073. handled_trays: set[tuple[int, int]] = set()
  1074. with (
  1075. patch("backend.app.core.config.settings") as mock_settings,
  1076. patch(
  1077. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  1078. return_value=filament_usage,
  1079. ),
  1080. ):
  1081. mock_settings.base_dir = MagicMock()
  1082. mock_path = MagicMock()
  1083. mock_path.exists.return_value = True
  1084. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  1085. results = await _track_from_3mf(
  1086. printer_id=1,
  1087. archive_id=103,
  1088. status="completed",
  1089. print_name="Restart Recovery",
  1090. handled_trays=handled_trays,
  1091. printer_manager=printer_manager,
  1092. db=db,
  1093. tray_now_at_start=0,
  1094. )
  1095. assert len(results) == 1
  1096. assert results[0]["weight_used"] == 10.0
  1097. @pytest.mark.asyncio
  1098. async def test_tray_switch_segment_no_spool(self):
  1099. """Segment with no spool assignment is skipped; other segments still tracked."""
  1100. spool_b = _make_spool(spool_id=20, label_weight=1000)
  1101. assign_b = _make_assignment(spool_id=20, ams_id=0, tray_id=3)
  1102. archive = _make_archive(archive_id=104)
  1103. # db: archive, queue_item(None), 1st segment: no assignment, 2nd segment: assignment, spool
  1104. db = _mock_db_sequential([archive, None, None, assign_b, spool_b])
  1105. # Tray 5 (no spool) from layer 0, switched to tray 3 at layer 50
  1106. printer_manager = MagicMock()
  1107. printer_manager.get_status.return_value = SimpleNamespace(
  1108. progress=100,
  1109. layer_num=100,
  1110. tray_now=3,
  1111. last_loaded_tray=3,
  1112. total_layers=100,
  1113. tray_change_log=[(5, 0), (3, 50)],
  1114. )
  1115. filament_usage = [{"slot_id": 1, "used_g": 40.0, "type": "PLA", "color": ""}]
  1116. handled_trays: set[tuple[int, int]] = set()
  1117. with (
  1118. patch("backend.app.core.config.settings") as mock_settings,
  1119. patch(
  1120. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  1121. return_value=filament_usage,
  1122. ),
  1123. patch(
  1124. "backend.app.utils.threemf_tools.extract_layer_filament_usage_from_3mf",
  1125. return_value=None, # No per-layer data
  1126. ),
  1127. ):
  1128. mock_settings.base_dir = MagicMock()
  1129. mock_path = MagicMock()
  1130. mock_path.exists.return_value = True
  1131. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  1132. results = await _track_from_3mf(
  1133. printer_id=1,
  1134. archive_id=104,
  1135. status="completed",
  1136. print_name="Missing Spool",
  1137. handled_trays=handled_trays,
  1138. printer_manager=printer_manager,
  1139. db=db,
  1140. )
  1141. # Only the second segment (tray 3) tracked; first segment (tray 5) skipped
  1142. assert len(results) == 1
  1143. assert results[0]["ams_id"] == 0
  1144. assert results[0]["tray_id"] == 3
  1145. assert results[0]["spool_id"] == 20
  1146. @pytest.mark.asyncio
  1147. async def test_tray_switch_three_segments(self):
  1148. """Three-segment switch (rare): A→B→C split by linear fallback."""
  1149. spool_a = _make_spool(spool_id=1, label_weight=1000)
  1150. spool_b = _make_spool(spool_id=2, label_weight=1000)
  1151. spool_c = _make_spool(spool_id=3, label_weight=1000)
  1152. assign_a = _make_assignment(spool_id=1, ams_id=0, tray_id=0)
  1153. assign_b = _make_assignment(spool_id=2, ams_id=0, tray_id=1)
  1154. assign_c = _make_assignment(spool_id=3, ams_id=0, tray_id=2)
  1155. archive = _make_archive(archive_id=105)
  1156. db = _mock_db_sequential(
  1157. [
  1158. archive,
  1159. None,
  1160. assign_a,
  1161. spool_a,
  1162. assign_b,
  1163. spool_b,
  1164. assign_c,
  1165. spool_c,
  1166. ]
  1167. )
  1168. # 3 segments: tray 0 (0-30), tray 1 (30-70), tray 2 (70-end)
  1169. printer_manager = MagicMock()
  1170. printer_manager.get_status.return_value = SimpleNamespace(
  1171. progress=100,
  1172. layer_num=100,
  1173. tray_now=2,
  1174. last_loaded_tray=2,
  1175. total_layers=100,
  1176. tray_change_log=[(0, 0), (1, 30), (2, 70)],
  1177. )
  1178. filament_usage = [{"slot_id": 1, "used_g": 100.0, "type": "PLA", "color": ""}]
  1179. handled_trays: set[tuple[int, int]] = set()
  1180. with (
  1181. patch("backend.app.core.config.settings") as mock_settings,
  1182. patch(
  1183. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  1184. return_value=filament_usage,
  1185. ),
  1186. patch(
  1187. "backend.app.utils.threemf_tools.extract_layer_filament_usage_from_3mf",
  1188. return_value=None,
  1189. ),
  1190. ):
  1191. mock_settings.base_dir = MagicMock()
  1192. mock_path = MagicMock()
  1193. mock_path.exists.return_value = True
  1194. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  1195. results = await _track_from_3mf(
  1196. printer_id=1,
  1197. archive_id=105,
  1198. status="completed",
  1199. print_name="Triple Switch",
  1200. handled_trays=handled_trays,
  1201. printer_manager=printer_manager,
  1202. db=db,
  1203. )
  1204. assert len(results) == 3
  1205. # Tray 0: 30/100 * 100g = 30g
  1206. assert results[0]["weight_used"] == 30.0
  1207. assert results[0]["ams_id"] == 0
  1208. assert results[0]["tray_id"] == 0
  1209. # Tray 1: 40/100 * 100g = 40g
  1210. assert results[1]["weight_used"] == 40.0
  1211. assert results[1]["ams_id"] == 0
  1212. assert results[1]["tray_id"] == 1
  1213. # Tray 2: remainder = 100 - 30 - 40 = 30g
  1214. assert results[2]["weight_used"] == 30.0
  1215. assert results[2]["ams_id"] == 0
  1216. assert results[2]["tray_id"] == 2
  1217. class TestDecodeMqttMapping:
  1218. """Tests for _decode_mqtt_mapping() — snow-encoded MQTT mapping to global tray IDs."""
  1219. def test_none_input(self):
  1220. assert _decode_mqtt_mapping(None) is None
  1221. def test_empty_list(self):
  1222. assert _decode_mqtt_mapping([]) is None
  1223. def test_all_unmapped(self):
  1224. """All 65535 values → None (no valid mappings)."""
  1225. assert _decode_mqtt_mapping([65535, 65535, 65535]) is None
  1226. def test_single_ams_slots(self):
  1227. """AMS 0 slots: snow values 0-3 → global tray IDs 0-3."""
  1228. assert _decode_mqtt_mapping([0, 1, 2, 3]) == [0, 1, 2, 3]
  1229. def test_multi_ams_slots(self):
  1230. """AMS 1 (hw_id=1): snow 256=AMS1-T0, 257=AMS1-T1 → global 4, 5."""
  1231. assert _decode_mqtt_mapping([256, 257]) == [4, 5]
  1232. def test_ams_ht_slot(self):
  1233. """AMS-HT (hw_id=128): snow 32768 → global 128."""
  1234. assert _decode_mqtt_mapping([32768]) == [128]
  1235. def test_external_spool(self):
  1236. """External spool: ams_hw_id=254, slot=0 → global 254."""
  1237. # snow = 254 * 256 + 0 = 65024
  1238. assert _decode_mqtt_mapping([65024]) == [254]
  1239. def test_mixed_with_unmapped(self):
  1240. """Mix of valid and unmapped (65535) values."""
  1241. result = _decode_mqtt_mapping([1, 65535, 0])
  1242. assert result == [1, -1, 0]
  1243. def test_h2c_real_mapping(self):
  1244. """Real H2C mapping from MQTT logs: [1, 0, 65535*4, 32768]."""
  1245. mapping = [1, 0, 65535, 65535, 65535, 65535, 32768]
  1246. result = _decode_mqtt_mapping(mapping)
  1247. assert result == [1, 0, -1, -1, -1, -1, 128]
  1248. def test_non_int_values_treated_as_unmapped(self):
  1249. """Non-integer values in the mapping are treated as unmapped."""
  1250. assert _decode_mqtt_mapping(["foo", 0]) == [-1, 0]
  1251. class TestMatchSlotsByColor:
  1252. """Tests for _match_slots_by_color() — color-based filament slot to AMS tray matching."""
  1253. def _ams(self, trays):
  1254. """Build AMS data from list of (ams_id, tray_id, color_hex, tray_type) tuples."""
  1255. units: dict[int, list] = {}
  1256. for ams_id, tray_id, color, tray_type in trays:
  1257. units.setdefault(ams_id, []).append({"id": tray_id, "tray_color": color, "tray_type": tray_type})
  1258. return [{"id": aid, "tray": t} for aid, t in units.items()]
  1259. def _usage(self, slots):
  1260. """Build filament_usage from list of (slot_id, color_hex) tuples."""
  1261. return [{"slot_id": sid, "used_g": 10.0, "type": "PLA", "color": color} for sid, color in slots]
  1262. def test_none_inputs(self):
  1263. assert _match_slots_by_color(None, None) is None
  1264. assert _match_slots_by_color([], None) is None
  1265. assert _match_slots_by_color(None, {"ams": []}) is None
  1266. def test_empty_ams(self):
  1267. usage = self._usage([(1, "#FF0000")])
  1268. assert _match_slots_by_color(usage, {"ams": []}) is None
  1269. def test_single_slot_single_tray(self):
  1270. """One 3MF slot matches one AMS tray by color."""
  1271. ams = self._ams([(0, 0, "FF0000FF", "PLA")])
  1272. usage = self._usage([(1, "#FF0000")])
  1273. assert _match_slots_by_color(usage, {"ams": ams}) == [0]
  1274. def test_a1_mini_three_colors(self):
  1275. """A1 Mini: 3 slots match 3 distinct AMS trays."""
  1276. ams = self._ams(
  1277. [
  1278. (0, 0, "FF0000FF", "PLA"), # Red
  1279. (0, 1, "00FF00FF", "PLA"), # Green
  1280. (0, 2, "0000FFFF", "PLA"), # Blue
  1281. ]
  1282. )
  1283. usage = self._usage([(1, "#FF0000"), (2, "#00FF00"), (3, "#0000FF")])
  1284. assert _match_slots_by_color(usage, {"ams": ams}) == [0, 1, 2]
  1285. def test_dual_ams_p2s_like(self):
  1286. """P2S with dual AMS: slots from second AMS unit."""
  1287. ams = self._ams(
  1288. [
  1289. (0, 0, "AAAAAAFF", "PLA"),
  1290. (0, 1, "BBBBBBFF", "PLA"),
  1291. (1, 0, "CC0000FF", "PETG"), # global_id=4
  1292. (1, 1, "00CC00FF", "PETG"), # global_id=5
  1293. ]
  1294. )
  1295. usage = self._usage([(1, "#CC0000"), (2, "#00CC00")])
  1296. assert _match_slots_by_color(usage, {"ams": ams}) == [4, 5]
  1297. def test_ams_ht_global_id(self):
  1298. """AMS-HT (ams_id >= 128) uses raw ams_id as global tray ID."""
  1299. ams = self._ams(
  1300. [
  1301. (0, 0, "FF0000FF", "PLA"),
  1302. (128, 0, "0000FFFF", "PLA"), # AMS-HT → global_id=128
  1303. ]
  1304. )
  1305. usage = self._usage([(1, "#FF0000"), (2, "#0000FF")])
  1306. assert _match_slots_by_color(usage, {"ams": ams}) == [0, 128]
  1307. def test_ambiguous_same_color_returns_none(self):
  1308. """Two trays with the same color → ambiguous → None."""
  1309. ams = self._ams(
  1310. [
  1311. (0, 0, "FF0000FF", "PLA"),
  1312. (0, 1, "FF0000FF", "PLA"), # Same red
  1313. ]
  1314. )
  1315. usage = self._usage([(1, "#FF0000")])
  1316. assert _match_slots_by_color(usage, {"ams": ams}) is None
  1317. def test_no_matching_color_returns_none(self):
  1318. """3MF slot color not found in any AMS tray → None."""
  1319. ams = self._ams([(0, 0, "00FF00FF", "PLA")])
  1320. usage = self._usage([(1, "#FF0000")]) # Red, but AMS has green
  1321. assert _match_slots_by_color(usage, {"ams": ams}) is None
  1322. def test_color_normalization_strips_alpha(self):
  1323. """AMS colors (RRGGBBAA) and 3MF colors (#RRGGBB) match after normalization."""
  1324. ams = self._ams([(0, 0, "AABBCC80", "PLA")]) # 8-char with alpha
  1325. usage = self._usage([(1, "#AABBCC")]) # 6-char with #
  1326. assert _match_slots_by_color(usage, {"ams": ams}) == [0]
  1327. def test_case_insensitive(self):
  1328. """Color matching is case-insensitive."""
  1329. ams = self._ams([(0, 0, "aaBBccFF", "PLA")])
  1330. usage = self._usage([(1, "#AAbbCC")])
  1331. assert _match_slots_by_color(usage, {"ams": ams}) == [0]
  1332. def test_empty_tray_color_skipped(self):
  1333. """Trays with empty color are skipped (not matched)."""
  1334. ams = self._ams(
  1335. [
  1336. (0, 0, "", "PLA"),
  1337. (0, 1, "FF0000FF", "PLA"),
  1338. ]
  1339. )
  1340. usage = self._usage([(1, "#FF0000")])
  1341. assert _match_slots_by_color(usage, {"ams": ams}) == [1]
  1342. def test_empty_tray_type_skipped(self):
  1343. """Trays with empty tray_type are skipped (unloaded slot)."""
  1344. ams = self._ams(
  1345. [
  1346. (0, 0, "FF0000FF", ""), # Empty slot
  1347. (0, 1, "FF0000FF", "PLA"), # Loaded slot
  1348. ]
  1349. )
  1350. usage = self._usage([(1, "#FF0000")])
  1351. assert _match_slots_by_color(usage, {"ams": ams}) == [1]
  def test_short_slot_color_returns_none(self):
      """A 3MF slot colour shorter than six hex digits cannot be matched, giving None."""
      ams_state = {"ams": self._ams([(0, 0, "FF0000FF", "PLA")])}
      short_color_slot = {"slot_id": 1, "used_g": 10.0, "type": "PLA", "color": "#FFF"}
      matched = _match_slots_by_color([short_color_slot], ams_state)
      assert matched is None
  def test_slot_id_zero_skipped(self):
      """A usage entry with slot_id=0 is skipped, so nothing is matched and None comes back."""
      ams_state = {"ams": self._ams([(0, 0, "FF0000FF", "PLA")])}
      zero_slot = {"slot_id": 0, "used_g": 10.0, "type": "PLA", "color": "#FF0000"}
      matched = _match_slots_by_color([zero_slot], ams_state)
      assert matched is None
  def test_ams_data_as_list(self):
      """AMS data supplied as a bare list (no wrapping 'ams' dict) is accepted."""
      red_tray = {"id": 0, "tray_color": "FF0000FF", "tray_type": "PLA"}
      bare_units = [{"id": 0, "tray": [red_tray]}]
      wanted = self._usage([(1, "#FF0000")])
      assert _match_slots_by_color(wanted, bare_units) == [0]
  def test_same_color_two_trays_disambiguated_by_usage(self):
      """Two red trays and two red 3MF slots remain ambiguous, so the result is None.

      Note: the test name suggests the trays end up disambiguated, but the
      assertion below pins the opposite outcome: the matcher returns None
      instead of handing one tray to each slot.
      """
      ams = self._ams(
          [
              (0, 0, "FF0000FF", "PLA"),
              (0, 1, "FF0000FF", "PLA"),
          ]
      )
      # Both slots ask for the same red that both trays carry.
      usage = self._usage([(1, "#FF0000"), (2, "#FF0000")])
      # Per the original author's note: the first slot already sees two
      # available candidates ([0, 1]); that is not a unique match, so the
      # lookup gives up (presumably before the second slot is considered).
      assert _match_slots_by_color(usage, {"ams": ams}) is None
  def test_dict_wrapper_with_ams_key(self):
      """The standard dict payload, with units listed under the 'ams' key, is handled."""
      green_tray = {"id": 0, "tray_color": "00FF00FF", "tray_type": "PLA"}
      wrapped = {"ams": [{"id": 0, "tray": [green_tray]}]}
      wanted = self._usage([(1, "#00FF00")])
      assert _match_slots_by_color(wanted, wrapped) == [0]
  class TestMqttMappingIntegration:
      """Integration tests for how _track_from_3mf() picks trays from slot mappings."""

      @pytest.mark.asyncio
      async def test_h2c_multi_filament_uses_mqtt_mapping(self):
          """H2C print with three filaments resolved from the MQTT ``mapping`` field.

          No ``ams_mapping`` argument is passed to _track_from_3mf(); the only
          slot-to-tray information is the ``mapping`` list in the printer status.
          """
          # Spools 1-3 are assigned to AMS 0 tray 1 (white), AMS 0 tray 0 (black)
          # and AMS 128 tray 0 (red) respectively.
          white_spool = _make_spool(spool_id=1, label_weight=1000)
          black_spool = _make_spool(spool_id=2, label_weight=1000)
          red_spool = _make_spool(spool_id=3, label_weight=1000)
          white_slot = _make_assignment(spool_id=1, ams_id=0, tray_id=1)
          black_slot = _make_assignment(spool_id=2, ams_id=0, tray_id=0)
          red_slot = _make_assignment(spool_id=3, ams_id=128, tray_id=0)
          archive = _make_archive(archive_id=12)

          # Fake DB answers, in order: the archive, then one (assignment, spool)
          # pair per filament. No answer is queued for a print-queue lookup.
          db = _mock_db_sequential(
              [archive, white_slot, white_spool, black_slot, black_spool, red_slot, red_spool]
          )

          # mapping is indexed by 0-based slot: index 0 -> 1, index 1 -> 0,
          # indices 2-5 -> 65535 (unmapped, per the original note), index 6 -> 32768.
          reported_status = SimpleNamespace(
              raw_data={"mapping": [1, 0, 65535, 65535, 65535, 65535, 32768]},
              progress=100,
              layer_num=50,
              tray_now=255,
          )
          printer_manager = MagicMock()
          printer_manager.get_status.return_value = reported_status

          # 1-based 3MF slots 1, 2 and 7 correspond to mapping indices 0, 1 and 6.
          filament_usage = [
              {"slot_id": 1, "used_g": 21.16, "type": "PLA", "color": "#FFFFFF"},
              {"slot_id": 2, "used_g": 24.22, "type": "PLA", "color": "#000000"},
              {"slot_id": 7, "used_g": 18.47, "type": "PLA", "color": "#F72323"},
          ]

          settings_patch = patch("backend.app.core.config.settings")
          usage_patch = patch(
              "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
              return_value=filament_usage,
          )
          with settings_patch as fake_settings, usage_patch:
              # Any ``base_dir / something`` resolves to a path that reports it exists.
              existing_file = MagicMock()
              existing_file.exists.return_value = True
              fake_settings.base_dir = MagicMock()
              fake_settings.base_dir.__truediv__ = MagicMock(return_value=existing_file)

              results = await _track_from_3mf(
                  printer_id=1,
                  archive_id=12,
                  status="completed",
                  print_name="Cube + Cube + Cube",
                  handled_trays=set(),
                  printer_manager=printer_manager,
                  db=db,
              )

          # (spool_id, ams_id, tray_id, weight_used) expected for each result, in order:
          #   slot 1 -> mapping[0]=1     -> AMS 0 tray 1
          #   slot 2 -> mapping[1]=0     -> AMS 0 tray 0
          #   slot 7 -> mapping[6]=32768 -> AMS 128 tray 0
          expected_rows = [
              (1, 0, 1, 21.2),
              (2, 0, 0, 24.2),
              (3, 128, 0, 18.5),
          ]
          assert len(results) == 3
          for row, (spool_id, ams_id, tray_id, grams) in zip(results, expected_rows):
              assert row["spool_id"] == spool_id
              assert row["ams_id"] == ams_id
              assert row["tray_id"] == tray_id
              assert row["weight_used"] == grams

      @pytest.mark.asyncio
      async def test_print_cmd_mapping_takes_priority_over_mqtt(self):
          """An explicit ams_mapping argument wins over the MQTT ``mapping`` field."""
          spool = _make_spool(spool_id=1, label_weight=1000)
          assignment = _make_assignment(spool_id=1, ams_id=0, tray_id=2)
          archive = _make_archive(archive_id=10)
          # Fake DB answers: archive, assignment, spool. Nothing queued for a queue lookup.
          db = _mock_db_sequential([archive, assignment, spool])

          # The status-side mapping points slot index 0 at value 0 (AMS 0 tray 0).
          reported_status = SimpleNamespace(
              raw_data={"mapping": [0, 65535]},
              progress=100,
              layer_num=50,
              tray_now=255,
          )
          printer_manager = MagicMock()
          printer_manager.get_status.return_value = reported_status

          filament_usage = [{"slot_id": 1, "used_g": 10.0, "type": "PLA", "color": ""}]

          settings_patch = patch("backend.app.core.config.settings")
          usage_patch = patch(
              "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
              return_value=filament_usage,
          )
          with settings_patch as fake_settings, usage_patch:
              existing_file = MagicMock()
              existing_file.exists.return_value = True
              fake_settings.base_dir = MagicMock()
              fake_settings.base_dir.__truediv__ = MagicMock(return_value=existing_file)

              results = await _track_from_3mf(
                  printer_id=1,
                  archive_id=10,
                  status="completed",
                  print_name="Test",
                  handled_trays=set(),
                  printer_manager=printer_manager,
                  db=db,
                  ams_mapping=[2],  # print command: slot index 0 -> AMS 0 tray 2
              )

          assert len(results) == 1
          only_row = results[0]
          assert only_row["ams_id"] == 0
          # Tray 2 comes from the print-command mapping, not tray 0 from MQTT.
          assert only_row["tray_id"] == 2
  class TestNotificationVariables:
      """Checks on filament_details formatting and partial-print scaling for notifications.

      Each test spells out the formatting/scaling expression inline and
      asserts on its result; no project function is called from here.
      """

      def test_filament_details_single_slot(self):
          """One slot renders as 'PLA: 15.2g'."""
          slots = [{"type": "PLA", "used_g": 15.2, "slot_id": 1, "color": "#FF0000"}]
          labelled = [(entry.get("type", "Unknown") or "Unknown", entry.get("used_g", 0)) for entry in slots]
          rendered = " | ".join(f"{ftype}: {used:.1f}g" for ftype, used in labelled)
          assert rendered == "PLA: 15.2g"

      def test_filament_details_multi_slot(self):
          """Several slots are joined with ' | ', e.g. 'PLA: 10.0g | PETG: 5.0g'."""
          slots = [
              {"type": "PLA", "used_g": 10.0, "slot_id": 1, "color": ""},
              {"type": "PETG", "used_g": 5.0, "slot_id": 2, "color": ""},
          ]
          labelled = [(entry.get("type", "Unknown") or "Unknown", entry.get("used_g", 0)) for entry in slots]
          rendered = " | ".join(f"{ftype}: {used:.1f}g" for ftype, used in labelled)
          assert rendered == "PLA: 10.0g | PETG: 5.0g"

      def test_filament_details_empty_type(self):
          """An empty type string falls back to the label 'Unknown'."""
          slots = [{"type": "", "used_g": 5.0, "slot_id": 1, "color": ""}]
          labelled = [(entry.get("type", "Unknown") or "Unknown", entry.get("used_g", 0)) for entry in slots]
          rendered = " | ".join(f"{ftype}: {used:.1f}g" for ftype, used in labelled)
          assert rendered == "Unknown: 5.0g"

      def test_filament_grams_scaled_for_partial(self):
          """Total grams are multiplied by progress/100 (clamped to 0..1) for partial prints."""
          total_grams = 20.0
          percent_done = 50
          fraction = max(0.0, min(percent_done / 100.0, 1.0))
          assert round(total_grams * fraction, 1) == 10.0

      def test_filament_grams_zero_progress(self):
          """A progress of 0 scales the usage down to 0.0 g."""
          total_grams = 20.0
          percent_done = 0
          fraction = max(0.0, min(percent_done / 100.0, 1.0))
          assert round(total_grams * fraction, 1) == 0.0

      def test_slot_scaling_for_partial(self):
          """Each slot's used_g is scaled linearly by the clamped progress fraction."""
          slots = [
              {"type": "PLA", "used_g": 20.0, "slot_id": 1, "color": ""},
              {"type": "PETG", "used_g": 10.0, "slot_id": 2, "color": ""},
          ]
          percent_done = 30
          fraction = max(0.0, min(percent_done / 100.0, 1.0))
          # Copy every slot dict, replacing only its used_g with the scaled amount.
          scaled = [dict(entry, used_g=round(entry["used_g"] * fraction, 1)) for entry in slots]
          assert [entry["used_g"] for entry in scaled] == [6.0, 3.0]
  class TestOnPrintStartAmsMapping:
      """Tests that on_print_start() stores the ams_mapping on the printer's session."""

      @pytest.fixture(autouse=True)
      def _clear_sessions(self):
          """Empty _active_sessions before and after every test in this class."""
          _active_sessions.clear()
          yield
          _active_sessions.clear()

      @pytest.mark.asyncio
      async def test_captures_ams_mapping_from_data(self):
          """An ams_mapping present in the start payload ends up on the session."""
          reported_status = SimpleNamespace(
              raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 80}]}]},
              tray_now=0,
          )
          printer_manager = MagicMock()
          printer_manager.get_status.return_value = reported_status

          start_payload = {"subtask_name": "Test", "ams_mapping": [3, -1, -1, 2]}
          await on_print_start(1, start_payload, printer_manager)

          session = _active_sessions[1]
          assert session.ams_mapping == [3, -1, -1, 2]

      @pytest.mark.asyncio
      async def test_ams_mapping_none_when_not_in_data(self):
          """Without an ams_mapping key in the payload, the session's ams_mapping is None."""
          reported_status = SimpleNamespace(
              raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 80}]}]},
              tray_now=0,
          )
          printer_manager = MagicMock()
          printer_manager.get_status.return_value = reported_status

          start_payload = {"subtask_name": "Test"}
          await on_print_start(1, start_payload, printer_manager)

          session = _active_sessions[1]
          assert session.ams_mapping is None
  class TestFindThreemfByFilename:
      """Tests for _find_3mf_by_filename(): library/archive search without an archive_id."""

      @pytest.mark.asyncio
      async def test_finds_library_file(self):
          """A library file matching the filename is returned as the resolved path."""
          from pathlib import Path

          lib_file = MagicMock()
          lib_file.file_path = "library/BMCU-BADGE.3mf"
          library_result = MagicMock()
          library_result.scalars.return_value.all.return_value = [lib_file]
          db = AsyncMock()
          db.execute = AsyncMock(return_value=library_result)

          # ``base_dir / <anything>`` yields one stand-in path that exists and ends in .3mf.
          base_dir = MagicMock(spec=Path)
          candidate = MagicMock(spec=Path)
          candidate.exists.return_value = True
          candidate.suffix = ".3mf"
          base_dir.__truediv__ = MagicMock(return_value=candidate)

          result = await _find_3mf_by_filename(1, "BMCU-BADGE.3mf", db, base_dir)
          assert result == candidate

      @pytest.mark.asyncio
      async def test_returns_none_for_empty_filename(self):
          """A filename that is empty, or nothing but an extension, yields None."""
          db = AsyncMock()
          base_dir = MagicMock()

          result = await _find_3mf_by_filename(1, ".3mf", db, base_dir)
          assert result is None

          result = await _find_3mf_by_filename(1, "", db, base_dir)
          assert result is None

      @pytest.mark.asyncio
      async def test_falls_through_to_archive_search(self):
          """When the library query returns nothing, a matching archive is used instead."""
          from pathlib import Path

          # First query (library): no rows.
          empty_result = MagicMock()
          empty_result.scalars.return_value.all.return_value = []

          # Second query (archives): one matching row.
          archive = MagicMock()
          archive.id = 35
          archive.file_path = "archives/35/BMCU-BADGE.3mf"
          archive_result = MagicMock()
          archive_result.scalars.return_value.all.return_value = [archive]

          db = AsyncMock()
          db.execute = AsyncMock(side_effect=[empty_result, archive_result])

          base_dir = MagicMock(spec=Path)
          candidate = MagicMock(spec=Path)
          candidate.exists.return_value = True
          candidate.suffix = ".3mf"
          base_dir.__truediv__ = MagicMock(return_value=candidate)

          result = await _find_3mf_by_filename(1, "BMCU-BADGE.3mf", db, base_dir)
          assert result == candidate
          # Exactly two queries: the empty library lookup, then the archive lookup.
          assert db.execute.call_count == 2

      @pytest.mark.asyncio
      async def test_returns_none_when_nothing_found(self):
          """None is returned when neither the library nor the archives have a match."""
          empty_result = MagicMock()
          empty_result.scalars.return_value.all.return_value = []
          db = AsyncMock()
          db.execute = AsyncMock(return_value=empty_result)
          base_dir = MagicMock()

          result = await _find_3mf_by_filename(1, "nonexistent.3mf", db, base_dir)
          assert result is None

      @pytest.mark.asyncio
      async def test_strips_path_and_extensions(self):
          """A full path with a '.gcode.3mf' suffix still leads to both searches being run."""
          empty_result = MagicMock()
          empty_result.scalars.return_value.all.return_value = []
          db = AsyncMock()
          db.execute = AsyncMock(return_value=empty_result)
          base_dir = MagicMock()

          # Intended search term is the bare "BMCU-BADGE" base name.
          await _find_3mf_by_filename(1, "/sdcard/BMCU-BADGE.gcode.3mf", db, base_dir)

          # Only the number of queries (library + archive) is checked here; the
          # search term handed to the DB is not inspected by this test.
          assert db.execute.call_count == 2
  class TestTrackFrom3mfWithPreresolvedPath:
      """Tests for _track_from_3mf() called with a ready-made threemf_path and no archive."""

      @pytest.mark.asyncio
      async def test_uses_preresolved_path_without_archive(self):
          """With archive_id=None and a threemf_path supplied, usage is still tracked."""
          spool = _make_spool(spool_id=1, label_weight=1000)
          assignment = _make_assignment(spool_id=1, ams_id=0, tray_id=3)
          # Fake DB answers, in order: the assignment lookup, then the spool lookup.
          db = _mock_db_sequential([assignment, spool])

          reported_status = SimpleNamespace(
              raw_data={"ams": [{"id": 0, "tray": []}]},
              tray_now=255,
              last_loaded_tray=3,
              tray_change_log=[],
          )
          printer_manager = MagicMock()
          printer_manager.get_status.return_value = reported_status

          filament_usage = [{"slot_id": 1, "used_g": 5.0, "type": "PETG", "color": "#FFFFFF"}]

          usage_patch = patch(
              "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
              return_value=filament_usage,
          )
          settings_patch = patch("backend.app.core.config.settings")
          with usage_patch, settings_patch as fake_settings:
              fake_settings.base_dir = MagicMock()
              # Stand-in for the already-resolved 3MF file; it reports that it exists.
              resolved_file = MagicMock()
              resolved_file.exists.return_value = True

              results = await _track_from_3mf(
                  printer_id=1,
                  archive_id=None,
                  status="completed",
                  print_name="BMCU-BADGE",
                  handled_trays=set(),
                  printer_manager=printer_manager,
                  db=db,
                  ams_mapping=[3, -1, -1, -1],
                  threemf_path=resolved_file,
              )

          assert len(results) == 1
          only_row = results[0]
          assert only_row["spool_id"] == 1
          assert only_row["weight_used"] == 5.0

      @pytest.mark.asyncio
      async def test_skips_queue_lookup_without_archive_id(self):
          """With archive_id=None the call succeeds with only assignment and spool DB answers."""
          spool = _make_spool(spool_id=1, label_weight=1000)
          assignment = _make_assignment(spool_id=1, ams_id=0, tray_id=0)
          # Only two DB answers are queued; none exists for a queue-item lookup.
          db = _mock_db_sequential([assignment, spool])

          reported_status = SimpleNamespace(
              raw_data={"ams": [{"id": 0, "tray": []}]},
              tray_now=0,
              last_loaded_tray=0,
              tray_change_log=[],
          )
          printer_manager = MagicMock()
          printer_manager.get_status.return_value = reported_status

          filament_usage = [{"slot_id": 1, "used_g": 2.0, "type": "PLA", "color": "#FF0000"}]

          usage_patch = patch(
              "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
              return_value=filament_usage,
          )
          settings_patch = patch("backend.app.core.config.settings")
          with usage_patch, settings_patch as fake_settings:
              fake_settings.base_dir = MagicMock()
              resolved_file = MagicMock()
              resolved_file.exists.return_value = True

              # Must complete without error despite having no archive_id to look up.
              results = await _track_from_3mf(
                  printer_id=1,
                  archive_id=None,
                  status="completed",
                  print_name="Test",
                  handled_trays=set(),
                  printer_manager=printer_manager,
                  db=db,
                  tray_now_at_start=0,
                  threemf_path=resolved_file,
              )

          assert len(results) == 1
          assert results[0]["weight_used"] == 2.0