test_cost_tracking.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452
  1. """Unit tests for cost tracking in usage_tracker.py.
  2. Tests cost calculation scenarios:
  3. - Spool-specific cost_per_kg
  4. - Default fallback cost from settings
  5. - Spools without cost (None)
  6. - Completed prints
  7. - Failed/partial prints
  8. - Cost aggregation to archives
  9. """
  10. from datetime import datetime, timezone
  11. from types import SimpleNamespace
  12. from unittest.mock import AsyncMock, MagicMock, patch
  13. import pytest
  14. from backend.app.services.usage_tracker import (
  15. PrintSession,
  16. _active_sessions,
  17. _track_from_3mf,
  18. on_print_complete,
  19. )
  20. def _make_spool(spool_id=1, label_weight=1000, weight_used=0, cost_per_kg=None):
  21. """Create a mock Spool object with cost fields."""
  22. spool = MagicMock()
  23. spool.id = spool_id
  24. spool.label_weight = label_weight
  25. spool.weight_used = weight_used
  26. spool.cost_per_kg = cost_per_kg
  27. spool.last_used = None
  28. spool.material = "PLA"
  29. return spool
  30. def _make_assignment(spool_id=1, printer_id=1, ams_id=0, tray_id=0):
  31. """Create a mock SpoolAssignment object."""
  32. assignment = MagicMock()
  33. assignment.spool_id = spool_id
  34. assignment.printer_id = printer_id
  35. assignment.ams_id = ams_id
  36. assignment.tray_id = tray_id
  37. return assignment
  38. def _make_archive(archive_id=1, file_path="archives/1/test.3mf"):
  39. """Create a mock PrintArchive object."""
  40. archive = MagicMock()
  41. archive.id = archive_id
  42. archive.file_path = file_path
  43. return archive
  44. def _mock_db_sequential(responses):
  45. """Create mock db that returns responses in order."""
  46. db = AsyncMock()
  47. call_count = [0]
  48. async def mock_execute(*args, **kwargs):
  49. idx = call_count[0]
  50. call_count[0] += 1
  51. result = MagicMock()
  52. if idx < len(responses):
  53. result.scalar_one_or_none.return_value = responses[idx]
  54. else:
  55. result.scalar_one_or_none.return_value = None
  56. return result
  57. db.execute = mock_execute
  58. return db
  59. class TestCostCalculation:
  60. """Tests for cost calculation in usage tracking."""
  61. @pytest.fixture(autouse=True)
  62. def _clear_sessions(self):
  63. _active_sessions.clear()
  64. yield
  65. _active_sessions.clear()
  66. @pytest.mark.asyncio
  67. async def test_cost_with_spool_specific_cost_per_kg(self):
  68. """Cost is calculated using spool-specific cost_per_kg when available."""
  69. # Spool with cost_per_kg = 25.00 USD/kg
  70. spool = _make_spool(spool_id=1, label_weight=1000, cost_per_kg=25.0)
  71. assignment = _make_assignment(spool_id=1)
  72. archive = _make_archive(archive_id=10)
  73. _active_sessions[1] = PrintSession(
  74. printer_id=1,
  75. print_name="Test",
  76. started_at=datetime.now(timezone.utc),
  77. tray_remain_start={(0, 0): 80},
  78. tray_now_at_start=0,
  79. )
  80. printer_manager = MagicMock()
  81. printer_manager.get_status.return_value = SimpleNamespace(
  82. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
  83. progress=100,
  84. layer_num=50,
  85. tray_now=0,
  86. )
  87. # db returns: archive, queue_item(None), assignment, spool
  88. db = _mock_db_sequential([archive, None, assignment, spool])
  89. # 20g used from 3MF
  90. filament_usage = [{"slot_id": 1, "used_g": 20.0, "type": "PLA", "color": "#FF0000"}]
  91. with (
  92. patch("backend.app.core.config.settings") as mock_settings,
  93. patch("backend.app.api.routes.settings.get_setting", return_value="15.0"), # default cost
  94. patch(
  95. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  96. return_value=filament_usage,
  97. ),
  98. ):
  99. mock_settings.base_dir = MagicMock()
  100. mock_path = MagicMock()
  101. mock_path.exists.return_value = True
  102. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  103. results = await on_print_complete(
  104. printer_id=1,
  105. data={"status": "completed"},
  106. printer_manager=printer_manager,
  107. db=db,
  108. archive_id=10,
  109. )
  110. assert len(results) == 1
  111. assert results[0]["spool_id"] == 1
  112. assert results[0]["weight_used"] == 20.0
  113. # Cost = 20g / 1000 * 25.0 = 0.50
  114. assert results[0]["cost"] == 0.50
  115. @pytest.mark.asyncio
  116. async def test_cost_with_default_fallback(self):
  117. """Cost uses default_filament_cost from settings when spool cost is None."""
  118. # Spool without cost_per_kg
  119. spool = _make_spool(spool_id=1, label_weight=1000, cost_per_kg=None)
  120. assignment = _make_assignment(spool_id=1)
  121. archive = _make_archive(archive_id=10)
  122. _active_sessions[1] = PrintSession(
  123. printer_id=1,
  124. print_name="Test",
  125. started_at=datetime.now(timezone.utc),
  126. tray_remain_start={(0, 0): 80},
  127. tray_now_at_start=0,
  128. )
  129. printer_manager = MagicMock()
  130. printer_manager.get_status.return_value = SimpleNamespace(
  131. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
  132. progress=100,
  133. layer_num=50,
  134. tray_now=0,
  135. )
  136. # db returns: archive, queue_item(None), assignment, spool
  137. db = _mock_db_sequential([archive, None, assignment, spool])
  138. # 30g used from 3MF
  139. filament_usage = [{"slot_id": 1, "used_g": 30.0, "type": "PLA", "color": "#FF0000"}]
  140. with (
  141. patch("backend.app.core.config.settings") as mock_settings,
  142. patch("backend.app.api.routes.settings.get_setting", return_value="15.0"), # default: 15.0/kg
  143. patch(
  144. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  145. return_value=filament_usage,
  146. ),
  147. ):
  148. mock_settings.base_dir = MagicMock()
  149. mock_path = MagicMock()
  150. mock_path.exists.return_value = True
  151. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  152. results = await on_print_complete(
  153. printer_id=1,
  154. data={"status": "completed"},
  155. printer_manager=printer_manager,
  156. db=db,
  157. archive_id=10,
  158. )
  159. assert len(results) == 1
  160. assert results[0]["spool_id"] == 1
  161. assert results[0]["weight_used"] == 30.0
  162. # Cost = 30g / 1000 * 15.0 = 0.45
  163. assert results[0]["cost"] == 0.45
  164. @pytest.mark.asyncio
  165. async def test_cost_zero_when_default_cost_is_zero(self):
  166. """Cost is None when both spool cost and default cost are 0."""
  167. # Spool without cost_per_kg
  168. spool = _make_spool(spool_id=1, label_weight=1000, cost_per_kg=None)
  169. assignment = _make_assignment(spool_id=1)
  170. archive = _make_archive(archive_id=10)
  171. _active_sessions[1] = PrintSession(
  172. printer_id=1,
  173. print_name="Test",
  174. started_at=datetime.now(timezone.utc),
  175. tray_remain_start={(0, 0): 80},
  176. tray_now_at_start=0,
  177. )
  178. printer_manager = MagicMock()
  179. printer_manager.get_status.return_value = SimpleNamespace(
  180. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
  181. progress=100,
  182. layer_num=50,
  183. tray_now=0,
  184. )
  185. # db returns: archive, queue_item(None), assignment, spool
  186. db = _mock_db_sequential([archive, None, assignment, spool])
  187. filament_usage = [{"slot_id": 1, "used_g": 10.0, "type": "PLA", "color": "#FF0000"}]
  188. with (
  189. patch("backend.app.core.config.settings") as mock_settings,
  190. patch("backend.app.api.routes.settings.get_setting", return_value="0.0"), # no default cost
  191. patch(
  192. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  193. return_value=filament_usage,
  194. ),
  195. ):
  196. mock_settings.base_dir = MagicMock()
  197. mock_path = MagicMock()
  198. mock_path.exists.return_value = True
  199. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  200. results = await on_print_complete(
  201. printer_id=1,
  202. data={"status": "completed"},
  203. printer_manager=printer_manager,
  204. db=db,
  205. archive_id=10,
  206. )
  207. assert len(results) == 1
  208. assert results[0]["cost"] is None
  209. @pytest.mark.asyncio
  210. async def test_cost_for_failed_print_uses_actual_usage(self):
  211. """Failed print at 50% progress calculates cost from actual usage."""
  212. spool = _make_spool(spool_id=1, label_weight=1000, cost_per_kg=20.0)
  213. assignment = _make_assignment(spool_id=1)
  214. archive = _make_archive(archive_id=10)
  215. _active_sessions[1] = PrintSession(
  216. printer_id=1,
  217. print_name="Test",
  218. started_at=datetime.now(timezone.utc),
  219. tray_remain_start={(0, 0): 80},
  220. tray_now_at_start=0,
  221. )
  222. # Failed at 50% progress
  223. printer_manager = MagicMock()
  224. printer_manager.get_status.return_value = SimpleNamespace(
  225. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
  226. progress=50,
  227. layer_num=25,
  228. tray_now=0,
  229. )
  230. # db returns: archive, queue_item(None), assignment, spool
  231. db = _mock_db_sequential([archive, None, assignment, spool])
  232. # 40g total, but only 50% used
  233. filament_usage = [{"slot_id": 1, "used_g": 40.0, "type": "PLA", "color": "#FF0000"}]
  234. with (
  235. patch("backend.app.core.config.settings") as mock_settings,
  236. patch("backend.app.api.routes.settings.get_setting", return_value="15.0"),
  237. patch(
  238. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  239. return_value=filament_usage,
  240. ),
  241. patch(
  242. "backend.app.utils.threemf_tools.extract_layer_filament_usage_from_3mf",
  243. return_value=None, # No layer data, use linear scaling
  244. ),
  245. ):
  246. mock_settings.base_dir = MagicMock()
  247. mock_path = MagicMock()
  248. mock_path.exists.return_value = True
  249. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  250. results = await on_print_complete(
  251. printer_id=1,
  252. data={"status": "failed", "last_progress": 50.0},
  253. printer_manager=printer_manager,
  254. db=db,
  255. archive_id=10,
  256. )
  257. assert len(results) == 1
  258. # 50% of 40g = 20g
  259. assert results[0]["weight_used"] == 20.0
  260. # Cost = 20g / 1000 * 20.0 = 0.40
  261. assert results[0]["cost"] == 0.40
  262. @pytest.mark.asyncio
  263. async def test_cost_with_ams_fallback_tracking(self):
  264. """AMS fallback tracking also calculates cost correctly."""
  265. spool = _make_spool(spool_id=2, label_weight=1000, cost_per_kg=30.0)
  266. assignment = _make_assignment(spool_id=2)
  267. _active_sessions[1] = PrintSession(
  268. printer_id=1,
  269. print_name="Test",
  270. started_at=datetime.now(timezone.utc),
  271. tray_remain_start={(0, 0): 80},
  272. )
  273. printer_manager = MagicMock()
  274. printer_manager.get_status.return_value = SimpleNamespace(
  275. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
  276. tray_now=0,
  277. last_loaded_tray=-1,
  278. )
  279. # db returns assignment then spool (no archive, AMS fallback path)
  280. db = _mock_db_sequential([assignment, spool])
  281. with patch("backend.app.api.routes.settings.get_setting", return_value="15.0"):
  282. results = await on_print_complete(
  283. printer_id=1,
  284. data={"status": "completed"},
  285. printer_manager=printer_manager,
  286. db=db,
  287. archive_id=None, # No archive = AMS fallback
  288. )
  289. assert len(results) == 1
  290. assert results[0]["spool_id"] == 2
  291. # 10% of 1000g = 100g
  292. assert results[0]["weight_used"] == 100.0
  293. # Cost = 100g / 1000 * 30.0 = 3.00
  294. assert results[0]["cost"] == 3.0
  295. @pytest.mark.asyncio
  296. async def test_multi_filament_cost_aggregation(self):
  297. """Multiple spools in one print have their costs tracked separately."""
  298. spool1 = _make_spool(spool_id=1, label_weight=1000, cost_per_kg=20.0)
  299. spool2 = _make_spool(spool_id=2, label_weight=1000, cost_per_kg=25.0)
  300. assignment1 = _make_assignment(spool_id=1, ams_id=0, tray_id=0)
  301. assignment2 = _make_assignment(spool_id=2, ams_id=0, tray_id=1)
  302. archive = _make_archive(archive_id=10)
  303. _active_sessions[1] = PrintSession(
  304. printer_id=1,
  305. print_name="Test",
  306. started_at=datetime.now(timezone.utc),
  307. tray_remain_start={(0, 0): 80, (0, 1): 90},
  308. tray_now_at_start=0,
  309. )
  310. printer_manager = MagicMock()
  311. printer_manager.get_status.return_value = SimpleNamespace(
  312. raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}, {"id": 1, "remain": 80}]}]},
  313. progress=100,
  314. layer_num=50,
  315. tray_now=0,
  316. )
  317. # Mock slot-to-tray mapping: slot 1 -> tray 0, slot 2 -> tray 1
  318. ams_mapping = [0, 1]
  319. # db returns: archive, assignment1, spool1, assignment2, spool2
  320. # ams_mapping is provided, so no queue item lookup is performed
  321. db = _mock_db_sequential([archive, assignment1, spool1, assignment2, spool2])
  322. # Two filaments used
  323. filament_usage = [
  324. {"slot_id": 1, "used_g": 15.0, "type": "PLA", "color": "#FF0000"},
  325. {"slot_id": 2, "used_g": 25.0, "type": "PLA", "color": "#00FF00"},
  326. ]
  327. with (
  328. patch("backend.app.core.config.settings") as mock_settings,
  329. patch("backend.app.api.routes.settings.get_setting", return_value="15.0"),
  330. patch(
  331. "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
  332. return_value=filament_usage,
  333. ),
  334. ):
  335. mock_settings.base_dir = MagicMock()
  336. mock_path = MagicMock()
  337. mock_path.exists.return_value = True
  338. mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
  339. results = await on_print_complete(
  340. printer_id=1,
  341. data={"status": "completed"},
  342. printer_manager=printer_manager,
  343. db=db,
  344. archive_id=10,
  345. ams_mapping=ams_mapping,
  346. )
  347. assert len(results) == 2
  348. # First spool: 15g at 20/kg = 0.30
  349. spool1_result = next(r for r in results if r["spool_id"] == 1)
  350. assert spool1_result["weight_used"] == 15.0
  351. assert spool1_result["cost"] == 0.30
  352. # Second spool: 25g at 25/kg = 0.625, rounded to 0.62
  353. spool2_result = next(r for r in results if r["spool_id"] == 2)
  354. assert spool2_result["weight_used"] == 25.0
  355. assert spool2_result["cost"] == 0.62
  356. class TestCostAggregation:
  357. """Tests for cost aggregation to PrintArchive."""
  358. @pytest.mark.asyncio
  359. async def test_costs_summed_in_archive(self):
  360. """Multiple spool costs are summed when aggregated to archive."""
  361. # This test would need to mock the full main.py flow
  362. # For now, we verify the results dict structure includes cost
  363. results = [
  364. {"spool_id": 1, "weight_used": 20.0, "cost": 0.50},
  365. {"spool_id": 2, "weight_used": 30.0, "cost": 0.75},
  366. ]
  367. # Simulate aggregation logic from main.py
  368. total_cost = sum(r.get("cost", 0) or 0 for r in results)
  369. assert total_cost == 1.25
  370. @pytest.mark.asyncio
  371. async def test_null_costs_handled_in_aggregation(self):
  372. """None costs don't break aggregation."""
  373. results = [
  374. {"spool_id": 1, "weight_used": 20.0, "cost": 0.50},
  375. {"spool_id": 2, "weight_used": 30.0, "cost": None}, # No cost
  376. {"spool_id": 3, "weight_used": 10.0, "cost": 0.25},
  377. ]
  378. # Aggregation should handle None gracefully
  379. total_cost = sum(r.get("cost", 0) or 0 for r in results)
  380. assert total_cost == 0.75 # Only spools 1 and 3