test_cost_statistics.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
"""Integration tests for cost tracking in archives and statistics.

Tests the full flow of cost tracking from usage to statistics:

- Archive cost field is populated correctly
- Statistics endpoint aggregates costs
- Cost handling for completed vs. failed prints
"""
  7. import pytest
  8. from httpx import AsyncClient
  9. from sqlalchemy import select
  10. from backend.app.models.archive import PrintArchive
  11. from backend.app.models.spool import Spool
  12. from backend.app.models.spool_assignment import SpoolAssignment
  13. class TestArchiveCostTracking:
  14. """Tests for cost field in PrintArchive."""
  15. @pytest.mark.asyncio
  16. @pytest.mark.integration
  17. async def test_archive_has_cost_field(
  18. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  19. ):
  20. # Verify PrintArchive includes cost field in response.
  21. printer = await printer_factory()
  22. archive = await archive_factory(
  23. printer.id,
  24. print_name="Test Archive",
  25. status="completed",
  26. cost=5.50, # Set a cost
  27. )
  28. response = await async_client.get(f"/api/v1/archives/{archive.id}")
  29. assert response.status_code == 200
  30. result = response.json()
  31. assert "cost" in result
  32. assert result["cost"] == 5.50
  33. await db_session.rollback()
  34. @pytest.mark.asyncio
  35. @pytest.mark.integration
  36. async def test_archive_cost_null_when_not_set(
  37. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  38. ):
  39. # Verify cost is null when not set.
  40. printer = await printer_factory()
  41. archive = await archive_factory(
  42. printer.id,
  43. print_name="Test Archive",
  44. status="completed",
  45. # cost not set
  46. )
  47. response = await async_client.get(f"/api/v1/archives/{archive.id}")
  48. assert response.status_code == 200
  49. result = response.json()
  50. assert result["cost"] is None or result["cost"] == 0
  51. await db_session.rollback()
  52. class TestStatisticsCostAggregation:
  53. """Tests for cost aggregation in statistics endpoint."""
  54. @pytest.mark.asyncio
  55. @pytest.mark.integration
  56. async def test_statistics_includes_total_cost(
  57. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  58. ):
  59. # Verify statistics endpoint includes total_cost field.
  60. printer = await printer_factory()
  61. # Create archives with costs
  62. await archive_factory(
  63. printer.id,
  64. status="completed",
  65. cost=2.50,
  66. filament_used_grams=100.0,
  67. )
  68. await archive_factory(
  69. printer.id,
  70. status="completed",
  71. cost=3.75,
  72. filament_used_grams=150.0,
  73. )
  74. response = await async_client.get("/api/v1/archives/stats")
  75. assert response.status_code == 200
  76. result = response.json()
  77. assert "total_cost" in result
  78. assert result["total_cost"] == 6.25
  79. await db_session.rollback()
  80. @pytest.mark.asyncio
  81. @pytest.mark.integration
  82. async def test_statistics_aggregates_costs_correctly(
  83. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  84. ):
  85. # Verify statistics correctly sums costs from all archives.
  86. printer = await printer_factory()
  87. # Create multiple archives with different costs
  88. costs = [1.25, 2.50, 0.75, 5.00, 0.50]
  89. for cost in costs:
  90. await archive_factory(
  91. printer.id,
  92. status="completed",
  93. cost=cost,
  94. filament_used_grams=50.0,
  95. )
  96. response = await async_client.get("/api/v1/archives/stats")
  97. assert response.status_code == 200
  98. result = response.json()
  99. expected_total = sum(costs)
  100. assert result["total_cost"] == expected_total
  101. await db_session.rollback()
  102. @pytest.mark.asyncio
  103. @pytest.mark.integration
  104. async def test_statistics_handles_null_costs(
  105. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  106. ):
  107. # Verify statistics handles archives with null costs gracefully.
  108. printer = await printer_factory()
  109. # Mix of archives with and without costs
  110. await archive_factory(printer.id, status="completed", cost=2.50)
  111. await archive_factory(printer.id, status="completed", cost=None)
  112. await archive_factory(printer.id, status="completed", cost=1.75)
  113. await archive_factory(printer.id, status="completed") # No cost field
  114. response = await async_client.get("/api/v1/archives/stats")
  115. assert response.status_code == 200
  116. result = response.json()
  117. # Should sum only non-null costs
  118. assert result["total_cost"] == 4.25
  119. await db_session.rollback()
  120. @pytest.mark.asyncio
  121. @pytest.mark.integration
  122. async def test_statistics_includes_failed_print_costs(
  123. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  124. ):
  125. # Verify failed prints with costs are included in statistics.
  126. printer = await printer_factory()
  127. await archive_factory(printer.id, status="completed", cost=5.00)
  128. await archive_factory(printer.id, status="failed", cost=2.50) # Failed but has cost
  129. await archive_factory(printer.id, status="cancelled", cost=1.00)
  130. response = await async_client.get("/api/v1/archives/stats")
  131. assert response.status_code == 200
  132. result = response.json()
  133. # All prints should contribute to total cost
  134. assert result["total_cost"] == 8.50
  135. await db_session.rollback()
  136. @pytest.mark.asyncio
  137. @pytest.mark.integration
  138. async def test_statistics_zero_cost_when_no_archives(self, async_client: AsyncClient):
  139. """Verify total_cost is 0 when no archives exist."""
  140. response = await async_client.get("/api/v1/archives/stats")
  141. assert response.status_code == 200
  142. result = response.json()
  143. assert result["total_cost"] == 0.0
  144. class TestSpoolCostPersistence:
  145. """Tests for spool cost_per_kg field."""
  146. @pytest.mark.asyncio
  147. @pytest.mark.integration
  148. async def test_spool_cost_fields_persist(self, async_client: AsyncClient, db_session):
  149. # Verify cost_per_kg is saved and retrieved.
  150. # Create a spool with cost
  151. spool_data = {
  152. "material": "PLA",
  153. "brand": "TestBrand",
  154. "label_weight": 1000,
  155. "core_weight": 250,
  156. "cost_per_kg": 25.50,
  157. }
  158. create_response = await async_client.post("/api/v1/inventory/spools", json=spool_data)
  159. assert create_response.status_code == 200
  160. spool_id = create_response.json()["id"]
  161. # Retrieve and verify
  162. get_response = await async_client.get(f"/api/v1/inventory/spools/{spool_id}")
  163. assert get_response.status_code == 200
  164. result = get_response.json()
  165. assert result["cost_per_kg"] == 25.50
  166. await db_session.rollback()
  167. @pytest.mark.asyncio
  168. @pytest.mark.integration
  169. async def test_spool_update_cost_fields(self, async_client: AsyncClient, db_session):
  170. # Verify cost fields can be updated.
  171. # Create spool without cost
  172. spool_data = {
  173. "material": "PETG",
  174. "brand": "TestBrand",
  175. "label_weight": 1000,
  176. "core_weight": 250,
  177. }
  178. create_response = await async_client.post("/api/v1/inventory/spools", json=spool_data)
  179. assert create_response.status_code == 200
  180. spool_id = create_response.json()["id"]
  181. # Update with cost
  182. update_data = {
  183. "cost_per_kg": 30.00,
  184. }
  185. update_response = await async_client.patch(f"/api/v1/inventory/spools/{spool_id}", json=update_data)
  186. assert update_response.status_code == 200
  187. result = update_response.json()
  188. assert result["cost_per_kg"] == 30.00
  189. await db_session.rollback()
  190. @pytest.mark.asyncio
  191. @pytest.mark.integration
  192. async def test_spool_cost_null_by_default(self, async_client: AsyncClient, db_session):
  193. # Verify cost_per_kg defaults to null when not provided.
  194. spool_data = {
  195. "material": "ABS",
  196. "label_weight": 1000,
  197. "core_weight": 250,
  198. }
  199. create_response = await async_client.post("/api/v1/inventory/spools", json=spool_data)
  200. assert create_response.status_code == 200
  201. result = create_response.json()
  202. assert result["cost_per_kg"] is None
  203. await db_session.rollback()
  204. class TestCostCalculationScenarios:
  205. """End-to-end tests for various cost calculation scenarios."""
  206. @pytest.mark.asyncio
  207. @pytest.mark.integration
  208. async def test_cost_with_multiple_colors(self, async_client: AsyncClient, printer_factory, db_session):
  209. # Verify cost tracking works for multi-color prints.
  210. # Create two spools with different costs
  211. spool1_data = {
  212. "material": "ABS",
  213. "brand": "TestBrand",
  214. "label_weight": 1000,
  215. "core_weight": 250,
  216. "cost_per_kg": 20.00,
  217. }
  218. spool2_data = {
  219. "material": "PLA",
  220. "label_weight": 1000,
  221. "core_weight": 250,
  222. "cost_per_kg": 25.00,
  223. }
  224. spool1_response = await async_client.post("/api/v1/inventory/spools", json=spool1_data)
  225. spool2_response = await async_client.post("/api/v1/inventory/spools", json=spool2_data)
  226. assert spool1_response.status_code == 200
  227. assert spool2_response.status_code == 200
  228. # Verify spools created with correct costs
  229. assert spool1_response.json()["cost_per_kg"] == 20.00
  230. assert spool2_response.json()["cost_per_kg"] == 25.00
  231. await db_session.rollback()
  232. @pytest.mark.asyncio
  233. @pytest.mark.integration
  234. async def test_cost_precision(self, async_client: AsyncClient, db_session):
  235. # Verify cost calculations maintain proper precision.
  236. # Create spool with specific cost
  237. spool_data = {
  238. "material": "PLA",
  239. "brand": "TestBrand",
  240. "label_weight": 1000,
  241. "core_weight": 250,
  242. "cost_per_kg": 19.99, # Specific price
  243. }
  244. response = await async_client.post("/api/v1/inventory/spools", json=spool_data)
  245. assert response.status_code == 200
  246. result = response.json()
  247. # Verify precision is maintained
  248. assert result["cost_per_kg"] == 19.99
  249. await db_session.rollback()