# test_cost_tracking.py
"""Unit tests for cost tracking in usage_tracker.py.

Tests cost calculation scenarios:
- Spool-specific cost_per_kg
- Default fallback cost from settings
- Spools without cost (None)
- Completed prints
- Failed/partial prints
- Cost aggregation to archives
"""
  10. import os
  11. import tempfile
  12. from datetime import datetime, timezone
  13. from types import SimpleNamespace
  14. from unittest.mock import AsyncMock, MagicMock, patch
  15. import pytest
  16. from backend.app.services.usage_tracker import (
  17. PrintSession,
  18. _active_sessions,
  19. _track_from_3mf,
  20. on_print_complete,
  21. )
  22. def _make_spool(spool_id=1, label_weight=1000, weight_used=0, cost_per_kg=None):
  23. """Create a mock Spool object with cost fields."""
  24. spool = MagicMock()
  25. spool.id = spool_id
  26. spool.label_weight = label_weight
  27. spool.weight_used = weight_used
  28. spool.cost_per_kg = cost_per_kg
  29. spool.last_used = None
  30. spool.material = "PLA"
  31. return spool
  32. def _make_assignment(spool_id=1, printer_id=1, ams_id=0, tray_id=0):
  33. """Create a mock SpoolAssignment object."""
  34. assignment = MagicMock()
  35. assignment.spool_id = spool_id
  36. assignment.printer_id = printer_id
  37. assignment.ams_id = ams_id
  38. assignment.tray_id = tray_id
  39. return assignment
  40. def _make_archive(archive_id=1, file_path=None):
  41. """Create a mock PrintArchive object with a temp file, and register cleanup."""
  42. if file_path is None:
  43. with tempfile.NamedTemporaryFile(delete=False, suffix=".3mf", prefix="test_print_") as tmp:
  44. file_path = tmp.name
  45. # Register cleanup for this file after the test
  46. import pytest
  47. frame = None
  48. try:
  49. raise Exception
  50. except Exception:
  51. import sys
  52. frame = sys._getframe(1)
  53. request = frame.f_locals.get("request")
  54. if request is not None:
  55. def cleanup():
  56. try:
  57. os.remove(file_path)
  58. except Exception:
  59. pass
  60. request.addfinalizer(cleanup)
  61. archive = MagicMock()
  62. archive.id = archive_id
  63. archive.file_path = file_path
  64. return archive
  65. @pytest.fixture(autouse=True)
  66. def cleanup_temp_archives():
  67. yield
  68. # Cleanup any temp .3mf files created by _make_archive
  69. import glob
  70. for f in glob.glob("test_print_*.3mf"):
  71. try:
  72. os.remove(f)
  73. except Exception:
  74. pass
  75. @pytest.fixture(autouse=True)
  76. def cleanup_test_print_gcode():
  77. yield
  78. import os
  79. path = "archives/test/test_print.gcode.3mf"
  80. if os.path.exists(path):
  81. try:
  82. os.remove(path)
  83. except Exception:
  84. pass
  85. @pytest.fixture
  86. def archive_factory_temp():
  87. import tempfile
  88. def _factory(*args, **kwargs):
  89. with tempfile.NamedTemporaryFile(delete=False, suffix=".3mf", prefix="test_print_", dir="archives/test") as tmp:
  90. kwargs["file_path"] = tmp.name
  91. return kwargs["file_path"]
  92. yield _factory
  93. # Cleanup
  94. import glob
  95. import os
  96. for f in glob.glob("archives/test/test_print_*.3mf"):
  97. try:
  98. os.remove(f)
  99. except Exception:
  100. pass
  101. def _mock_db_sequential(responses):
  102. """Create mock db that returns responses in order."""
  103. db = AsyncMock()
  104. call_count = [0]
  105. async def mock_execute(*args, **kwargs):
  106. idx = call_count[0]
  107. call_count[0] += 1
  108. result = MagicMock()
  109. if idx < len(responses):
  110. result.scalar_one_or_none.return_value = responses[idx]
  111. else:
  112. result.scalar_one_or_none.return_value = None
  113. return result
  114. db.execute = mock_execute
  115. return db
class TestCostCalculation:
    """Tests for cost calculation in usage tracking.

    Each test registers a PrintSession in ``_active_sessions``, stubs the
    printer status and the database, calls ``on_print_complete`` and checks the
    per-spool ``weight_used`` / ``cost`` values in the returned result dicts.
    The mock db hands out its responses strictly in call order, so the order of
    the lists passed to ``_mock_db_sequential`` matters.
    """

    @pytest.fixture(autouse=True)
    def _clear_sessions(self):
        # Empty the shared session mapping before and after every test so
        # sessions registered by one test cannot leak into another.
        _active_sessions.clear()
        yield
        _active_sessions.clear()

    @pytest.mark.asyncio
    async def test_cost_with_spool_specific_cost_per_kg(self):
        """Cost is calculated using spool-specific cost_per_kg when available."""
        # Spool with cost_per_kg = 25.00 USD/kg
        spool = _make_spool(spool_id=1, label_weight=1000, cost_per_kg=25.0)
        assignment = _make_assignment(spool_id=1)
        archive = _make_archive(archive_id=10)

        _active_sessions[1] = PrintSession(
            printer_id=1,
            print_name="Test",
            started_at=datetime.now(timezone.utc),
            tray_remain_start={(0, 0): 80},
            tray_now_at_start=0,
        )

        printer_manager = MagicMock()
        printer_manager.get_status.return_value = SimpleNamespace(
            raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
            progress=100,
            layer_num=50,
            tray_now=0,
        )

        # db returns: archive, queue_item(None), assignment, spool
        db = _mock_db_sequential([archive, None, assignment, spool])

        # 20g used from 3MF
        filament_usage = [{"slot_id": 1, "used_g": 20.0, "type": "PLA", "color": "#FF0000"}]

        with (
            patch("backend.app.core.config.settings") as mock_settings,
            patch("backend.app.api.routes.settings.get_setting", return_value="15.0"),  # default cost
            patch(
                "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
                return_value=filament_usage,
            ),
        ):
            # Make ``settings.base_dir / <anything>`` resolve to a path that reports exists() == True
            mock_settings.base_dir = MagicMock()
            mock_path = MagicMock()
            mock_path.exists.return_value = True
            mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)

            results = await on_print_complete(
                printer_id=1,
                data={"status": "completed"},
                printer_manager=printer_manager,
                db=db,
                archive_id=10,
            )

        assert len(results) == 1
        assert results[0]["spool_id"] == 1
        assert results[0]["weight_used"] == 20.0
        # Cost = 20g / 1000 * 25.0 = 0.50 (the spool price wins over the 15.0 default)
        assert results[0]["cost"] == 0.50

    @pytest.mark.asyncio
    async def test_cost_with_default_fallback(self):
        """Cost uses default_filament_cost from settings when spool cost is None."""
        # Spool without cost_per_kg
        spool = _make_spool(spool_id=1, label_weight=1000, cost_per_kg=None)
        assignment = _make_assignment(spool_id=1)
        archive = _make_archive(archive_id=10)

        _active_sessions[1] = PrintSession(
            printer_id=1,
            print_name="Test",
            started_at=datetime.now(timezone.utc),
            tray_remain_start={(0, 0): 80},
            tray_now_at_start=0,
        )

        printer_manager = MagicMock()
        printer_manager.get_status.return_value = SimpleNamespace(
            raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
            progress=100,
            layer_num=50,
            tray_now=0,
        )

        # db returns: archive, queue_item(None), assignment, spool
        db = _mock_db_sequential([archive, None, assignment, spool])

        # 30g used from 3MF
        filament_usage = [{"slot_id": 1, "used_g": 30.0, "type": "PLA", "color": "#FF0000"}]

        with (
            patch("backend.app.core.config.settings") as mock_settings,
            patch("backend.app.api.routes.settings.get_setting", return_value="15.0"),  # default: 15.0/kg
            patch(
                "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
                return_value=filament_usage,
            ),
        ):
            # Make ``settings.base_dir / <anything>`` resolve to a path that reports exists() == True
            mock_settings.base_dir = MagicMock()
            mock_path = MagicMock()
            mock_path.exists.return_value = True
            mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)

            results = await on_print_complete(
                printer_id=1,
                data={"status": "completed"},
                printer_manager=printer_manager,
                db=db,
                archive_id=10,
            )

        assert len(results) == 1
        assert results[0]["spool_id"] == 1
        assert results[0]["weight_used"] == 30.0
        # Cost = 30g / 1000 * 15.0 = 0.45
        assert results[0]["cost"] == 0.45

    @pytest.mark.asyncio
    async def test_cost_zero_when_default_cost_is_zero(self):
        """Cost is None when both spool cost and default cost are 0."""
        # Spool without cost_per_kg
        spool = _make_spool(spool_id=1, label_weight=1000, cost_per_kg=None)
        assignment = _make_assignment(spool_id=1)
        archive = _make_archive(archive_id=10)

        _active_sessions[1] = PrintSession(
            printer_id=1,
            print_name="Test",
            started_at=datetime.now(timezone.utc),
            tray_remain_start={(0, 0): 80},
            tray_now_at_start=0,
        )

        printer_manager = MagicMock()
        printer_manager.get_status.return_value = SimpleNamespace(
            raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
            progress=100,
            layer_num=50,
            tray_now=0,
        )

        # db returns: archive, queue_item(None), assignment, spool
        db = _mock_db_sequential([archive, None, assignment, spool])

        filament_usage = [{"slot_id": 1, "used_g": 10.0, "type": "PLA", "color": "#FF0000"}]

        with (
            patch("backend.app.core.config.settings") as mock_settings,
            patch("backend.app.api.routes.settings.get_setting", return_value="0.0"),  # no default cost
            patch(
                "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
                return_value=filament_usage,
            ),
        ):
            # Make ``settings.base_dir / <anything>`` resolve to a path that reports exists() == True
            mock_settings.base_dir = MagicMock()
            mock_path = MagicMock()
            mock_path.exists.return_value = True
            mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)

            results = await on_print_complete(
                printer_id=1,
                data={"status": "completed"},
                printer_manager=printer_manager,
                db=db,
                archive_id=10,
            )

        # Usage is still reported; only the cost is missing
        assert len(results) == 1
        assert results[0]["cost"] is None

    @pytest.mark.asyncio
    async def test_cost_for_failed_print_uses_actual_usage(self):
        """Failed print at 50% progress calculates cost from actual usage."""
        spool = _make_spool(spool_id=1, label_weight=1000, cost_per_kg=20.0)
        assignment = _make_assignment(spool_id=1)
        archive = _make_archive(archive_id=10)

        _active_sessions[1] = PrintSession(
            printer_id=1,
            print_name="Test",
            started_at=datetime.now(timezone.utc),
            tray_remain_start={(0, 0): 80},
            tray_now_at_start=0,
        )

        # Failed at 50% progress
        printer_manager = MagicMock()
        printer_manager.get_status.return_value = SimpleNamespace(
            raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
            progress=50,
            layer_num=25,
            tray_now=0,
        )

        # db returns: archive, queue_item(None), assignment, spool
        db = _mock_db_sequential([archive, None, assignment, spool])

        # 40g total, but only 50% used
        filament_usage = [{"slot_id": 1, "used_g": 40.0, "type": "PLA", "color": "#FF0000"}]

        with (
            patch("backend.app.core.config.settings") as mock_settings,
            patch("backend.app.api.routes.settings.get_setting", return_value="15.0"),
            patch(
                "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
                return_value=filament_usage,
            ),
            patch(
                "backend.app.utils.threemf_tools.extract_layer_filament_usage_from_3mf",
                return_value=None,  # No layer data, use linear scaling
            ),
        ):
            # Make ``settings.base_dir / <anything>`` resolve to a path that reports exists() == True
            mock_settings.base_dir = MagicMock()
            mock_path = MagicMock()
            mock_path.exists.return_value = True
            mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)

            results = await on_print_complete(
                printer_id=1,
                data={"status": "failed", "last_progress": 50.0},
                printer_manager=printer_manager,
                db=db,
                archive_id=10,
            )

        assert len(results) == 1
        # 50% of 40g = 20g
        assert results[0]["weight_used"] == 20.0
        # Cost = 20g / 1000 * 20.0 = 0.40
        assert results[0]["cost"] == 0.40

    @pytest.mark.asyncio
    async def test_cost_with_ams_fallback_tracking(self):
        """AMS fallback tracking also calculates cost correctly."""
        spool = _make_spool(spool_id=2, label_weight=1000, cost_per_kg=30.0)
        assignment = _make_assignment(spool_id=2)

        _active_sessions[1] = PrintSession(
            printer_id=1,
            print_name="Test",
            started_at=datetime.now(timezone.utc),
            tray_remain_start={(0, 0): 80},
        )

        # Tray (0, 0) goes from remain=80 at start to remain=70 now
        printer_manager = MagicMock()
        printer_manager.get_status.return_value = SimpleNamespace(
            raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
            tray_now=0,
            last_loaded_tray=-1,
        )

        # Pad 2 Nones for _find_3mf_by_filename DB queries (library + archive search),
        # then assignment and spool for the AMS fallback path
        db = _mock_db_sequential([None, None, assignment, spool])

        with patch("backend.app.api.routes.settings.get_setting", return_value="15.0"):
            results = await on_print_complete(
                printer_id=1,
                data={"status": "completed"},
                printer_manager=printer_manager,
                db=db,
                archive_id=None,  # No archive = AMS fallback
            )

        assert len(results) == 1
        assert results[0]["spool_id"] == 2
        # 10% of 1000g = 100g
        assert results[0]["weight_used"] == 100.0
        # Cost = 100g / 1000 * 30.0 = 3.00
        assert results[0]["cost"] == 3.0

    @pytest.mark.asyncio
    async def test_multi_filament_cost_aggregation(self):
        """Multiple spools in one print have their costs tracked separately."""
        spool1 = _make_spool(spool_id=1, label_weight=1000, cost_per_kg=20.0)
        spool2 = _make_spool(spool_id=2, label_weight=1000, cost_per_kg=25.0)
        assignment1 = _make_assignment(spool_id=1, ams_id=0, tray_id=0)
        assignment2 = _make_assignment(spool_id=2, ams_id=0, tray_id=1)
        archive = _make_archive(archive_id=10)

        _active_sessions[1] = PrintSession(
            printer_id=1,
            print_name="Test",
            started_at=datetime.now(timezone.utc),
            tray_remain_start={(0, 0): 80, (0, 1): 90},
            tray_now_at_start=0,
        )

        printer_manager = MagicMock()
        printer_manager.get_status.return_value = SimpleNamespace(
            raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}, {"id": 1, "remain": 80}]}]},
            progress=100,
            layer_num=50,
            tray_now=0,
        )

        # Mock slot-to-tray mapping: slot 1 -> tray 0, slot 2 -> tray 1
        ams_mapping = [0, 1]

        # db returns: archive, assignment1, spool1, assignment2, spool2
        # ams_mapping is provided, so no queue item lookup is performed
        db = _mock_db_sequential([archive, assignment1, spool1, assignment2, spool2])

        # Two filaments used
        filament_usage = [
            {"slot_id": 1, "used_g": 15.0, "type": "PLA", "color": "#FF0000"},
            {"slot_id": 2, "used_g": 25.0, "type": "PLA", "color": "#00FF00"},
        ]

        with (
            patch("backend.app.core.config.settings") as mock_settings,
            patch("backend.app.api.routes.settings.get_setting", return_value="15.0"),
            patch(
                "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
                return_value=filament_usage,
            ),
        ):
            # Make ``settings.base_dir / <anything>`` resolve to a path that reports exists() == True
            mock_settings.base_dir = MagicMock()
            mock_path = MagicMock()
            mock_path.exists.return_value = True
            mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)

            results = await on_print_complete(
                printer_id=1,
                data={"status": "completed"},
                printer_manager=printer_manager,
                db=db,
                archive_id=10,
                ams_mapping=ams_mapping,
            )

        assert len(results) == 2

        # First spool: 15g at 20/kg = 0.30
        spool1_result = next(r for r in results if r["spool_id"] == 1)
        assert spool1_result["weight_used"] == 15.0
        assert spool1_result["cost"] == 0.30

        # Second spool: 25g at 25/kg = 0.625, rounded to 0.62
        spool2_result = next(r for r in results if r["spool_id"] == 2)
        assert spool2_result["weight_used"] == 25.0
        assert spool2_result["cost"] == 0.62
  414. class TestCostAggregation:
  415. """Tests for cost aggregation to PrintArchive."""
  416. @pytest.mark.asyncio
  417. async def test_costs_summed_in_results(self):
  418. """Multiple spool costs are correctly summed from result dicts."""
  419. results = [
  420. {"spool_id": 1, "weight_used": 20.0, "cost": 0.50},
  421. {"spool_id": 2, "weight_used": 30.0, "cost": 0.75},
  422. ]
  423. total_cost = sum(r.get("cost", 0) or 0 for r in results)
  424. assert total_cost == 1.25
  425. @pytest.mark.asyncio
  426. async def test_null_costs_handled_in_aggregation(self):
  427. """None costs don't break aggregation."""
  428. results = [
  429. {"spool_id": 1, "weight_used": 20.0, "cost": 0.50},
  430. {"spool_id": 2, "weight_used": 30.0, "cost": None}, # No cost
  431. {"spool_id": 3, "weight_used": 10.0, "cost": 0.25},
  432. ]
  433. total_cost = sum(r.get("cost", 0) or 0 for r in results)
  434. assert total_cost == 0.75 # Only spools 1 and 3
  435. @pytest.mark.asyncio
  436. async def test_archive_cost_not_overwritten_with_zero(self):
  437. """archive.cost is preserved when no spool usage has cost data."""
  438. # Spool without cost_per_kg, default_filament_cost also 0 → cost=None per usage
  439. spool = _make_spool(spool_id=1, label_weight=1000, cost_per_kg=None)
  440. assignment = _make_assignment(spool_id=1)
  441. archive = _make_archive(archive_id=10)
  442. archive.cost = 5.00 # Pre-existing cost from catalog
  443. archive.print_name = "TestPrint"
  444. archive.printer_id = 1
  445. _active_sessions[1] = PrintSession(
  446. printer_id=1,
  447. print_name="TestPrint",
  448. started_at=datetime.now(timezone.utc),
  449. tray_remain_start={(0, 0): 80},
  450. tray_now_at_start=0,
  451. )
  452. printer_manager = MagicMock()
  453. printer_manager.get_status.return_value = SimpleNamespace(
  454. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
  455. progress=100,
  456. layer_num=50,
  457. tray_now=0,
  458. )
  459. # Build mock db that returns proper scalars for the aggregation queries
  460. responses = []
  461. # 1. select(PrintArchive) → archive
  462. responses.append(("scalar_one_or_none", archive))
  463. # 2. select(PrintQueueItem) → None
  464. responses.append(("scalar_one_or_none", None))
  465. # 3. select(SpoolAssignment) → assignment
  466. responses.append(("scalar_one_or_none", assignment))
  467. # 4. select(Spool) → spool
  468. responses.append(("scalar_one_or_none", spool))
  469. # 5. cost aggregation: select archive to update cost
  470. responses.append(("scalar_one_or_none", archive))
  471. db = AsyncMock()
  472. call_count = [0]
  473. async def mock_execute(*args, **kwargs):
  474. idx = call_count[0]
  475. call_count[0] += 1
  476. result = MagicMock()
  477. if idx < len(responses):
  478. method, value = responses[idx]
  479. if method == "scalar":
  480. result.scalar.return_value = value
  481. result.scalar_one_or_none.return_value = value
  482. else:
  483. result.scalar_one_or_none.return_value = value
  484. result.scalar.return_value = value
  485. else:
  486. result.scalar_one_or_none.return_value = None
  487. result.scalar.return_value = None
  488. return result
  489. db.execute = mock_execute
  490. filament_usage = [{"slot_id": 1, "used_g": 10.0, "type": "PLA", "color": "#FF0000"}]
  491. with (
  492. patch("backend.app.core.config.settings") as mock_settings,
  493. patch("backend.app.api.routes.settings.get_setting", return_value="0.0"), # no default cost
  494. patch("backend.app.utils.threemf_tools.extract_filament_usage_from_3mf", return_value=filament_usage),
  495. ):
  496. mock_settings.base_dir = MagicMock()
  497. mock_path = MagicMock()
  498. mock_path.exists.return_value = True
  499. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  500. results = await on_print_complete(
  501. printer_id=1,
  502. data={"status": "completed"},
  503. printer_manager=printer_manager,
  504. db=db,
  505. archive_id=10,
  506. )
  507. # Usage tracked but cost is None (no cost_per_kg, no default)
  508. assert len(results) == 1
  509. assert results[0]["cost"] is None
  510. # Archive cost should NOT have been overwritten — still 5.00
  511. assert archive.cost == 5.00
  512. @pytest.mark.asyncio
  513. async def test_archive_cost_set_when_spool_has_cost(self):
  514. """archive.cost is set from spool usage when cost data exists."""
  515. spool = _make_spool(spool_id=1, label_weight=1000, cost_per_kg=25.0)
  516. assignment = _make_assignment(spool_id=1)
  517. archive = _make_archive(archive_id=10)
  518. archive.cost = None # No pre-existing cost
  519. archive.print_name = "TestPrint"
  520. archive.printer_id = 1
  521. _active_sessions[1] = PrintSession(
  522. printer_id=1,
  523. print_name="TestPrint",
  524. started_at=datetime.now(timezone.utc),
  525. tray_remain_start={(0, 0): 80},
  526. tray_now_at_start=0,
  527. )
  528. printer_manager = MagicMock()
  529. printer_manager.get_status.return_value = SimpleNamespace(
  530. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
  531. progress=100,
  532. layer_num=50,
  533. tray_now=0,
  534. )
  535. # 20g at 25/kg = 0.50
  536. expected_cost = 0.50
  537. responses = []
  538. responses.append(("scalar_one_or_none", archive))
  539. responses.append(("scalar_one_or_none", None)) # queue item
  540. responses.append(("scalar_one_or_none", assignment))
  541. responses.append(("scalar_one_or_none", spool))
  542. # cost aggregation: select archive to update cost
  543. responses.append(("scalar_one_or_none", archive))
  544. db = AsyncMock()
  545. call_count = [0]
  546. async def mock_execute(*args, **kwargs):
  547. idx = call_count[0]
  548. call_count[0] += 1
  549. result = MagicMock()
  550. if idx < len(responses):
  551. method, value = responses[idx]
  552. result.scalar.return_value = value
  553. result.scalar_one_or_none.return_value = value
  554. else:
  555. result.scalar_one_or_none.return_value = None
  556. result.scalar.return_value = None
  557. return result
  558. db.execute = mock_execute
  559. filament_usage = [{"slot_id": 1, "used_g": 20.0, "type": "PLA", "color": "#FF0000"}]
  560. with (
  561. patch("backend.app.core.config.settings") as mock_settings,
  562. patch("backend.app.api.routes.settings.get_setting", return_value="15.0"),
  563. patch("backend.app.utils.threemf_tools.extract_filament_usage_from_3mf", return_value=filament_usage),
  564. ):
  565. mock_settings.base_dir = MagicMock()
  566. mock_path = MagicMock()
  567. mock_path.exists.return_value = True
  568. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  569. results = await on_print_complete(
  570. printer_id=1,
  571. data={"status": "completed"},
  572. printer_manager=printer_manager,
  573. db=db,
  574. archive_id=10,
  575. )
  576. assert len(results) == 1
  577. assert results[0]["cost"] == expected_cost
  578. # Archive cost should have been updated
  579. assert archive.cost == expected_cost
  580. @pytest.mark.asyncio
  581. async def test_cost_with_archive_id(self):
  582. """Test cost aggregation using archive_id (3MF path)."""
  583. spool_new = _make_spool(spool_id=1, label_weight=1000, cost_per_kg=25.0)
  584. assignment_new = _make_assignment(spool_id=1)
  585. archive_new = _make_archive(archive_id=20)
  586. filament_usage_new = [{"slot_id": 1, "used_g": 20.0, "type": "PLA", "color": "#FF0000"}]
  587. printer_manager = MagicMock()
  588. printer_manager.get_status.return_value = SimpleNamespace(
  589. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
  590. progress=100,
  591. layer_num=50,
  592. tray_now=0,
  593. )
  594. db = _mock_db_sequential([archive_new, None, assignment_new, spool_new])
  595. with (
  596. patch("backend.app.core.config.settings") as mock_settings,
  597. patch("backend.app.api.routes.settings.get_setting", return_value="15.0"),
  598. patch("backend.app.utils.threemf_tools.extract_filament_usage_from_3mf", return_value=filament_usage_new),
  599. ):
  600. mock_settings.base_dir = MagicMock()
  601. mock_path = MagicMock()
  602. mock_path.exists.return_value = True
  603. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  604. results_new = await on_print_complete(
  605. printer_id=1,
  606. data={"status": "completed"},
  607. printer_manager=printer_manager,
  608. db=db,
  609. archive_id=20,
  610. )
  611. assert len(results_new) == 1
  612. assert results_new[0]["spool_id"] == 1
  613. assert results_new[0]["cost"] == 0.50 # 20g / 1000 * 25.0
  614. @pytest.mark.asyncio
  615. async def test_cost_with_print_name_ams_fallback(self):
  616. """Test cost aggregation using print_name (AMS fallback, legacy path)."""
  617. spool_old = _make_spool(spool_id=2, label_weight=1000, cost_per_kg=15.0)
  618. assignment_old = _make_assignment(spool_id=2, ams_id=0, tray_id=0)
  619. legacy_print_name = "LegacyPrint"
  620. _active_sessions[1] = PrintSession(
  621. printer_id=1,
  622. print_name=legacy_print_name,
  623. started_at=datetime.now(timezone.utc),
  624. tray_remain_start={(0, 0): 80},
  625. tray_now_at_start=0,
  626. )
  627. printer_manager = MagicMock()
  628. printer_manager.get_status.return_value = SimpleNamespace(
  629. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
  630. progress=100,
  631. layer_num=50,
  632. tray_now=0,
  633. )
  634. # Pad 2 Nones for _find_3mf_by_filename DB queries (library + archive search),
  635. # then assignment and spool for the AMS fallback path
  636. db = _mock_db_sequential([None, None, assignment_old, spool_old])
  637. with (
  638. patch("backend.app.core.config.settings") as mock_settings,
  639. patch("backend.app.api.routes.settings.get_setting", return_value="15.0"),
  640. patch("backend.app.utils.threemf_tools.extract_filament_usage_from_3mf", return_value=None),
  641. ):
  642. mock_settings.base_dir = MagicMock()
  643. mock_path = MagicMock()
  644. mock_path.exists.return_value = True
  645. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  646. results_old = await on_print_complete(
  647. printer_id=1,
  648. data={"status": "completed", "subtask_name": legacy_print_name, "filename": legacy_print_name},
  649. printer_manager=printer_manager,
  650. db=db,
  651. archive_id=None,
  652. )
  653. assert len(results_old) == 1
  654. assert results_old[0]["spool_id"] == 2
  655. assert results_old[0]["cost"] == 1.5 # 100g / 1000 * 15.0