import pytest from httpx import AsyncClient from sqlalchemy import select from backend.app.models.archive import PrintArchive from backend.app.models.spool import Spool from backend.app.models.spool_assignment import SpoolAssignment from backend.app.models.spool_usage_history import SpoolUsageHistory @pytest.fixture(autouse=True) def cleanup_test_archive_files(): yield import glob import os # Remove any test archive files created in archives/test/ for f in glob.glob("archives/test/test_print*.3mf"): try: os.remove(f) except Exception: pass """Integration tests for cost tracking in archives and statistics. Tests the full flow of cost tracking from usage to statistics: - Archive cost field populated correctly - Statistics endpoint aggregates costs - Completed vs failed prints cost handling """ class TestArchiveCostTracking: """Tests for cost field in PrintArchive.""" @pytest.mark.asyncio @pytest.mark.integration async def test_archive_has_cost_field( self, async_client: AsyncClient, archive_factory, printer_factory, db_session ): # Verify PrintArchive includes cost field in response. printer = await printer_factory() archive = await archive_factory( printer.id, print_name="Test Archive", status="completed", cost=5.50, # Set a cost ) response = await async_client.get(f"/api/v1/archives/{archive.id}") assert response.status_code == 200 result = response.json() assert "cost" in result assert result["cost"] == 5.50 await db_session.rollback() @pytest.mark.asyncio @pytest.mark.integration async def test_archive_cost_null_when_not_set( self, async_client: AsyncClient, archive_factory, printer_factory, db_session ): # Verify cost is null when not set. printer = await printer_factory() archive = await archive_factory( printer.id, print_name="Test Archive", status="completed", # cost not set ) response = await async_client.get(f"/api/v1/archives/{archive.id}") assert response.status_code == 200 result = response.json() assert result["cost"] is None or result["cost"] == 0 await db_session.rollback() class TestStatisticsCostAggregation: """Tests for cost aggregation in statistics endpoint.""" @pytest.mark.asyncio @pytest.mark.integration async def test_statistics_includes_total_cost( self, async_client: AsyncClient, archive_factory, printer_factory, db_session ): # Verify statistics endpoint includes total_cost field. printer = await printer_factory() # Create archives with costs await archive_factory( printer.id, status="completed", cost=2.50, filament_used_grams=100.0, ) await archive_factory( printer.id, status="completed", cost=3.75, filament_used_grams=150.0, ) response = await async_client.get("/api/v1/archives/stats") assert response.status_code == 200 result = response.json() assert "total_cost" in result assert result["total_cost"] == 6.25 await db_session.rollback() @pytest.mark.asyncio @pytest.mark.integration async def test_statistics_aggregates_costs_correctly( self, async_client: AsyncClient, archive_factory, printer_factory, db_session ): # Verify statistics correctly sums costs from all archives. printer = await printer_factory() # Create multiple archives with different costs costs = [1.25, 2.50, 0.75, 5.00, 0.50] for cost in costs: await archive_factory( printer.id, status="completed", cost=cost, filament_used_grams=50.0, ) response = await async_client.get("/api/v1/archives/stats") assert response.status_code == 200 result = response.json() expected_total = sum(costs) assert result["total_cost"] == expected_total await db_session.rollback() @pytest.mark.asyncio @pytest.mark.integration async def test_statistics_handles_null_costs( self, async_client: AsyncClient, archive_factory, printer_factory, db_session ): # Verify statistics handles archives with null costs gracefully. printer = await printer_factory() # Mix of archives with and without costs await archive_factory(printer.id, status="completed", cost=2.50) await archive_factory(printer.id, status="completed", cost=None) await archive_factory(printer.id, status="completed", cost=1.75) await archive_factory(printer.id, status="completed") # No cost field response = await async_client.get("/api/v1/archives/stats") assert response.status_code == 200 result = response.json() # Should sum only non-null costs assert result["total_cost"] == 4.25 await db_session.rollback() @pytest.mark.asyncio @pytest.mark.integration async def test_statistics_includes_failed_print_costs( self, async_client: AsyncClient, archive_factory, printer_factory, db_session ): # Verify failed prints with costs are included in statistics. printer = await printer_factory() await archive_factory(printer.id, status="completed", cost=5.00) await archive_factory(printer.id, status="failed", cost=2.50) # Failed but has cost await archive_factory(printer.id, status="cancelled", cost=1.00) response = await async_client.get("/api/v1/archives/stats") assert response.status_code == 200 result = response.json() # All prints should contribute to total cost assert result["total_cost"] == 8.50 await db_session.rollback() @pytest.mark.asyncio @pytest.mark.integration async def test_statistics_zero_cost_when_no_archives(self, async_client: AsyncClient): """Verify total_cost is 0 when no archives exist.""" response = await async_client.get("/api/v1/archives/stats") assert response.status_code == 200 result = response.json() assert result["total_cost"] == 0.0 class TestSpoolCostPersistence: """Tests for spool cost_per_kg field.""" @pytest.mark.asyncio @pytest.mark.integration async def test_spool_cost_fields_persist(self, async_client: AsyncClient, db_session): # Verify cost_per_kg is saved and retrieved. # Create a spool with cost spool_data = { "material": "PLA", "brand": "TestBrand", "label_weight": 1000, "core_weight": 250, "cost_per_kg": 25.50, } create_response = await async_client.post("/api/v1/inventory/spools", json=spool_data) assert create_response.status_code == 200 spool_id = create_response.json()["id"] # Retrieve and verify get_response = await async_client.get(f"/api/v1/inventory/spools/{spool_id}") assert get_response.status_code == 200 result = get_response.json() assert result["cost_per_kg"] == 25.50 await db_session.rollback() @pytest.mark.asyncio @pytest.mark.integration async def test_spool_update_cost_fields(self, async_client: AsyncClient, db_session): # Verify cost fields can be updated. # Create spool without cost spool_data = { "material": "PETG", "brand": "TestBrand", "label_weight": 1000, "core_weight": 250, } create_response = await async_client.post("/api/v1/inventory/spools", json=spool_data) assert create_response.status_code == 200 spool_id = create_response.json()["id"] # Update with cost update_data = { "cost_per_kg": 30.00, } update_response = await async_client.patch(f"/api/v1/inventory/spools/{spool_id}", json=update_data) assert update_response.status_code == 200 result = update_response.json() assert result["cost_per_kg"] == 30.00 await db_session.rollback() @pytest.mark.asyncio @pytest.mark.integration async def test_spool_cost_null_by_default(self, async_client: AsyncClient, db_session): # Verify cost_per_kg defaults to null when not provided. spool_data = { "material": "ABS", "label_weight": 1000, "core_weight": 250, } create_response = await async_client.post("/api/v1/inventory/spools", json=spool_data) assert create_response.status_code == 200 result = create_response.json() assert result["cost_per_kg"] is None await db_session.rollback() class TestCostCalculationScenarios: """End-to-end tests for various cost calculation scenarios.""" @pytest.mark.asyncio @pytest.mark.integration async def test_cost_with_multiple_colors(self, async_client: AsyncClient, printer_factory, db_session): # Verify cost tracking works for multi-color prints. # Create two spools with different costs spool1_data = { "material": "ABS", "brand": "TestBrand", "label_weight": 1000, "core_weight": 250, "cost_per_kg": 20.00, } spool2_data = { "material": "PLA", "label_weight": 1000, "core_weight": 250, "cost_per_kg": 25.00, } spool1_response = await async_client.post("/api/v1/inventory/spools", json=spool1_data) spool2_response = await async_client.post("/api/v1/inventory/spools", json=spool2_data) assert spool1_response.status_code == 200 assert spool2_response.status_code == 200 # Verify spools created with correct costs assert spool1_response.json()["cost_per_kg"] == 20.00 assert spool2_response.json()["cost_per_kg"] == 25.00 await db_session.rollback() @pytest.mark.asyncio @pytest.mark.integration async def test_cost_precision(self, async_client: AsyncClient, db_session): # Verify cost calculations maintain proper precision. # Create spool with specific cost spool_data = { "material": "PLA", "brand": "TestBrand", "label_weight": 1000, "core_weight": 250, "cost_per_kg": 19.99, # Specific price } response = await async_client.post("/api/v1/inventory/spools", json=spool_data) assert response.status_code == 200 result = response.json() # Verify precision is maintained assert result["cost_per_kg"] == 19.99 await db_session.rollback() @pytest.mark.asyncio @pytest.mark.integration async def test_archive_cost_with_archive_id_and_print_name( self, async_client, archive_factory, printer_factory, db_session ): """Test archive cost recalculation using both archive_id and print_name fallback.""" from backend.app.models.spool import Spool from backend.app.models.spool_usage_history import SpoolUsageHistory printer = await printer_factory() # Create spools and commit spool_new = Spool( material="PLA", brand="BrandA", label_weight=1000, core_weight=250, cost_per_kg=20.0, ) spool_old = Spool( material="ABS", brand="BrandB", label_weight=1000, core_weight=250, cost_per_kg=15.0, ) db_session.add_all([spool_new, spool_old]) await db_session.commit() await db_session.refresh(spool_new) await db_session.refresh(spool_old) # Create archive with new SpoolUsageHistory (archive_id set). # filament_used_grams matches the tracked weight so the #1344 default- # rate top-up for untracked filament doesn't apply -- this test pins # the query routing, not the top-up branch. archive_new = await archive_factory( printer.id, print_name="UniquePrint", status="completed", cost=None, filament_used_grams=20.0, ) history_new = SpoolUsageHistory( spool_id=spool_new.id, printer_id=printer.id, print_name="UniquePrint", weight_used=20.0, percent_used=20, status="completed", cost=0.50, archive_id=archive_new.id, ) db_session.add(history_new) # Create archive with old SpoolUsageHistory (archive_id NULL — legacy record) archive_old = await archive_factory( printer.id, print_name="LegacyPrint", status="completed", cost=None, ) archive_old.filament_used_grams = 30.0 await db_session.commit() history_old = SpoolUsageHistory( spool_id=spool_old.id, printer_id=printer.id, print_name="LegacyPrint", weight_used=30.0, percent_used=30, status="completed", cost=0.45, archive_id=None, ) db_session.add(history_old) await db_session.commit() # Recalculate costs for all archives recalc_response = await async_client.post("/api/v1/archives/recalculate-costs") assert recalc_response.status_code == 200 assert recalc_response.json()["updated"] >= 1 # Verify archive_new cost from archive_id-linked SpoolUsageHistory response_new = await async_client.get(f"/api/v1/archives/{archive_new.id}") assert response_new.status_code == 200 assert response_new.json()["cost"] == 0.50 # Verify archive_old cost from legacy print_name fallback response_old = await async_client.get(f"/api/v1/archives/{archive_old.id}") assert response_old.status_code == 200 assert response_old.json()["cost"] == 0.45 await db_session.rollback()