# test_cost_tracking.py (28 KB)
  1. """Unit tests for cost tracking in usage_tracker.py.
  2. Tests cost calculation scenarios:
  3. - Spool-specific cost_per_kg
  4. - Default fallback cost from settings
  5. - Spools without cost (None)
  6. - Completed prints
  7. - Failed/partial prints
  8. - Cost aggregation to archives
  9. """
  10. import os
  11. import tempfile
  12. from datetime import datetime, timezone
  13. from types import SimpleNamespace
  14. from unittest.mock import AsyncMock, MagicMock, patch
  15. import pytest
  16. from backend.app.services.usage_tracker import (
  17. PrintSession,
  18. _active_sessions,
  19. _track_from_3mf,
  20. on_print_complete,
  21. )
  22. def _make_spool(spool_id=1, label_weight=1000, weight_used=0, cost_per_kg=None):
  23. """Create a mock Spool object with cost fields."""
  24. spool = MagicMock()
  25. spool.id = spool_id
  26. spool.label_weight = label_weight
  27. spool.weight_used = weight_used
  28. spool.cost_per_kg = cost_per_kg
  29. spool.last_used = None
  30. spool.material = "PLA"
  31. return spool
  32. def _make_assignment(spool_id=1, printer_id=1, ams_id=0, tray_id=0):
  33. """Create a mock SpoolAssignment object."""
  34. assignment = MagicMock()
  35. assignment.spool_id = spool_id
  36. assignment.printer_id = printer_id
  37. assignment.ams_id = ams_id
  38. assignment.tray_id = tray_id
  39. return assignment
  40. def _make_archive(archive_id=1, file_path=None):
  41. """Create a mock PrintArchive object with a temp file, and register cleanup."""
  42. if file_path is None:
  43. with tempfile.NamedTemporaryFile(delete=False, suffix=".3mf", prefix="test_print_") as tmp:
  44. file_path = tmp.name
  45. # Register cleanup for this file after the test
  46. import pytest
  47. frame = None
  48. try:
  49. raise Exception
  50. except Exception:
  51. import sys
  52. frame = sys._getframe(1)
  53. request = frame.f_locals.get("request")
  54. if request is not None:
  55. def cleanup():
  56. try:
  57. os.remove(file_path)
  58. except Exception:
  59. pass
  60. request.addfinalizer(cleanup)
  61. archive = MagicMock()
  62. archive.id = archive_id
  63. archive.file_path = file_path
  64. return archive
  65. @pytest.fixture(autouse=True)
  66. def cleanup_temp_archives():
  67. yield
  68. # Cleanup any temp .3mf files created by _make_archive
  69. import glob
  70. for f in glob.glob("test_print_*.3mf"):
  71. try:
  72. os.remove(f)
  73. except Exception:
  74. pass
  75. @pytest.fixture(autouse=True)
  76. def cleanup_test_print_gcode():
  77. yield
  78. import os
  79. path = "archives/test/test_print.gcode.3mf"
  80. if os.path.exists(path):
  81. try:
  82. os.remove(path)
  83. except Exception:
  84. pass
  85. @pytest.fixture
  86. def archive_factory_temp():
  87. import tempfile
  88. def _factory(*args, **kwargs):
  89. with tempfile.NamedTemporaryFile(delete=False, suffix=".3mf", prefix="test_print_", dir="archives/test") as tmp:
  90. kwargs["file_path"] = tmp.name
  91. return kwargs["file_path"]
  92. yield _factory
  93. # Cleanup
  94. import glob
  95. import os
  96. for f in glob.glob("archives/test/test_print_*.3mf"):
  97. try:
  98. os.remove(f)
  99. except Exception:
  100. pass
  101. def _mock_db_sequential(responses):
  102. """Create mock db that returns responses in order."""
  103. db = AsyncMock()
  104. call_count = [0]
  105. async def mock_execute(*args, **kwargs):
  106. idx = call_count[0]
  107. call_count[0] += 1
  108. result = MagicMock()
  109. if idx < len(responses):
  110. result.scalar_one_or_none.return_value = responses[idx]
  111. else:
  112. result.scalar_one_or_none.return_value = None
  113. return result
  114. db.execute = mock_execute
  115. return db
  116. class TestCostCalculation:
  117. """Tests for cost calculation in usage tracking."""
  118. @pytest.fixture(autouse=True)
  119. def _clear_sessions(self):
  120. _active_sessions.clear()
  121. yield
  122. _active_sessions.clear()
  123. @pytest.mark.asyncio
  124. async def test_cost_with_spool_specific_cost_per_kg(self):
  125. """Cost is calculated using spool-specific cost_per_kg when available."""
  126. # Spool with cost_per_kg = 25.00 USD/kg
  127. spool = _make_spool(spool_id=1, label_weight=1000, cost_per_kg=25.0)
  128. assignment = _make_assignment(spool_id=1)
  129. archive = _make_archive(archive_id=10)
  130. _active_sessions[1] = PrintSession(
  131. printer_id=1,
  132. print_name="Test",
  133. started_at=datetime.now(timezone.utc),
  134. tray_remain_start={(0, 0): 80},
  135. tray_now_at_start=0,
  136. )
  137. printer_manager = MagicMock()
  138. printer_manager.get_status.return_value = SimpleNamespace(
  139. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
  140. progress=100,
  141. layer_num=50,
  142. tray_now=0,
  143. )
  144. # db returns: archive, queue_item(None), assignment, spool
  145. db = _mock_db_sequential([archive, None, assignment, spool])
  146. # 20g used from 3MF
  147. filament_usage = [{"slot_id": 1, "used_g": 20.0, "type": "PLA", "color": "#FF0000"}]
  148. with (
  149. patch("backend.app.core.config.settings") as mock_settings,
  150. patch("backend.app.api.routes.settings.get_setting", return_value="15.0"), # default cost
  151. patch(
  152. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  153. return_value=filament_usage,
  154. ),
  155. ):
  156. mock_settings.base_dir = MagicMock()
  157. mock_path = MagicMock()
  158. mock_path.exists.return_value = True
  159. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  160. results = await on_print_complete(
  161. printer_id=1,
  162. data={"status": "completed"},
  163. printer_manager=printer_manager,
  164. db=db,
  165. archive_id=10,
  166. )
  167. assert len(results) == 1
  168. assert results[0]["spool_id"] == 1
  169. assert results[0]["weight_used"] == 20.0
  170. # Cost = 20g / 1000 * 25.0 = 0.50
  171. assert results[0]["cost"] == 0.50
  172. @pytest.mark.asyncio
  173. async def test_cost_with_default_fallback(self):
  174. """Cost uses default_filament_cost from settings when spool cost is None."""
  175. # Spool without cost_per_kg
  176. spool = _make_spool(spool_id=1, label_weight=1000, cost_per_kg=None)
  177. assignment = _make_assignment(spool_id=1)
  178. archive = _make_archive(archive_id=10)
  179. _active_sessions[1] = PrintSession(
  180. printer_id=1,
  181. print_name="Test",
  182. started_at=datetime.now(timezone.utc),
  183. tray_remain_start={(0, 0): 80},
  184. tray_now_at_start=0,
  185. )
  186. printer_manager = MagicMock()
  187. printer_manager.get_status.return_value = SimpleNamespace(
  188. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
  189. progress=100,
  190. layer_num=50,
  191. tray_now=0,
  192. )
  193. # db returns: archive, queue_item(None), assignment, spool
  194. db = _mock_db_sequential([archive, None, assignment, spool])
  195. # 30g used from 3MF
  196. filament_usage = [{"slot_id": 1, "used_g": 30.0, "type": "PLA", "color": "#FF0000"}]
  197. with (
  198. patch("backend.app.core.config.settings") as mock_settings,
  199. patch("backend.app.api.routes.settings.get_setting", return_value="15.0"), # default: 15.0/kg
  200. patch(
  201. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  202. return_value=filament_usage,
  203. ),
  204. ):
  205. mock_settings.base_dir = MagicMock()
  206. mock_path = MagicMock()
  207. mock_path.exists.return_value = True
  208. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  209. results = await on_print_complete(
  210. printer_id=1,
  211. data={"status": "completed"},
  212. printer_manager=printer_manager,
  213. db=db,
  214. archive_id=10,
  215. )
  216. assert len(results) == 1
  217. assert results[0]["spool_id"] == 1
  218. assert results[0]["weight_used"] == 30.0
  219. # Cost = 30g / 1000 * 15.0 = 0.45
  220. assert results[0]["cost"] == 0.45
  221. @pytest.mark.asyncio
  222. async def test_cost_zero_when_default_cost_is_zero(self):
  223. """Cost is None when both spool cost and default cost are 0."""
  224. # Spool without cost_per_kg
  225. spool = _make_spool(spool_id=1, label_weight=1000, cost_per_kg=None)
  226. assignment = _make_assignment(spool_id=1)
  227. archive = _make_archive(archive_id=10)
  228. _active_sessions[1] = PrintSession(
  229. printer_id=1,
  230. print_name="Test",
  231. started_at=datetime.now(timezone.utc),
  232. tray_remain_start={(0, 0): 80},
  233. tray_now_at_start=0,
  234. )
  235. printer_manager = MagicMock()
  236. printer_manager.get_status.return_value = SimpleNamespace(
  237. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
  238. progress=100,
  239. layer_num=50,
  240. tray_now=0,
  241. )
  242. # db returns: archive, queue_item(None), assignment, spool
  243. db = _mock_db_sequential([archive, None, assignment, spool])
  244. filament_usage = [{"slot_id": 1, "used_g": 10.0, "type": "PLA", "color": "#FF0000"}]
  245. with (
  246. patch("backend.app.core.config.settings") as mock_settings,
  247. patch("backend.app.api.routes.settings.get_setting", return_value="0.0"), # no default cost
  248. patch(
  249. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  250. return_value=filament_usage,
  251. ),
  252. ):
  253. mock_settings.base_dir = MagicMock()
  254. mock_path = MagicMock()
  255. mock_path.exists.return_value = True
  256. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  257. results = await on_print_complete(
  258. printer_id=1,
  259. data={"status": "completed"},
  260. printer_manager=printer_manager,
  261. db=db,
  262. archive_id=10,
  263. )
  264. assert len(results) == 1
  265. assert results[0]["cost"] is None
  266. @pytest.mark.asyncio
  267. async def test_cost_for_failed_print_uses_actual_usage(self):
  268. """Failed print at 50% progress calculates cost from actual usage."""
  269. spool = _make_spool(spool_id=1, label_weight=1000, cost_per_kg=20.0)
  270. assignment = _make_assignment(spool_id=1)
  271. archive = _make_archive(archive_id=10)
  272. _active_sessions[1] = PrintSession(
  273. printer_id=1,
  274. print_name="Test",
  275. started_at=datetime.now(timezone.utc),
  276. tray_remain_start={(0, 0): 80},
  277. tray_now_at_start=0,
  278. )
  279. # Failed at 50% progress
  280. printer_manager = MagicMock()
  281. printer_manager.get_status.return_value = SimpleNamespace(
  282. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
  283. progress=50,
  284. layer_num=25,
  285. tray_now=0,
  286. )
  287. # db returns: archive, queue_item(None), assignment, spool
  288. db = _mock_db_sequential([archive, None, assignment, spool])
  289. # 40g total, but only 50% used
  290. filament_usage = [{"slot_id": 1, "used_g": 40.0, "type": "PLA", "color": "#FF0000"}]
  291. with (
  292. patch("backend.app.core.config.settings") as mock_settings,
  293. patch("backend.app.api.routes.settings.get_setting", return_value="15.0"),
  294. patch(
  295. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  296. return_value=filament_usage,
  297. ),
  298. patch(
  299. "backend.app.utils.threemf_tools.extract_layer_filament_usage_from_3mf",
  300. return_value=None, # No layer data, use linear scaling
  301. ),
  302. ):
  303. mock_settings.base_dir = MagicMock()
  304. mock_path = MagicMock()
  305. mock_path.exists.return_value = True
  306. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  307. results = await on_print_complete(
  308. printer_id=1,
  309. data={"status": "failed", "last_progress": 50.0},
  310. printer_manager=printer_manager,
  311. db=db,
  312. archive_id=10,
  313. )
  314. assert len(results) == 1
  315. # 50% of 40g = 20g
  316. assert results[0]["weight_used"] == 20.0
  317. # Cost = 20g / 1000 * 20.0 = 0.40
  318. assert results[0]["cost"] == 0.40
  319. @pytest.mark.asyncio
  320. async def test_cost_with_ams_fallback_tracking(self):
  321. """AMS fallback tracking also calculates cost correctly."""
  322. spool = _make_spool(spool_id=2, label_weight=1000, cost_per_kg=30.0)
  323. assignment = _make_assignment(spool_id=2)
  324. _active_sessions[1] = PrintSession(
  325. printer_id=1,
  326. print_name="Test",
  327. started_at=datetime.now(timezone.utc),
  328. tray_remain_start={(0, 0): 80},
  329. )
  330. printer_manager = MagicMock()
  331. printer_manager.get_status.return_value = SimpleNamespace(
  332. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
  333. tray_now=0,
  334. last_loaded_tray=-1,
  335. )
  336. # db returns assignment then spool (no archive, AMS fallback path)
  337. db = _mock_db_sequential([assignment, spool])
  338. with patch("backend.app.api.routes.settings.get_setting", return_value="15.0"):
  339. results = await on_print_complete(
  340. printer_id=1,
  341. data={"status": "completed"},
  342. printer_manager=printer_manager,
  343. db=db,
  344. archive_id=None, # No archive = AMS fallback
  345. )
  346. assert len(results) == 1
  347. assert results[0]["spool_id"] == 2
  348. # 10% of 1000g = 100g
  349. assert results[0]["weight_used"] == 100.0
  350. # Cost = 100g / 1000 * 30.0 = 3.00
  351. assert results[0]["cost"] == 3.0
  352. @pytest.mark.asyncio
  353. async def test_multi_filament_cost_aggregation(self):
  354. """Multiple spools in one print have their costs tracked separately."""
  355. spool1 = _make_spool(spool_id=1, label_weight=1000, cost_per_kg=20.0)
  356. spool2 = _make_spool(spool_id=2, label_weight=1000, cost_per_kg=25.0)
  357. assignment1 = _make_assignment(spool_id=1, ams_id=0, tray_id=0)
  358. assignment2 = _make_assignment(spool_id=2, ams_id=0, tray_id=1)
  359. archive = _make_archive(archive_id=10)
  360. _active_sessions[1] = PrintSession(
  361. printer_id=1,
  362. print_name="Test",
  363. started_at=datetime.now(timezone.utc),
  364. tray_remain_start={(0, 0): 80, (0, 1): 90},
  365. tray_now_at_start=0,
  366. )
  367. printer_manager = MagicMock()
  368. printer_manager.get_status.return_value = SimpleNamespace(
  369. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}, {"id": 1, "remain": 80}]}]},
  370. progress=100,
  371. layer_num=50,
  372. tray_now=0,
  373. )
  374. # Mock slot-to-tray mapping: slot 1 -> tray 0, slot 2 -> tray 1
  375. ams_mapping = [0, 1]
  376. # db returns: archive, assignment1, spool1, assignment2, spool2
  377. # ams_mapping is provided, so no queue item lookup is performed
  378. db = _mock_db_sequential([archive, assignment1, spool1, assignment2, spool2])
  379. # Two filaments used
  380. filament_usage = [
  381. {"slot_id": 1, "used_g": 15.0, "type": "PLA", "color": "#FF0000"},
  382. {"slot_id": 2, "used_g": 25.0, "type": "PLA", "color": "#00FF00"},
  383. ]
  384. with (
  385. patch("backend.app.core.config.settings") as mock_settings,
  386. patch("backend.app.api.routes.settings.get_setting", return_value="15.0"),
  387. patch(
  388. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  389. return_value=filament_usage,
  390. ),
  391. ):
  392. mock_settings.base_dir = MagicMock()
  393. mock_path = MagicMock()
  394. mock_path.exists.return_value = True
  395. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  396. results = await on_print_complete(
  397. printer_id=1,
  398. data={"status": "completed"},
  399. printer_manager=printer_manager,
  400. db=db,
  401. archive_id=10,
  402. ams_mapping=ams_mapping,
  403. )
  404. assert len(results) == 2
  405. # First spool: 15g at 20/kg = 0.30
  406. spool1_result = next(r for r in results if r["spool_id"] == 1)
  407. assert spool1_result["weight_used"] == 15.0
  408. assert spool1_result["cost"] == 0.30
  409. # Second spool: 25g at 25/kg = 0.625, rounded to 0.62
  410. spool2_result = next(r for r in results if r["spool_id"] == 2)
  411. assert spool2_result["weight_used"] == 25.0
  412. assert spool2_result["cost"] == 0.62
  413. class TestCostAggregation:
  414. """Tests for cost aggregation to PrintArchive."""
  415. @pytest.mark.asyncio
  416. async def test_costs_summed_in_results(self):
  417. """Multiple spool costs are correctly summed from result dicts."""
  418. results = [
  419. {"spool_id": 1, "weight_used": 20.0, "cost": 0.50},
  420. {"spool_id": 2, "weight_used": 30.0, "cost": 0.75},
  421. ]
  422. total_cost = sum(r.get("cost", 0) or 0 for r in results)
  423. assert total_cost == 1.25
  424. @pytest.mark.asyncio
  425. async def test_null_costs_handled_in_aggregation(self):
  426. """None costs don't break aggregation."""
  427. results = [
  428. {"spool_id": 1, "weight_used": 20.0, "cost": 0.50},
  429. {"spool_id": 2, "weight_used": 30.0, "cost": None}, # No cost
  430. {"spool_id": 3, "weight_used": 10.0, "cost": 0.25},
  431. ]
  432. total_cost = sum(r.get("cost", 0) or 0 for r in results)
  433. assert total_cost == 0.75 # Only spools 1 and 3
  434. @pytest.mark.asyncio
  435. async def test_archive_cost_not_overwritten_with_zero(self):
  436. """archive.cost is preserved when no spool usage has cost data."""
  437. # Spool without cost_per_kg, default_filament_cost also 0 → cost=None per usage
  438. spool = _make_spool(spool_id=1, label_weight=1000, cost_per_kg=None)
  439. assignment = _make_assignment(spool_id=1)
  440. archive = _make_archive(archive_id=10)
  441. archive.cost = 5.00 # Pre-existing cost from catalog
  442. archive.print_name = "TestPrint"
  443. archive.printer_id = 1
  444. _active_sessions[1] = PrintSession(
  445. printer_id=1,
  446. print_name="TestPrint",
  447. started_at=datetime.now(timezone.utc),
  448. tray_remain_start={(0, 0): 80},
  449. tray_now_at_start=0,
  450. )
  451. printer_manager = MagicMock()
  452. printer_manager.get_status.return_value = SimpleNamespace(
  453. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
  454. progress=100,
  455. layer_num=50,
  456. tray_now=0,
  457. )
  458. # Build mock db that returns proper scalars for the aggregation queries
  459. responses = []
  460. # 1. select(PrintArchive) → archive
  461. responses.append(("scalar_one_or_none", archive))
  462. # 2. select(PrintQueueItem) → None
  463. responses.append(("scalar_one_or_none", None))
  464. # 3. select(SpoolAssignment) → assignment
  465. responses.append(("scalar_one_or_none", assignment))
  466. # 4. select(Spool) → spool
  467. responses.append(("scalar_one_or_none", spool))
  468. # 5. cost aggregation: coalesce(sum(cost)) → 0 (no costs)
  469. responses.append(("scalar", 0))
  470. # 6. select(PrintArchive) → archive (for the guard check)
  471. responses.append(("scalar_one_or_none", archive))
  472. # 7. legacy fallback: coalesce(sum(cost)) → 0
  473. responses.append(("scalar", 0))
  474. db = AsyncMock()
  475. call_count = [0]
  476. async def mock_execute(*args, **kwargs):
  477. idx = call_count[0]
  478. call_count[0] += 1
  479. result = MagicMock()
  480. if idx < len(responses):
  481. method, value = responses[idx]
  482. if method == "scalar":
  483. result.scalar.return_value = value
  484. result.scalar_one_or_none.return_value = value
  485. else:
  486. result.scalar_one_or_none.return_value = value
  487. result.scalar.return_value = value
  488. else:
  489. result.scalar_one_or_none.return_value = None
  490. result.scalar.return_value = None
  491. return result
  492. db.execute = mock_execute
  493. filament_usage = [{"slot_id": 1, "used_g": 10.0, "type": "PLA", "color": "#FF0000"}]
  494. with (
  495. patch("backend.app.core.config.settings") as mock_settings,
  496. patch("backend.app.api.routes.settings.get_setting", return_value="0.0"), # no default cost
  497. patch("backend.app.utils.threemf_tools.extract_filament_usage_from_3mf", return_value=filament_usage),
  498. ):
  499. mock_settings.base_dir = MagicMock()
  500. mock_path = MagicMock()
  501. mock_path.exists.return_value = True
  502. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  503. results = await on_print_complete(
  504. printer_id=1,
  505. data={"status": "completed"},
  506. printer_manager=printer_manager,
  507. db=db,
  508. archive_id=10,
  509. )
  510. # Usage tracked but cost is None (no cost_per_kg, no default)
  511. assert len(results) == 1
  512. assert results[0]["cost"] is None
  513. # Archive cost should NOT have been overwritten — still 5.00
  514. assert archive.cost == 5.00
  515. @pytest.mark.asyncio
  516. async def test_archive_cost_set_when_spool_has_cost(self):
  517. """archive.cost is set from spool usage when cost data exists."""
  518. spool = _make_spool(spool_id=1, label_weight=1000, cost_per_kg=25.0)
  519. assignment = _make_assignment(spool_id=1)
  520. archive = _make_archive(archive_id=10)
  521. archive.cost = None # No pre-existing cost
  522. archive.print_name = "TestPrint"
  523. archive.printer_id = 1
  524. _active_sessions[1] = PrintSession(
  525. printer_id=1,
  526. print_name="TestPrint",
  527. started_at=datetime.now(timezone.utc),
  528. tray_remain_start={(0, 0): 80},
  529. tray_now_at_start=0,
  530. )
  531. printer_manager = MagicMock()
  532. printer_manager.get_status.return_value = SimpleNamespace(
  533. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
  534. progress=100,
  535. layer_num=50,
  536. tray_now=0,
  537. )
  538. # 20g at 25/kg = 0.50
  539. expected_cost = 0.50
  540. responses = []
  541. responses.append(("scalar_one_or_none", archive))
  542. responses.append(("scalar_one_or_none", None)) # queue item
  543. responses.append(("scalar_one_or_none", assignment))
  544. responses.append(("scalar_one_or_none", spool))
  545. # cost aggregation: sum returns 0.50
  546. responses.append(("scalar", expected_cost))
  547. # select archive for guard
  548. responses.append(("scalar_one_or_none", archive))
  549. db = AsyncMock()
  550. call_count = [0]
  551. async def mock_execute(*args, **kwargs):
  552. idx = call_count[0]
  553. call_count[0] += 1
  554. result = MagicMock()
  555. if idx < len(responses):
  556. method, value = responses[idx]
  557. result.scalar.return_value = value
  558. result.scalar_one_or_none.return_value = value
  559. else:
  560. result.scalar_one_or_none.return_value = None
  561. result.scalar.return_value = None
  562. return result
  563. db.execute = mock_execute
  564. filament_usage = [{"slot_id": 1, "used_g": 20.0, "type": "PLA", "color": "#FF0000"}]
  565. with (
  566. patch("backend.app.core.config.settings") as mock_settings,
  567. patch("backend.app.api.routes.settings.get_setting", return_value="15.0"),
  568. patch("backend.app.utils.threemf_tools.extract_filament_usage_from_3mf", return_value=filament_usage),
  569. ):
  570. mock_settings.base_dir = MagicMock()
  571. mock_path = MagicMock()
  572. mock_path.exists.return_value = True
  573. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  574. results = await on_print_complete(
  575. printer_id=1,
  576. data={"status": "completed"},
  577. printer_manager=printer_manager,
  578. db=db,
  579. archive_id=10,
  580. )
  581. assert len(results) == 1
  582. assert results[0]["cost"] == expected_cost
  583. # Archive cost should have been updated
  584. assert archive.cost == expected_cost
  585. @pytest.mark.asyncio
  586. async def test_cost_with_archive_id(self):
  587. """Test cost aggregation using archive_id (3MF path)."""
  588. spool_new = _make_spool(spool_id=1, label_weight=1000, cost_per_kg=25.0)
  589. assignment_new = _make_assignment(spool_id=1)
  590. archive_new = _make_archive(archive_id=20)
  591. filament_usage_new = [{"slot_id": 1, "used_g": 20.0, "type": "PLA", "color": "#FF0000"}]
  592. printer_manager = MagicMock()
  593. printer_manager.get_status.return_value = SimpleNamespace(
  594. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
  595. progress=100,
  596. layer_num=50,
  597. tray_now=0,
  598. )
  599. db = _mock_db_sequential([archive_new, None, assignment_new, spool_new])
  600. with (
  601. patch("backend.app.core.config.settings") as mock_settings,
  602. patch("backend.app.api.routes.settings.get_setting", return_value="15.0"),
  603. patch("backend.app.utils.threemf_tools.extract_filament_usage_from_3mf", return_value=filament_usage_new),
  604. ):
  605. mock_settings.base_dir = MagicMock()
  606. mock_path = MagicMock()
  607. mock_path.exists.return_value = True
  608. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  609. results_new = await on_print_complete(
  610. printer_id=1,
  611. data={"status": "completed"},
  612. printer_manager=printer_manager,
  613. db=db,
  614. archive_id=20,
  615. )
  616. assert len(results_new) == 1
  617. assert results_new[0]["spool_id"] == 1
  618. assert results_new[0]["cost"] == 0.50 # 20g / 1000 * 25.0
  619. @pytest.mark.asyncio
  620. async def test_cost_with_print_name_ams_fallback(self):
  621. """Test cost aggregation using print_name (AMS fallback, legacy path)."""
  622. spool_old = _make_spool(spool_id=2, label_weight=1000, cost_per_kg=15.0)
  623. assignment_old = _make_assignment(spool_id=2, ams_id=0, tray_id=0)
  624. legacy_print_name = "LegacyPrint"
  625. _active_sessions[1] = PrintSession(
  626. printer_id=1,
  627. print_name=legacy_print_name,
  628. started_at=datetime.now(timezone.utc),
  629. tray_remain_start={(0, 0): 80},
  630. tray_now_at_start=0,
  631. )
  632. printer_manager = MagicMock()
  633. printer_manager.get_status.return_value = SimpleNamespace(
  634. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
  635. progress=100,
  636. layer_num=50,
  637. tray_now=0,
  638. )
  639. db = _mock_db_sequential([assignment_old, spool_old])
  640. with (
  641. patch("backend.app.core.config.settings") as mock_settings,
  642. patch("backend.app.api.routes.settings.get_setting", return_value="15.0"),
  643. patch("backend.app.utils.threemf_tools.extract_filament_usage_from_3mf", return_value=None),
  644. ):
  645. mock_settings.base_dir = MagicMock()
  646. mock_path = MagicMock()
  647. mock_path.exists.return_value = True
  648. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  649. results_old = await on_print_complete(
  650. printer_id=1,
  651. data={"status": "completed", "subtask_name": legacy_print_name, "filename": legacy_print_name},
  652. printer_manager=printer_manager,
  653. db=db,
  654. archive_id=None,
  655. )
  656. assert len(results_old) == 1
  657. assert results_old[0]["spool_id"] == 2
  658. assert results_old[0]["cost"] == 1.5 # 100g / 1000 * 15.0