|
|
@@ -497,16 +497,13 @@ class TestCostAggregation:
|
|
|
"""Tests for cost aggregation to PrintArchive."""
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
- async def test_costs_summed_in_archive(self):
|
|
|
- """Multiple spool costs are summed when aggregated to archive."""
|
|
|
- # This test would need to mock the full main.py flow
|
|
|
- # For now, we verify the results dict structure includes cost
|
|
|
+ async def test_costs_summed_in_results(self):
|
|
|
+ """Multiple spool costs are correctly summed from result dicts."""
|
|
|
results = [
|
|
|
{"spool_id": 1, "weight_used": 20.0, "cost": 0.50},
|
|
|
{"spool_id": 2, "weight_used": 30.0, "cost": 0.75},
|
|
|
]
|
|
|
|
|
|
- # Simulate aggregation logic from main.py
|
|
|
total_cost = sum(r.get("cost", 0) or 0 for r in results)
|
|
|
assert total_cost == 1.25
|
|
|
|
|
|
@@ -519,10 +516,184 @@ class TestCostAggregation:
|
|
|
{"spool_id": 3, "weight_used": 10.0, "cost": 0.25},
|
|
|
]
|
|
|
|
|
|
- # Aggregation should handle None gracefully
|
|
|
total_cost = sum(r.get("cost", 0) or 0 for r in results)
|
|
|
assert total_cost == 0.75 # Only spools 1 and 3
|
|
|
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ async def test_archive_cost_not_overwritten_with_zero(self):
|
|
|
+ """archive.cost is preserved when no spool usage has cost data."""
|
|
|
+ # Spool without cost_per_kg, default_filament_cost also 0 → cost=None per usage
|
|
|
+ spool = _make_spool(spool_id=1, label_weight=1000, cost_per_kg=None)
|
|
|
+ assignment = _make_assignment(spool_id=1)
|
|
|
+ archive = _make_archive(archive_id=10)
|
|
|
+ archive.cost = 5.00 # Pre-existing cost from catalog
|
|
|
+ archive.print_name = "TestPrint"
|
|
|
+ archive.printer_id = 1
|
|
|
+
|
|
|
+ _active_sessions[1] = PrintSession(
|
|
|
+ printer_id=1,
|
|
|
+ print_name="TestPrint",
|
|
|
+ started_at=datetime.now(timezone.utc),
|
|
|
+ tray_remain_start={(0, 0): 80},
|
|
|
+ tray_now_at_start=0,
|
|
|
+ )
|
|
|
+
|
|
|
+ printer_manager = MagicMock()
|
|
|
+ printer_manager.get_status.return_value = SimpleNamespace(
|
|
|
+ raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
|
|
|
+ progress=100,
|
|
|
+ layer_num=50,
|
|
|
+ tray_now=0,
|
|
|
+ )
|
|
|
+
|
|
|
+ # Build mock db that returns proper scalars for the aggregation queries
|
|
|
+ responses = []
|
|
|
+ # 1. select(PrintArchive) → archive
|
|
|
+ responses.append(("scalar_one_or_none", archive))
|
|
|
+ # 2. select(PrintQueueItem) → None
|
|
|
+ responses.append(("scalar_one_or_none", None))
|
|
|
+ # 3. select(SpoolAssignment) → assignment
|
|
|
+ responses.append(("scalar_one_or_none", assignment))
|
|
|
+ # 4. select(Spool) → spool
|
|
|
+ responses.append(("scalar_one_or_none", spool))
|
|
|
+ # 5. cost aggregation: coalesce(sum(cost)) → 0 (no costs)
|
|
|
+ responses.append(("scalar", 0))
|
|
|
+ # 6. select(PrintArchive) → archive (for the guard check)
|
|
|
+ responses.append(("scalar_one_or_none", archive))
|
|
|
+ # 7. legacy fallback: coalesce(sum(cost)) → 0
|
|
|
+ responses.append(("scalar", 0))
|
|
|
+
|
|
|
+ db = AsyncMock()
|
|
|
+ call_count = [0]
|
|
|
+
|
|
|
+ async def mock_execute(*args, **kwargs):
|
|
|
+ idx = call_count[0]
|
|
|
+ call_count[0] += 1
|
|
|
+ result = MagicMock()
|
|
|
+ if idx < len(responses):
|
|
|
+ method, value = responses[idx]
|
|
|
+ if method == "scalar":
|
|
|
+ result.scalar.return_value = value
|
|
|
+ result.scalar_one_or_none.return_value = value
|
|
|
+ else:
|
|
|
+ result.scalar_one_or_none.return_value = value
|
|
|
+ result.scalar.return_value = value
|
|
|
+ else:
|
|
|
+ result.scalar_one_or_none.return_value = None
|
|
|
+ result.scalar.return_value = None
|
|
|
+ return result
|
|
|
+
|
|
|
+ db.execute = mock_execute
|
|
|
+
|
|
|
+ filament_usage = [{"slot_id": 1, "used_g": 10.0, "type": "PLA", "color": "#FF0000"}]
|
|
|
+
|
|
|
+ with (
|
|
|
+ patch("backend.app.core.config.settings") as mock_settings,
|
|
|
+ patch("backend.app.api.routes.settings.get_setting", return_value="0.0"), # no default cost
|
|
|
+ patch("backend.app.utils.threemf_tools.extract_filament_usage_from_3mf", return_value=filament_usage),
|
|
|
+ ):
|
|
|
+ mock_settings.base_dir = MagicMock()
|
|
|
+ mock_path = MagicMock()
|
|
|
+ mock_path.exists.return_value = True
|
|
|
+ mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
|
|
|
+
|
|
|
+ results = await on_print_complete(
|
|
|
+ printer_id=1,
|
|
|
+ data={"status": "completed"},
|
|
|
+ printer_manager=printer_manager,
|
|
|
+ db=db,
|
|
|
+ archive_id=10,
|
|
|
+ )
|
|
|
+
|
|
|
+ # Usage tracked but cost is None (no cost_per_kg, no default)
|
|
|
+ assert len(results) == 1
|
|
|
+ assert results[0]["cost"] is None
|
|
|
+
|
|
|
+ # Archive cost should NOT have been overwritten — still 5.00
|
|
|
+ assert archive.cost == 5.00
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ async def test_archive_cost_set_when_spool_has_cost(self):
|
|
|
+ """archive.cost is set from spool usage when cost data exists."""
|
|
|
+ spool = _make_spool(spool_id=1, label_weight=1000, cost_per_kg=25.0)
|
|
|
+ assignment = _make_assignment(spool_id=1)
|
|
|
+ archive = _make_archive(archive_id=10)
|
|
|
+ archive.cost = None # No pre-existing cost
|
|
|
+ archive.print_name = "TestPrint"
|
|
|
+ archive.printer_id = 1
|
|
|
+
|
|
|
+ _active_sessions[1] = PrintSession(
|
|
|
+ printer_id=1,
|
|
|
+ print_name="TestPrint",
|
|
|
+ started_at=datetime.now(timezone.utc),
|
|
|
+ tray_remain_start={(0, 0): 80},
|
|
|
+ tray_now_at_start=0,
|
|
|
+ )
|
|
|
+
|
|
|
+ printer_manager = MagicMock()
|
|
|
+ printer_manager.get_status.return_value = SimpleNamespace(
|
|
|
+ raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
|
|
|
+ progress=100,
|
|
|
+ layer_num=50,
|
|
|
+ tray_now=0,
|
|
|
+ )
|
|
|
+
|
|
|
+ # 20g at 25/kg = 0.50
|
|
|
+ expected_cost = 0.50
|
|
|
+
|
|
|
+ responses = []
|
|
|
+ responses.append(("scalar_one_or_none", archive))
|
|
|
+ responses.append(("scalar_one_or_none", None)) # queue item
|
|
|
+ responses.append(("scalar_one_or_none", assignment))
|
|
|
+ responses.append(("scalar_one_or_none", spool))
|
|
|
+ # cost aggregation: sum returns 0.50
|
|
|
+ responses.append(("scalar", expected_cost))
|
|
|
+ # select archive for guard
|
|
|
+ responses.append(("scalar_one_or_none", archive))
|
|
|
+
|
|
|
+ db = AsyncMock()
|
|
|
+ call_count = [0]
|
|
|
+
|
|
|
+ async def mock_execute(*args, **kwargs):
|
|
|
+ idx = call_count[0]
|
|
|
+ call_count[0] += 1
|
|
|
+ result = MagicMock()
|
|
|
+ if idx < len(responses):
|
|
|
+ method, value = responses[idx]
|
|
|
+ result.scalar.return_value = value
|
|
|
+ result.scalar_one_or_none.return_value = value
|
|
|
+ else:
|
|
|
+ result.scalar_one_or_none.return_value = None
|
|
|
+ result.scalar.return_value = None
|
|
|
+ return result
|
|
|
+
|
|
|
+ db.execute = mock_execute
|
|
|
+
|
|
|
+ filament_usage = [{"slot_id": 1, "used_g": 20.0, "type": "PLA", "color": "#FF0000"}]
|
|
|
+
|
|
|
+ with (
|
|
|
+ patch("backend.app.core.config.settings") as mock_settings,
|
|
|
+ patch("backend.app.api.routes.settings.get_setting", return_value="15.0"),
|
|
|
+ patch("backend.app.utils.threemf_tools.extract_filament_usage_from_3mf", return_value=filament_usage),
|
|
|
+ ):
|
|
|
+ mock_settings.base_dir = MagicMock()
|
|
|
+ mock_path = MagicMock()
|
|
|
+ mock_path.exists.return_value = True
|
|
|
+ mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
|
|
|
+
|
|
|
+ results = await on_print_complete(
|
|
|
+ printer_id=1,
|
|
|
+ data={"status": "completed"},
|
|
|
+ printer_manager=printer_manager,
|
|
|
+ db=db,
|
|
|
+ archive_id=10,
|
|
|
+ )
|
|
|
+
|
|
|
+ assert len(results) == 1
|
|
|
+ assert results[0]["cost"] == expected_cost
|
|
|
+ # Archive cost should have been updated
|
|
|
+ assert archive.cost == expected_cost
|
|
|
+
|
|
|
@pytest.mark.asyncio
|
|
|
async def test_cost_with_archive_id(self):
|
|
|
"""Test cost aggregation using archive_id (3MF path)."""
|