test_cost_statistics.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412
  1. import pytest
  2. from httpx import AsyncClient
  3. from sqlalchemy import select
  4. from backend.app.models.archive import PrintArchive
  5. from backend.app.models.spool import Spool
  6. from backend.app.models.spool_assignment import SpoolAssignment
  7. from backend.app.models.spool_usage_history import SpoolUsageHistory
  8. @pytest.fixture(autouse=True)
  9. def cleanup_test_archive_files():
  10. yield
  11. import glob
  12. import os
  13. # Remove any test archive files created in archives/test/
  14. for f in glob.glob("archives/test/test_print*.3mf"):
  15. try:
  16. os.remove(f)
  17. except Exception:
  18. pass
  19. """Integration tests for cost tracking in archives and statistics.
  20. Tests the full flow of cost tracking from usage to statistics:
  21. - Archive cost field populated correctly
  22. - Statistics endpoint aggregates costs
  23. - Completed vs failed prints cost handling
  24. """
  25. class TestArchiveCostTracking:
  26. """Tests for cost field in PrintArchive."""
  27. @pytest.mark.asyncio
  28. @pytest.mark.integration
  29. async def test_archive_has_cost_field(
  30. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  31. ):
  32. # Verify PrintArchive includes cost field in response.
  33. printer = await printer_factory()
  34. archive = await archive_factory(
  35. printer.id,
  36. print_name="Test Archive",
  37. status="completed",
  38. cost=5.50, # Set a cost
  39. )
  40. response = await async_client.get(f"/api/v1/archives/{archive.id}")
  41. assert response.status_code == 200
  42. result = response.json()
  43. assert "cost" in result
  44. assert result["cost"] == 5.50
  45. await db_session.rollback()
  46. @pytest.mark.asyncio
  47. @pytest.mark.integration
  48. async def test_archive_cost_null_when_not_set(
  49. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  50. ):
  51. # Verify cost is null when not set.
  52. printer = await printer_factory()
  53. archive = await archive_factory(
  54. printer.id,
  55. print_name="Test Archive",
  56. status="completed",
  57. # cost not set
  58. )
  59. response = await async_client.get(f"/api/v1/archives/{archive.id}")
  60. assert response.status_code == 200
  61. result = response.json()
  62. assert result["cost"] is None or result["cost"] == 0
  63. await db_session.rollback()
  64. class TestStatisticsCostAggregation:
  65. """Tests for cost aggregation in statistics endpoint."""
  66. @pytest.mark.asyncio
  67. @pytest.mark.integration
  68. async def test_statistics_includes_total_cost(
  69. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  70. ):
  71. # Verify statistics endpoint includes total_cost field.
  72. printer = await printer_factory()
  73. # Create archives with costs
  74. await archive_factory(
  75. printer.id,
  76. status="completed",
  77. cost=2.50,
  78. filament_used_grams=100.0,
  79. )
  80. await archive_factory(
  81. printer.id,
  82. status="completed",
  83. cost=3.75,
  84. filament_used_grams=150.0,
  85. )
  86. response = await async_client.get("/api/v1/archives/stats")
  87. assert response.status_code == 200
  88. result = response.json()
  89. assert "total_cost" in result
  90. assert result["total_cost"] == 6.25
  91. await db_session.rollback()
  92. @pytest.mark.asyncio
  93. @pytest.mark.integration
  94. async def test_statistics_aggregates_costs_correctly(
  95. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  96. ):
  97. # Verify statistics correctly sums costs from all archives.
  98. printer = await printer_factory()
  99. # Create multiple archives with different costs
  100. costs = [1.25, 2.50, 0.75, 5.00, 0.50]
  101. for cost in costs:
  102. await archive_factory(
  103. printer.id,
  104. status="completed",
  105. cost=cost,
  106. filament_used_grams=50.0,
  107. )
  108. response = await async_client.get("/api/v1/archives/stats")
  109. assert response.status_code == 200
  110. result = response.json()
  111. expected_total = sum(costs)
  112. assert result["total_cost"] == expected_total
  113. await db_session.rollback()
  114. @pytest.mark.asyncio
  115. @pytest.mark.integration
  116. async def test_statistics_handles_null_costs(
  117. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  118. ):
  119. # Verify statistics handles archives with null costs gracefully.
  120. printer = await printer_factory()
  121. # Mix of archives with and without costs
  122. await archive_factory(printer.id, status="completed", cost=2.50)
  123. await archive_factory(printer.id, status="completed", cost=None)
  124. await archive_factory(printer.id, status="completed", cost=1.75)
  125. await archive_factory(printer.id, status="completed") # No cost field
  126. response = await async_client.get("/api/v1/archives/stats")
  127. assert response.status_code == 200
  128. result = response.json()
  129. # Should sum only non-null costs
  130. assert result["total_cost"] == 4.25
  131. await db_session.rollback()
  132. @pytest.mark.asyncio
  133. @pytest.mark.integration
  134. async def test_statistics_includes_failed_print_costs(
  135. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  136. ):
  137. # Verify failed prints with costs are included in statistics.
  138. printer = await printer_factory()
  139. await archive_factory(printer.id, status="completed", cost=5.00)
  140. await archive_factory(printer.id, status="failed", cost=2.50) # Failed but has cost
  141. await archive_factory(printer.id, status="cancelled", cost=1.00)
  142. response = await async_client.get("/api/v1/archives/stats")
  143. assert response.status_code == 200
  144. result = response.json()
  145. # All prints should contribute to total cost
  146. assert result["total_cost"] == 8.50
  147. await db_session.rollback()
  148. @pytest.mark.asyncio
  149. @pytest.mark.integration
  150. async def test_statistics_zero_cost_when_no_archives(self, async_client: AsyncClient):
  151. """Verify total_cost is 0 when no archives exist."""
  152. response = await async_client.get("/api/v1/archives/stats")
  153. assert response.status_code == 200
  154. result = response.json()
  155. assert result["total_cost"] == 0.0
  156. class TestSpoolCostPersistence:
  157. """Tests for spool cost_per_kg field."""
  158. @pytest.mark.asyncio
  159. @pytest.mark.integration
  160. async def test_spool_cost_fields_persist(self, async_client: AsyncClient, db_session):
  161. # Verify cost_per_kg is saved and retrieved.
  162. # Create a spool with cost
  163. spool_data = {
  164. "material": "PLA",
  165. "brand": "TestBrand",
  166. "label_weight": 1000,
  167. "core_weight": 250,
  168. "cost_per_kg": 25.50,
  169. }
  170. create_response = await async_client.post("/api/v1/inventory/spools", json=spool_data)
  171. assert create_response.status_code == 200
  172. spool_id = create_response.json()["id"]
  173. # Retrieve and verify
  174. get_response = await async_client.get(f"/api/v1/inventory/spools/{spool_id}")
  175. assert get_response.status_code == 200
  176. result = get_response.json()
  177. assert result["cost_per_kg"] == 25.50
  178. await db_session.rollback()
  179. @pytest.mark.asyncio
  180. @pytest.mark.integration
  181. async def test_spool_update_cost_fields(self, async_client: AsyncClient, db_session):
  182. # Verify cost fields can be updated.
  183. # Create spool without cost
  184. spool_data = {
  185. "material": "PETG",
  186. "brand": "TestBrand",
  187. "label_weight": 1000,
  188. "core_weight": 250,
  189. }
  190. create_response = await async_client.post("/api/v1/inventory/spools", json=spool_data)
  191. assert create_response.status_code == 200
  192. spool_id = create_response.json()["id"]
  193. # Update with cost
  194. update_data = {
  195. "cost_per_kg": 30.00,
  196. }
  197. update_response = await async_client.patch(f"/api/v1/inventory/spools/{spool_id}", json=update_data)
  198. assert update_response.status_code == 200
  199. result = update_response.json()
  200. assert result["cost_per_kg"] == 30.00
  201. await db_session.rollback()
  202. @pytest.mark.asyncio
  203. @pytest.mark.integration
  204. async def test_spool_cost_null_by_default(self, async_client: AsyncClient, db_session):
  205. # Verify cost_per_kg defaults to null when not provided.
  206. spool_data = {
  207. "material": "ABS",
  208. "label_weight": 1000,
  209. "core_weight": 250,
  210. }
  211. create_response = await async_client.post("/api/v1/inventory/spools", json=spool_data)
  212. assert create_response.status_code == 200
  213. result = create_response.json()
  214. assert result["cost_per_kg"] is None
  215. await db_session.rollback()
  216. class TestCostCalculationScenarios:
  217. """End-to-end tests for various cost calculation scenarios."""
  218. @pytest.mark.asyncio
  219. @pytest.mark.integration
  220. async def test_cost_with_multiple_colors(self, async_client: AsyncClient, printer_factory, db_session):
  221. # Verify cost tracking works for multi-color prints.
  222. # Create two spools with different costs
  223. spool1_data = {
  224. "material": "ABS",
  225. "brand": "TestBrand",
  226. "label_weight": 1000,
  227. "core_weight": 250,
  228. "cost_per_kg": 20.00,
  229. }
  230. spool2_data = {
  231. "material": "PLA",
  232. "label_weight": 1000,
  233. "core_weight": 250,
  234. "cost_per_kg": 25.00,
  235. }
  236. spool1_response = await async_client.post("/api/v1/inventory/spools", json=spool1_data)
  237. spool2_response = await async_client.post("/api/v1/inventory/spools", json=spool2_data)
  238. assert spool1_response.status_code == 200
  239. assert spool2_response.status_code == 200
  240. # Verify spools created with correct costs
  241. assert spool1_response.json()["cost_per_kg"] == 20.00
  242. assert spool2_response.json()["cost_per_kg"] == 25.00
  243. await db_session.rollback()
  244. @pytest.mark.asyncio
  245. @pytest.mark.integration
  246. async def test_cost_precision(self, async_client: AsyncClient, db_session):
  247. # Verify cost calculations maintain proper precision.
  248. # Create spool with specific cost
  249. spool_data = {
  250. "material": "PLA",
  251. "brand": "TestBrand",
  252. "label_weight": 1000,
  253. "core_weight": 250,
  254. "cost_per_kg": 19.99, # Specific price
  255. }
  256. response = await async_client.post("/api/v1/inventory/spools", json=spool_data)
  257. assert response.status_code == 200
  258. result = response.json()
  259. # Verify precision is maintained
  260. assert result["cost_per_kg"] == 19.99
  261. await db_session.rollback()
  262. @pytest.mark.asyncio
  263. @pytest.mark.integration
  264. async def test_archive_cost_with_archive_id_and_print_name(
  265. self, async_client, archive_factory, printer_factory, db_session
  266. ):
  267. """Test archive cost recalculation using both archive_id and print_name fallback."""
  268. from backend.app.models.spool import Spool
  269. from backend.app.models.spool_usage_history import SpoolUsageHistory
  270. printer = await printer_factory()
  271. # Create spools and commit
  272. spool_new = Spool(
  273. material="PLA",
  274. brand="BrandA",
  275. label_weight=1000,
  276. core_weight=250,
  277. cost_per_kg=20.0,
  278. )
  279. spool_old = Spool(
  280. material="ABS",
  281. brand="BrandB",
  282. label_weight=1000,
  283. core_weight=250,
  284. cost_per_kg=15.0,
  285. )
  286. db_session.add_all([spool_new, spool_old])
  287. await db_session.commit()
  288. await db_session.refresh(spool_new)
  289. await db_session.refresh(spool_old)
  290. # Create archive with new SpoolUsageHistory (archive_id set)
  291. archive_new = await archive_factory(
  292. printer.id,
  293. print_name="UniquePrint",
  294. status="completed",
  295. cost=None,
  296. )
  297. history_new = SpoolUsageHistory(
  298. spool_id=spool_new.id,
  299. printer_id=printer.id,
  300. print_name="UniquePrint",
  301. weight_used=20.0,
  302. percent_used=20,
  303. status="completed",
  304. cost=0.50,
  305. archive_id=archive_new.id,
  306. )
  307. db_session.add(history_new)
  308. # Create archive with old SpoolUsageHistory (archive_id NULL — legacy record)
  309. archive_old = await archive_factory(
  310. printer.id,
  311. print_name="LegacyPrint",
  312. status="completed",
  313. cost=None,
  314. )
  315. archive_old.filament_used_grams = 30.0
  316. await db_session.commit()
  317. history_old = SpoolUsageHistory(
  318. spool_id=spool_old.id,
  319. printer_id=printer.id,
  320. print_name="LegacyPrint",
  321. weight_used=30.0,
  322. percent_used=30,
  323. status="completed",
  324. cost=0.45,
  325. archive_id=None,
  326. )
  327. db_session.add(history_old)
  328. await db_session.commit()
  329. # Recalculate costs for all archives
  330. recalc_response = await async_client.post("/api/v1/archives/recalculate-costs")
  331. assert recalc_response.status_code == 200
  332. assert recalc_response.json()["updated"] >= 1
  333. # Verify archive_new cost from archive_id-linked SpoolUsageHistory
  334. response_new = await async_client.get(f"/api/v1/archives/{archive_new.id}")
  335. assert response_new.status_code == 200
  336. assert response_new.json()["cost"] == 0.50
  337. # Verify archive_old cost from legacy print_name fallback
  338. response_old = await async_client.get(f"/api/v1/archives/{archive_old.id}")
  339. assert response_old.status_code == 200
  340. assert response_old.json()["cost"] == 0.45
  341. await db_session.rollback()