test_cost_tracking.py 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949
  1. """Unit tests for cost tracking in usage_tracker.py.
  2. Tests cost calculation scenarios:
  3. - Spool-specific cost_per_kg
  4. - Default fallback cost from settings
  5. - Spools without cost (None)
  6. - Completed prints
  7. - Failed/partial prints
  8. - Cost aggregation to archives
  9. """
  10. import os
  11. import tempfile
  12. from datetime import datetime, timezone
  13. from types import SimpleNamespace
  14. from unittest.mock import AsyncMock, MagicMock, patch
  15. import pytest
  16. from backend.app.services.usage_tracker import (
  17. PrintSession,
  18. _active_sessions,
  19. _track_from_3mf,
  20. on_print_complete,
  21. )
  22. def _make_spool(spool_id=1, label_weight=1000, weight_used=0, cost_per_kg=None):
  23. """Create a mock Spool object with cost fields."""
  24. spool = MagicMock()
  25. spool.id = spool_id
  26. spool.label_weight = label_weight
  27. spool.weight_used = weight_used
  28. spool.cost_per_kg = cost_per_kg
  29. spool.last_used = None
  30. spool.material = "PLA"
  31. return spool
  32. def _make_assignment(spool_id=1, printer_id=1, ams_id=0, tray_id=0):
  33. """Create a mock SpoolAssignment object."""
  34. assignment = MagicMock()
  35. assignment.spool_id = spool_id
  36. assignment.printer_id = printer_id
  37. assignment.ams_id = ams_id
  38. assignment.tray_id = tray_id
  39. return assignment
  40. def _make_archive(archive_id=1, file_path=None):
  41. """Create a mock PrintArchive object with a temp file, and register cleanup."""
  42. if file_path is None:
  43. with tempfile.NamedTemporaryFile(delete=False, suffix=".3mf", prefix="test_print_") as tmp:
  44. file_path = tmp.name
  45. # Register cleanup for this file after the test
  46. import pytest
  47. frame = None
  48. try:
  49. raise Exception
  50. except Exception:
  51. import sys
  52. frame = sys._getframe(1)
  53. request = frame.f_locals.get("request")
  54. if request is not None:
  55. def cleanup():
  56. try:
  57. os.remove(file_path)
  58. except Exception:
  59. pass
  60. request.addfinalizer(cleanup)
  61. archive = MagicMock()
  62. archive.id = archive_id
  63. archive.file_path = file_path
  64. # Explicit numeric default so the #1344 top-up logic (archive_grams -
  65. # tracked_grams) doesn't compare a MagicMock to a float. Tests that
  66. # exercise the top-up path overwrite this with a real number.
  67. archive.filament_used_grams = 0
  68. return archive
  69. @pytest.fixture(autouse=True)
  70. def cleanup_temp_archives():
  71. yield
  72. # Cleanup any temp .3mf files created by _make_archive
  73. import glob
  74. for f in glob.glob("test_print_*.3mf"):
  75. try:
  76. os.remove(f)
  77. except Exception:
  78. pass
  79. @pytest.fixture(autouse=True)
  80. def cleanup_test_print_gcode():
  81. yield
  82. import os
  83. path = "archives/test/test_print.gcode.3mf"
  84. if os.path.exists(path):
  85. try:
  86. os.remove(path)
  87. except Exception:
  88. pass
  89. @pytest.fixture
  90. def archive_factory_temp():
  91. import tempfile
  92. def _factory(*args, **kwargs):
  93. with tempfile.NamedTemporaryFile(delete=False, suffix=".3mf", prefix="test_print_", dir="archives/test") as tmp:
  94. kwargs["file_path"] = tmp.name
  95. return kwargs["file_path"]
  96. yield _factory
  97. # Cleanup
  98. import glob
  99. import os
  100. for f in glob.glob("archives/test/test_print_*.3mf"):
  101. try:
  102. os.remove(f)
  103. except Exception:
  104. pass
  105. def _mock_db_sequential(responses):
  106. """Create mock db that returns responses in order."""
  107. db = AsyncMock()
  108. call_count = [0]
  109. async def mock_execute(*args, **kwargs):
  110. idx = call_count[0]
  111. call_count[0] += 1
  112. result = MagicMock()
  113. if idx < len(responses):
  114. result.scalar_one_or_none.return_value = responses[idx]
  115. else:
  116. result.scalar_one_or_none.return_value = None
  117. return result
  118. db.execute = mock_execute
  119. return db
  120. class TestCostCalculation:
  121. """Tests for cost calculation in usage tracking."""
  122. @pytest.fixture(autouse=True)
  123. def _clear_sessions(self):
  124. _active_sessions.clear()
  125. yield
  126. _active_sessions.clear()
  127. @pytest.mark.asyncio
  128. async def test_cost_with_spool_specific_cost_per_kg(self):
  129. """Cost is calculated using spool-specific cost_per_kg when available."""
  130. # Spool with cost_per_kg = 25.00 USD/kg
  131. spool = _make_spool(spool_id=1, label_weight=1000, cost_per_kg=25.0)
  132. assignment = _make_assignment(spool_id=1)
  133. archive = _make_archive(archive_id=10)
  134. _active_sessions[1] = PrintSession(
  135. printer_id=1,
  136. print_name="Test",
  137. started_at=datetime.now(timezone.utc),
  138. tray_remain_start={(0, 0): 80},
  139. tray_now_at_start=0,
  140. )
  141. printer_manager = MagicMock()
  142. printer_manager.get_status.return_value = SimpleNamespace(
  143. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
  144. progress=100,
  145. layer_num=50,
  146. tray_now=0,
  147. )
  148. # db returns: archive, queue_item(None), assignment, spool
  149. db = _mock_db_sequential([archive, None, assignment, spool])
  150. # 20g used from 3MF
  151. filament_usage = [{"slot_id": 1, "used_g": 20.0, "type": "PLA", "color": "#FF0000"}]
  152. with (
  153. patch("backend.app.core.config.settings") as mock_settings,
  154. patch("backend.app.api.routes.settings.get_setting", return_value="15.0"), # default cost
  155. patch(
  156. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  157. return_value=filament_usage,
  158. ),
  159. ):
  160. mock_settings.base_dir = MagicMock()
  161. mock_path = MagicMock()
  162. mock_path.exists.return_value = True
  163. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  164. results = await on_print_complete(
  165. printer_id=1,
  166. data={"status": "completed"},
  167. printer_manager=printer_manager,
  168. db=db,
  169. archive_id=10,
  170. )
  171. assert len(results) == 1
  172. assert results[0]["spool_id"] == 1
  173. assert results[0]["weight_used"] == 20.0
  174. # Cost = 20g / 1000 * 25.0 = 0.50
  175. assert results[0]["cost"] == 0.50
  176. @pytest.mark.asyncio
  177. async def test_cost_with_default_fallback(self):
  178. """Cost uses default_filament_cost from settings when spool cost is None."""
  179. # Spool without cost_per_kg
  180. spool = _make_spool(spool_id=1, label_weight=1000, cost_per_kg=None)
  181. assignment = _make_assignment(spool_id=1)
  182. archive = _make_archive(archive_id=10)
  183. _active_sessions[1] = PrintSession(
  184. printer_id=1,
  185. print_name="Test",
  186. started_at=datetime.now(timezone.utc),
  187. tray_remain_start={(0, 0): 80},
  188. tray_now_at_start=0,
  189. )
  190. printer_manager = MagicMock()
  191. printer_manager.get_status.return_value = SimpleNamespace(
  192. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
  193. progress=100,
  194. layer_num=50,
  195. tray_now=0,
  196. )
  197. # db returns: archive, queue_item(None), assignment, spool
  198. db = _mock_db_sequential([archive, None, assignment, spool])
  199. # 30g used from 3MF
  200. filament_usage = [{"slot_id": 1, "used_g": 30.0, "type": "PLA", "color": "#FF0000"}]
  201. with (
  202. patch("backend.app.core.config.settings") as mock_settings,
  203. patch("backend.app.api.routes.settings.get_setting", return_value="15.0"), # default: 15.0/kg
  204. patch(
  205. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  206. return_value=filament_usage,
  207. ),
  208. ):
  209. mock_settings.base_dir = MagicMock()
  210. mock_path = MagicMock()
  211. mock_path.exists.return_value = True
  212. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  213. results = await on_print_complete(
  214. printer_id=1,
  215. data={"status": "completed"},
  216. printer_manager=printer_manager,
  217. db=db,
  218. archive_id=10,
  219. )
  220. assert len(results) == 1
  221. assert results[0]["spool_id"] == 1
  222. assert results[0]["weight_used"] == 30.0
  223. # Cost = 30g / 1000 * 15.0 = 0.45
  224. assert results[0]["cost"] == 0.45
  225. @pytest.mark.asyncio
  226. async def test_cost_zero_when_default_cost_is_zero(self):
  227. """Cost is None when both spool cost and default cost are 0."""
  228. # Spool without cost_per_kg
  229. spool = _make_spool(spool_id=1, label_weight=1000, cost_per_kg=None)
  230. assignment = _make_assignment(spool_id=1)
  231. archive = _make_archive(archive_id=10)
  232. _active_sessions[1] = PrintSession(
  233. printer_id=1,
  234. print_name="Test",
  235. started_at=datetime.now(timezone.utc),
  236. tray_remain_start={(0, 0): 80},
  237. tray_now_at_start=0,
  238. )
  239. printer_manager = MagicMock()
  240. printer_manager.get_status.return_value = SimpleNamespace(
  241. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
  242. progress=100,
  243. layer_num=50,
  244. tray_now=0,
  245. )
  246. # db returns: archive, queue_item(None), assignment, spool
  247. db = _mock_db_sequential([archive, None, assignment, spool])
  248. filament_usage = [{"slot_id": 1, "used_g": 10.0, "type": "PLA", "color": "#FF0000"}]
  249. with (
  250. patch("backend.app.core.config.settings") as mock_settings,
  251. patch("backend.app.api.routes.settings.get_setting", return_value="0.0"), # no default cost
  252. patch(
  253. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  254. return_value=filament_usage,
  255. ),
  256. ):
  257. mock_settings.base_dir = MagicMock()
  258. mock_path = MagicMock()
  259. mock_path.exists.return_value = True
  260. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  261. results = await on_print_complete(
  262. printer_id=1,
  263. data={"status": "completed"},
  264. printer_manager=printer_manager,
  265. db=db,
  266. archive_id=10,
  267. )
  268. assert len(results) == 1
  269. assert results[0]["cost"] is None
  270. @pytest.mark.asyncio
  271. async def test_cost_for_failed_print_uses_actual_usage(self):
  272. """Failed print at 50% progress calculates cost from actual usage."""
  273. spool = _make_spool(spool_id=1, label_weight=1000, cost_per_kg=20.0)
  274. assignment = _make_assignment(spool_id=1)
  275. archive = _make_archive(archive_id=10)
  276. _active_sessions[1] = PrintSession(
  277. printer_id=1,
  278. print_name="Test",
  279. started_at=datetime.now(timezone.utc),
  280. tray_remain_start={(0, 0): 80},
  281. tray_now_at_start=0,
  282. )
  283. # Failed at 50% progress
  284. printer_manager = MagicMock()
  285. printer_manager.get_status.return_value = SimpleNamespace(
  286. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
  287. progress=50,
  288. layer_num=25,
  289. tray_now=0,
  290. )
  291. # db returns: archive, queue_item(None), assignment, spool
  292. db = _mock_db_sequential([archive, None, assignment, spool])
  293. # 40g total, but only 50% used
  294. filament_usage = [{"slot_id": 1, "used_g": 40.0, "type": "PLA", "color": "#FF0000"}]
  295. with (
  296. patch("backend.app.core.config.settings") as mock_settings,
  297. patch("backend.app.api.routes.settings.get_setting", return_value="15.0"),
  298. patch(
  299. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  300. return_value=filament_usage,
  301. ),
  302. patch(
  303. "backend.app.utils.threemf_tools.extract_layer_filament_usage_from_3mf",
  304. return_value=None, # No layer data, use linear scaling
  305. ),
  306. ):
  307. mock_settings.base_dir = MagicMock()
  308. mock_path = MagicMock()
  309. mock_path.exists.return_value = True
  310. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  311. results = await on_print_complete(
  312. printer_id=1,
  313. data={"status": "failed", "last_progress": 50.0},
  314. printer_manager=printer_manager,
  315. db=db,
  316. archive_id=10,
  317. )
  318. assert len(results) == 1
  319. # 50% of 40g = 20g
  320. assert results[0]["weight_used"] == 20.0
  321. # Cost = 20g / 1000 * 20.0 = 0.40
  322. assert results[0]["cost"] == 0.40
  323. @pytest.mark.asyncio
  324. async def test_cost_with_ams_fallback_tracking(self):
  325. """AMS fallback tracking also calculates cost correctly."""
  326. spool = _make_spool(spool_id=2, label_weight=1000, cost_per_kg=30.0)
  327. assignment = _make_assignment(spool_id=2)
  328. _active_sessions[1] = PrintSession(
  329. printer_id=1,
  330. print_name="Test",
  331. started_at=datetime.now(timezone.utc),
  332. tray_remain_start={(0, 0): 80},
  333. )
  334. printer_manager = MagicMock()
  335. printer_manager.get_status.return_value = SimpleNamespace(
  336. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
  337. tray_now=0,
  338. last_loaded_tray=-1,
  339. )
  340. # Pad 2 Nones for _find_3mf_by_filename DB queries (library + archive search),
  341. # then assignment and spool for the AMS fallback path
  342. db = _mock_db_sequential([None, None, assignment, spool])
  343. with patch("backend.app.api.routes.settings.get_setting", return_value="15.0"):
  344. results = await on_print_complete(
  345. printer_id=1,
  346. data={"status": "completed"},
  347. printer_manager=printer_manager,
  348. db=db,
  349. archive_id=None, # No archive = AMS fallback
  350. )
  351. assert len(results) == 1
  352. assert results[0]["spool_id"] == 2
  353. # 10% of 1000g = 100g
  354. assert results[0]["weight_used"] == 100.0
  355. # Cost = 100g / 1000 * 30.0 = 3.00
  356. assert results[0]["cost"] == 3.0
  357. @pytest.mark.asyncio
  358. async def test_multi_filament_cost_aggregation(self):
  359. """Multiple spools in one print have their costs tracked separately."""
  360. spool1 = _make_spool(spool_id=1, label_weight=1000, cost_per_kg=20.0)
  361. spool2 = _make_spool(spool_id=2, label_weight=1000, cost_per_kg=25.0)
  362. assignment1 = _make_assignment(spool_id=1, ams_id=0, tray_id=0)
  363. assignment2 = _make_assignment(spool_id=2, ams_id=0, tray_id=1)
  364. archive = _make_archive(archive_id=10)
  365. _active_sessions[1] = PrintSession(
  366. printer_id=1,
  367. print_name="Test",
  368. started_at=datetime.now(timezone.utc),
  369. tray_remain_start={(0, 0): 80, (0, 1): 90},
  370. tray_now_at_start=0,
  371. )
  372. printer_manager = MagicMock()
  373. printer_manager.get_status.return_value = SimpleNamespace(
  374. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}, {"id": 1, "remain": 80}]}]},
  375. progress=100,
  376. layer_num=50,
  377. tray_now=0,
  378. )
  379. # Mock slot-to-tray mapping: slot 1 -> tray 0, slot 2 -> tray 1
  380. ams_mapping = [0, 1]
  381. # db returns: archive, assignment1, spool1, assignment2, spool2
  382. # ams_mapping is provided, so no queue item lookup is performed
  383. db = _mock_db_sequential([archive, assignment1, spool1, assignment2, spool2])
  384. # Two filaments used
  385. filament_usage = [
  386. {"slot_id": 1, "used_g": 15.0, "type": "PLA", "color": "#FF0000"},
  387. {"slot_id": 2, "used_g": 25.0, "type": "PLA", "color": "#00FF00"},
  388. ]
  389. with (
  390. patch("backend.app.core.config.settings") as mock_settings,
  391. patch("backend.app.api.routes.settings.get_setting", return_value="15.0"),
  392. patch(
  393. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  394. return_value=filament_usage,
  395. ),
  396. ):
  397. mock_settings.base_dir = MagicMock()
  398. mock_path = MagicMock()
  399. mock_path.exists.return_value = True
  400. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  401. results = await on_print_complete(
  402. printer_id=1,
  403. data={"status": "completed"},
  404. printer_manager=printer_manager,
  405. db=db,
  406. archive_id=10,
  407. ams_mapping=ams_mapping,
  408. )
  409. assert len(results) == 2
  410. # First spool: 15g at 20/kg = 0.30
  411. spool1_result = next(r for r in results if r["spool_id"] == 1)
  412. assert spool1_result["weight_used"] == 15.0
  413. assert spool1_result["cost"] == 0.30
  414. # Second spool: 25g at 25/kg = 0.625, rounded to 0.62
  415. spool2_result = next(r for r in results if r["spool_id"] == 2)
  416. assert spool2_result["weight_used"] == 25.0
  417. assert spool2_result["cost"] == 0.62
  418. class TestCostAggregation:
  419. """Tests for cost aggregation to PrintArchive."""
  420. @pytest.mark.asyncio
  421. async def test_costs_summed_in_results(self):
  422. """Multiple spool costs are correctly summed from result dicts."""
  423. results = [
  424. {"spool_id": 1, "weight_used": 20.0, "cost": 0.50},
  425. {"spool_id": 2, "weight_used": 30.0, "cost": 0.75},
  426. ]
  427. total_cost = sum(r.get("cost", 0) or 0 for r in results)
  428. assert total_cost == 1.25
  429. @pytest.mark.asyncio
  430. async def test_null_costs_handled_in_aggregation(self):
  431. """None costs don't break aggregation."""
  432. results = [
  433. {"spool_id": 1, "weight_used": 20.0, "cost": 0.50},
  434. {"spool_id": 2, "weight_used": 30.0, "cost": None}, # No cost
  435. {"spool_id": 3, "weight_used": 10.0, "cost": 0.25},
  436. ]
  437. total_cost = sum(r.get("cost", 0) or 0 for r in results)
  438. assert total_cost == 0.75 # Only spools 1 and 3
  439. @pytest.mark.asyncio
  440. async def test_archive_cost_not_overwritten_with_zero(self):
  441. """archive.cost is preserved when no spool usage has cost data."""
  442. # Spool without cost_per_kg, default_filament_cost also 0 → cost=None per usage
  443. spool = _make_spool(spool_id=1, label_weight=1000, cost_per_kg=None)
  444. assignment = _make_assignment(spool_id=1)
  445. archive = _make_archive(archive_id=10)
  446. archive.cost = 5.00 # Pre-existing cost from catalog
  447. archive.print_name = "TestPrint"
  448. archive.printer_id = 1
  449. _active_sessions[1] = PrintSession(
  450. printer_id=1,
  451. print_name="TestPrint",
  452. started_at=datetime.now(timezone.utc),
  453. tray_remain_start={(0, 0): 80},
  454. tray_now_at_start=0,
  455. )
  456. printer_manager = MagicMock()
  457. printer_manager.get_status.return_value = SimpleNamespace(
  458. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
  459. progress=100,
  460. layer_num=50,
  461. tray_now=0,
  462. )
  463. # Build mock db that returns proper scalars for the aggregation queries
  464. responses = []
  465. # 1. select(PrintArchive) → archive
  466. responses.append(("scalar_one_or_none", archive))
  467. # 2. select(PrintQueueItem) → None
  468. responses.append(("scalar_one_or_none", None))
  469. # 3. select(SpoolAssignment) → assignment
  470. responses.append(("scalar_one_or_none", assignment))
  471. # 4. select(Spool) → spool
  472. responses.append(("scalar_one_or_none", spool))
  473. # 5. cost aggregation: select archive to update cost
  474. responses.append(("scalar_one_or_none", archive))
  475. db = AsyncMock()
  476. call_count = [0]
  477. async def mock_execute(*args, **kwargs):
  478. idx = call_count[0]
  479. call_count[0] += 1
  480. result = MagicMock()
  481. if idx < len(responses):
  482. method, value = responses[idx]
  483. if method == "scalar":
  484. result.scalar.return_value = value
  485. result.scalar_one_or_none.return_value = value
  486. else:
  487. result.scalar_one_or_none.return_value = value
  488. result.scalar.return_value = value
  489. else:
  490. result.scalar_one_or_none.return_value = None
  491. result.scalar.return_value = None
  492. return result
  493. db.execute = mock_execute
  494. filament_usage = [{"slot_id": 1, "used_g": 10.0, "type": "PLA", "color": "#FF0000"}]
  495. with (
  496. patch("backend.app.core.config.settings") as mock_settings,
  497. patch("backend.app.api.routes.settings.get_setting", return_value="0.0"), # no default cost
  498. patch("backend.app.utils.threemf_tools.extract_filament_usage_from_3mf", return_value=filament_usage),
  499. ):
  500. mock_settings.base_dir = MagicMock()
  501. mock_path = MagicMock()
  502. mock_path.exists.return_value = True
  503. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  504. results = await on_print_complete(
  505. printer_id=1,
  506. data={"status": "completed"},
  507. printer_manager=printer_manager,
  508. db=db,
  509. archive_id=10,
  510. )
  511. # Usage tracked but cost is None (no cost_per_kg, no default)
  512. assert len(results) == 1
  513. assert results[0]["cost"] is None
  514. # Archive cost should NOT have been overwritten — still 5.00
  515. assert archive.cost == 5.00
  516. @pytest.mark.asyncio
  517. async def test_archive_cost_set_when_spool_has_cost(self):
  518. """archive.cost is set from spool usage when cost data exists."""
  519. spool = _make_spool(spool_id=1, label_weight=1000, cost_per_kg=25.0)
  520. assignment = _make_assignment(spool_id=1)
  521. archive = _make_archive(archive_id=10)
  522. archive.cost = None # No pre-existing cost
  523. archive.print_name = "TestPrint"
  524. archive.printer_id = 1
  525. _active_sessions[1] = PrintSession(
  526. printer_id=1,
  527. print_name="TestPrint",
  528. started_at=datetime.now(timezone.utc),
  529. tray_remain_start={(0, 0): 80},
  530. tray_now_at_start=0,
  531. )
  532. printer_manager = MagicMock()
  533. printer_manager.get_status.return_value = SimpleNamespace(
  534. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
  535. progress=100,
  536. layer_num=50,
  537. tray_now=0,
  538. )
  539. # 20g at 25/kg = 0.50
  540. expected_cost = 0.50
  541. responses = []
  542. responses.append(("scalar_one_or_none", archive))
  543. responses.append(("scalar_one_or_none", None)) # queue item
  544. responses.append(("scalar_one_or_none", assignment))
  545. responses.append(("scalar_one_or_none", spool))
  546. # cost aggregation: select archive to update cost
  547. responses.append(("scalar_one_or_none", archive))
  548. db = AsyncMock()
  549. call_count = [0]
  550. async def mock_execute(*args, **kwargs):
  551. idx = call_count[0]
  552. call_count[0] += 1
  553. result = MagicMock()
  554. if idx < len(responses):
  555. method, value = responses[idx]
  556. result.scalar.return_value = value
  557. result.scalar_one_or_none.return_value = value
  558. else:
  559. result.scalar_one_or_none.return_value = None
  560. result.scalar.return_value = None
  561. return result
  562. db.execute = mock_execute
  563. filament_usage = [{"slot_id": 1, "used_g": 20.0, "type": "PLA", "color": "#FF0000"}]
  564. with (
  565. patch("backend.app.core.config.settings") as mock_settings,
  566. patch("backend.app.api.routes.settings.get_setting", return_value="15.0"),
  567. patch("backend.app.utils.threemf_tools.extract_filament_usage_from_3mf", return_value=filament_usage),
  568. ):
  569. mock_settings.base_dir = MagicMock()
  570. mock_path = MagicMock()
  571. mock_path.exists.return_value = True
  572. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  573. results = await on_print_complete(
  574. printer_id=1,
  575. data={"status": "completed"},
  576. printer_manager=printer_manager,
  577. db=db,
  578. archive_id=10,
  579. )
  580. assert len(results) == 1
  581. assert results[0]["cost"] == expected_cost
  582. # Archive cost should have been updated
  583. assert archive.cost == expected_cost
  584. @pytest.mark.asyncio
  585. async def test_archive_cost_includes_untracked_filament_at_default_rate(self):
  586. """#1344: when only some AMS trays have inventory spools, the untracked
  587. filament weight is charged at the global default rate so the total
  588. archive cost still reflects the whole print."""
  589. spool = _make_spool(spool_id=1, label_weight=1000, cost_per_kg=10.0)
  590. assignment = _make_assignment(spool_id=1)
  591. archive = _make_archive(archive_id=10)
  592. archive.cost = None
  593. archive.print_name = "TestPrint"
  594. archive.printer_id = 1
  595. archive.filament_used_grams = 110.0 # whole-print weight from slicer
  596. _active_sessions[1] = PrintSession(
  597. printer_id=1,
  598. print_name="TestPrint",
  599. started_at=datetime.now(timezone.utc),
  600. tray_remain_start={(0, 0): 80},
  601. tray_now_at_start=0,
  602. )
  603. printer_manager = MagicMock()
  604. printer_manager.get_status.return_value = SimpleNamespace(
  605. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
  606. progress=100,
  607. layer_num=50,
  608. tray_now=0,
  609. )
  610. responses = [
  611. ("scalar_one_or_none", archive),
  612. ("scalar_one_or_none", None), # queue item
  613. ("scalar_one_or_none", assignment),
  614. ("scalar_one_or_none", spool),
  615. ("scalar_one_or_none", archive), # cost-update select
  616. ]
  617. db = AsyncMock()
  618. call_count = [0]
  619. async def mock_execute(*args, **kwargs):
  620. idx = call_count[0]
  621. call_count[0] += 1
  622. result = MagicMock()
  623. if idx < len(responses):
  624. _, value = responses[idx]
  625. result.scalar.return_value = value
  626. result.scalar_one_or_none.return_value = value
  627. else:
  628. result.scalar_one_or_none.return_value = None
  629. result.scalar.return_value = None
  630. return result
  631. db.execute = mock_execute
  632. # 3MF reports a single slot using 10g, but archive.filament_used_grams
  633. # says the whole print was 110g -- the other 100g came from spools that
  634. # aren't in inventory.
  635. filament_usage = [{"slot_id": 1, "used_g": 10.0, "type": "PLA", "color": "#FF0000"}]
  636. with (
  637. patch("backend.app.core.config.settings") as mock_settings,
  638. patch("backend.app.api.routes.settings.get_setting", return_value="10.0"),
  639. patch("backend.app.utils.threemf_tools.extract_filament_usage_from_3mf", return_value=filament_usage),
  640. ):
  641. mock_settings.base_dir = MagicMock()
  642. mock_path = MagicMock()
  643. mock_path.exists.return_value = True
  644. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  645. results = await on_print_complete(
  646. printer_id=1,
  647. data={"status": "completed"},
  648. printer_manager=printer_manager,
  649. db=db,
  650. archive_id=10,
  651. )
  652. # Tracked slot: 10g * $10/kg = $0.10
  653. assert len(results) == 1
  654. assert results[0]["cost"] == 0.10
  655. # Untracked: 110g - 10g = 100g at $10/kg default = $1.00
  656. # Archive total: $0.10 + $1.00 = $1.10 (was $0.01 pre-fix because only
  657. # the tracked slot's tiny share was kept)
  658. assert archive.cost == 1.10
  659. @pytest.mark.asyncio
  660. async def test_archive_cost_fully_tracked_unchanged_by_topup(self):
  661. """When every gram is covered by inventory spools, the default-rate
  662. top-up adds nothing -- the archive cost is just the sum of tracked
  663. costs, same as before #1344."""
  664. spool = _make_spool(spool_id=1, label_weight=1000, cost_per_kg=25.0)
  665. assignment = _make_assignment(spool_id=1)
  666. archive = _make_archive(archive_id=10)
  667. archive.cost = None
  668. archive.print_name = "TestPrint"
  669. archive.printer_id = 1
  670. archive.filament_used_grams = 20.0 # exactly what the slot reports
  671. _active_sessions[1] = PrintSession(
  672. printer_id=1,
  673. print_name="TestPrint",
  674. started_at=datetime.now(timezone.utc),
  675. tray_remain_start={(0, 0): 80},
  676. tray_now_at_start=0,
  677. )
  678. printer_manager = MagicMock()
  679. printer_manager.get_status.return_value = SimpleNamespace(
  680. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
  681. progress=100,
  682. layer_num=50,
  683. tray_now=0,
  684. )
  685. responses = [
  686. ("scalar_one_or_none", archive),
  687. ("scalar_one_or_none", None),
  688. ("scalar_one_or_none", assignment),
  689. ("scalar_one_or_none", spool),
  690. ("scalar_one_or_none", archive),
  691. ]
  692. db = AsyncMock()
  693. call_count = [0]
  694. async def mock_execute(*args, **kwargs):
  695. idx = call_count[0]
  696. call_count[0] += 1
  697. result = MagicMock()
  698. if idx < len(responses):
  699. _, value = responses[idx]
  700. result.scalar.return_value = value
  701. result.scalar_one_or_none.return_value = value
  702. else:
  703. result.scalar_one_or_none.return_value = None
  704. result.scalar.return_value = None
  705. return result
  706. db.execute = mock_execute
  707. filament_usage = [{"slot_id": 1, "used_g": 20.0, "type": "PLA", "color": "#FF0000"}]
  708. with (
  709. patch("backend.app.core.config.settings") as mock_settings,
  710. patch("backend.app.api.routes.settings.get_setting", return_value="15.0"),
  711. patch("backend.app.utils.threemf_tools.extract_filament_usage_from_3mf", return_value=filament_usage),
  712. ):
  713. mock_settings.base_dir = MagicMock()
  714. mock_path = MagicMock()
  715. mock_path.exists.return_value = True
  716. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  717. results = await on_print_complete(
  718. printer_id=1,
  719. data={"status": "completed"},
  720. printer_manager=printer_manager,
  721. db=db,
  722. archive_id=10,
  723. )
  724. # 20g at $25/kg = $0.50 -- no top-up because tracked >= archive grams
  725. assert len(results) == 1
  726. assert results[0]["cost"] == 0.50
  727. assert archive.cost == 0.50
  728. @pytest.mark.asyncio
  729. async def test_cost_with_archive_id(self):
  730. """Test cost aggregation using archive_id (3MF path)."""
  731. spool_new = _make_spool(spool_id=1, label_weight=1000, cost_per_kg=25.0)
  732. assignment_new = _make_assignment(spool_id=1)
  733. archive_new = _make_archive(archive_id=20)
  734. filament_usage_new = [{"slot_id": 1, "used_g": 20.0, "type": "PLA", "color": "#FF0000"}]
  735. printer_manager = MagicMock()
  736. printer_manager.get_status.return_value = SimpleNamespace(
  737. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
  738. progress=100,
  739. layer_num=50,
  740. tray_now=0,
  741. )
  742. db = _mock_db_sequential([archive_new, None, assignment_new, spool_new])
  743. with (
  744. patch("backend.app.core.config.settings") as mock_settings,
  745. patch("backend.app.api.routes.settings.get_setting", return_value="15.0"),
  746. patch("backend.app.utils.threemf_tools.extract_filament_usage_from_3mf", return_value=filament_usage_new),
  747. ):
  748. mock_settings.base_dir = MagicMock()
  749. mock_path = MagicMock()
  750. mock_path.exists.return_value = True
  751. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  752. results_new = await on_print_complete(
  753. printer_id=1,
  754. data={"status": "completed"},
  755. printer_manager=printer_manager,
  756. db=db,
  757. archive_id=20,
  758. )
  759. assert len(results_new) == 1
  760. assert results_new[0]["spool_id"] == 1
  761. assert results_new[0]["cost"] == 0.50 # 20g / 1000 * 25.0
  762. @pytest.mark.asyncio
  763. async def test_cost_with_print_name_ams_fallback(self):
  764. """Test cost aggregation using print_name (AMS fallback, legacy path)."""
  765. spool_old = _make_spool(spool_id=2, label_weight=1000, cost_per_kg=15.0)
  766. assignment_old = _make_assignment(spool_id=2, ams_id=0, tray_id=0)
  767. legacy_print_name = "LegacyPrint"
  768. _active_sessions[1] = PrintSession(
  769. printer_id=1,
  770. print_name=legacy_print_name,
  771. started_at=datetime.now(timezone.utc),
  772. tray_remain_start={(0, 0): 80},
  773. tray_now_at_start=0,
  774. )
  775. printer_manager = MagicMock()
  776. printer_manager.get_status.return_value = SimpleNamespace(
  777. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
  778. progress=100,
  779. layer_num=50,
  780. tray_now=0,
  781. )
  782. # Pad 2 Nones for _find_3mf_by_filename DB queries (library + archive search),
  783. # then assignment and spool for the AMS fallback path
  784. db = _mock_db_sequential([None, None, assignment_old, spool_old])
  785. with (
  786. patch("backend.app.core.config.settings") as mock_settings,
  787. patch("backend.app.api.routes.settings.get_setting", return_value="15.0"),
  788. patch("backend.app.utils.threemf_tools.extract_filament_usage_from_3mf", return_value=None),
  789. ):
  790. mock_settings.base_dir = MagicMock()
  791. mock_path = MagicMock()
  792. mock_path.exists.return_value = True
  793. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  794. results_old = await on_print_complete(
  795. printer_id=1,
  796. data={"status": "completed", "subtask_name": legacy_print_name, "filename": legacy_print_name},
  797. printer_manager=printer_manager,
  798. db=db,
  799. archive_id=None,
  800. )
  801. assert len(results_old) == 1
  802. assert results_old[0]["spool_id"] == 2
  803. assert results_old[0]["cost"] == 1.5 # 100g / 1000 * 15.0