|
@@ -75,6 +75,10 @@ def _make_archive(archive_id=1, file_path=None):
|
|
|
archive = MagicMock()
|
|
archive = MagicMock()
|
|
|
archive.id = archive_id
|
|
archive.id = archive_id
|
|
|
archive.file_path = file_path
|
|
archive.file_path = file_path
|
|
|
|
|
+ # Explicit numeric default so the #1344 top-up logic (archive_grams -
|
|
|
|
|
+ # tracked_grams) doesn't compare a MagicMock to a float. Tests that
|
|
|
|
|
+ # exercise the top-up path overwrite this with a real number.
|
|
|
|
|
+ archive.filament_used_grams = 0
|
|
|
return archive
|
|
return archive
|
|
|
|
|
|
|
|
|
|
|
|
@@ -689,6 +693,172 @@ class TestCostAggregation:
|
|
|
# Archive cost should have been updated
|
|
# Archive cost should have been updated
|
|
|
assert archive.cost == expected_cost
|
|
assert archive.cost == expected_cost
|
|
|
|
|
|
|
|
|
|
+ @pytest.mark.asyncio
|
|
|
|
|
+ async def test_archive_cost_includes_untracked_filament_at_default_rate(self):
|
|
|
|
|
+ """#1344: when only some AMS trays have inventory spools, the untracked
|
|
|
|
|
+ filament weight is charged at the global default rate so the total
|
|
|
|
|
+ archive cost still reflects the whole print."""
|
|
|
|
|
+ spool = _make_spool(spool_id=1, label_weight=1000, cost_per_kg=10.0)
|
|
|
|
|
+ assignment = _make_assignment(spool_id=1)
|
|
|
|
|
+ archive = _make_archive(archive_id=10)
|
|
|
|
|
+ archive.cost = None
|
|
|
|
|
+ archive.print_name = "TestPrint"
|
|
|
|
|
+ archive.printer_id = 1
|
|
|
|
|
+ archive.filament_used_grams = 110.0 # whole-print weight from slicer
|
|
|
|
|
+
|
|
|
|
|
+ _active_sessions[1] = PrintSession(
|
|
|
|
|
+ printer_id=1,
|
|
|
|
|
+ print_name="TestPrint",
|
|
|
|
|
+ started_at=datetime.now(timezone.utc),
|
|
|
|
|
+ tray_remain_start={(0, 0): 80},
|
|
|
|
|
+ tray_now_at_start=0,
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ printer_manager = MagicMock()
|
|
|
|
|
+ printer_manager.get_status.return_value = SimpleNamespace(
|
|
|
|
|
+ raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
|
|
|
|
|
+ progress=100,
|
|
|
|
|
+ layer_num=50,
|
|
|
|
|
+ tray_now=0,
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ responses = [
|
|
|
|
|
+ ("scalar_one_or_none", archive),
|
|
|
|
|
+ ("scalar_one_or_none", None), # queue item
|
|
|
|
|
+ ("scalar_one_or_none", assignment),
|
|
|
|
|
+ ("scalar_one_or_none", spool),
|
|
|
|
|
+ ("scalar_one_or_none", archive), # cost-update select
|
|
|
|
|
+ ]
|
|
|
|
|
+
|
|
|
|
|
+ db = AsyncMock()
|
|
|
|
|
+ call_count = [0]
|
|
|
|
|
+
|
|
|
|
|
+ async def mock_execute(*args, **kwargs):
|
|
|
|
|
+ idx = call_count[0]
|
|
|
|
|
+ call_count[0] += 1
|
|
|
|
|
+ result = MagicMock()
|
|
|
|
|
+ if idx < len(responses):
|
|
|
|
|
+ _, value = responses[idx]
|
|
|
|
|
+ result.scalar.return_value = value
|
|
|
|
|
+ result.scalar_one_or_none.return_value = value
|
|
|
|
|
+ else:
|
|
|
|
|
+ result.scalar_one_or_none.return_value = None
|
|
|
|
|
+ result.scalar.return_value = None
|
|
|
|
|
+ return result
|
|
|
|
|
+
|
|
|
|
|
+ db.execute = mock_execute
|
|
|
|
|
+
|
|
|
|
|
+ # 3MF reports a single slot using 10g, but archive.filament_used_grams
|
|
|
|
|
+ # says the whole print was 110g -- the other 100g came from spools that
|
|
|
|
|
+ # aren't in inventory.
|
|
|
|
|
+ filament_usage = [{"slot_id": 1, "used_g": 10.0, "type": "PLA", "color": "#FF0000"}]
|
|
|
|
|
+
|
|
|
|
|
+ with (
|
|
|
|
|
+ patch("backend.app.core.config.settings") as mock_settings,
|
|
|
|
|
+ patch("backend.app.api.routes.settings.get_setting", return_value="10.0"),
|
|
|
|
|
+ patch("backend.app.utils.threemf_tools.extract_filament_usage_from_3mf", return_value=filament_usage),
|
|
|
|
|
+ ):
|
|
|
|
|
+ mock_settings.base_dir = MagicMock()
|
|
|
|
|
+ mock_path = MagicMock()
|
|
|
|
|
+ mock_path.exists.return_value = True
|
|
|
|
|
+ mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
|
|
|
|
|
+
|
|
|
|
|
+ results = await on_print_complete(
|
|
|
|
|
+ printer_id=1,
|
|
|
|
|
+ data={"status": "completed"},
|
|
|
|
|
+ printer_manager=printer_manager,
|
|
|
|
|
+ db=db,
|
|
|
|
|
+ archive_id=10,
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ # Tracked slot: 10g * $10/kg = $0.10
|
|
|
|
|
+ assert len(results) == 1
|
|
|
|
|
+ assert results[0]["cost"] == 0.10
|
|
|
|
|
+ # Untracked: 110g - 10g = 100g at $10/kg default = $1.00
|
|
|
|
|
+ # Archive total: $0.10 + $1.00 = $1.10 (was $0.01 pre-fix because only
|
|
|
|
|
+ # the tracked slot's tiny share was kept)
|
|
|
|
|
+ assert archive.cost == 1.10
|
|
|
|
|
+
|
|
|
|
|
+ @pytest.mark.asyncio
|
|
|
|
|
+ async def test_archive_cost_fully_tracked_unchanged_by_topup(self):
|
|
|
|
|
+ """When every gram is covered by inventory spools, the default-rate
|
|
|
|
|
+ top-up adds nothing -- the archive cost is just the sum of tracked
|
|
|
|
|
+ costs, same as before #1344."""
|
|
|
|
|
+ spool = _make_spool(spool_id=1, label_weight=1000, cost_per_kg=25.0)
|
|
|
|
|
+ assignment = _make_assignment(spool_id=1)
|
|
|
|
|
+ archive = _make_archive(archive_id=10)
|
|
|
|
|
+ archive.cost = None
|
|
|
|
|
+ archive.print_name = "TestPrint"
|
|
|
|
|
+ archive.printer_id = 1
|
|
|
|
|
+ archive.filament_used_grams = 20.0 # exactly what the slot reports
|
|
|
|
|
+
|
|
|
|
|
+ _active_sessions[1] = PrintSession(
|
|
|
|
|
+ printer_id=1,
|
|
|
|
|
+ print_name="TestPrint",
|
|
|
|
|
+ started_at=datetime.now(timezone.utc),
|
|
|
|
|
+ tray_remain_start={(0, 0): 80},
|
|
|
|
|
+ tray_now_at_start=0,
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ printer_manager = MagicMock()
|
|
|
|
|
+ printer_manager.get_status.return_value = SimpleNamespace(
|
|
|
|
|
+ raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
|
|
|
|
|
+ progress=100,
|
|
|
|
|
+ layer_num=50,
|
|
|
|
|
+ tray_now=0,
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ responses = [
|
|
|
|
|
+ ("scalar_one_or_none", archive),
|
|
|
|
|
+ ("scalar_one_or_none", None),
|
|
|
|
|
+ ("scalar_one_or_none", assignment),
|
|
|
|
|
+ ("scalar_one_or_none", spool),
|
|
|
|
|
+ ("scalar_one_or_none", archive),
|
|
|
|
|
+ ]
|
|
|
|
|
+
|
|
|
|
|
+ db = AsyncMock()
|
|
|
|
|
+ call_count = [0]
|
|
|
|
|
+
|
|
|
|
|
+ async def mock_execute(*args, **kwargs):
|
|
|
|
|
+ idx = call_count[0]
|
|
|
|
|
+ call_count[0] += 1
|
|
|
|
|
+ result = MagicMock()
|
|
|
|
|
+ if idx < len(responses):
|
|
|
|
|
+ _, value = responses[idx]
|
|
|
|
|
+ result.scalar.return_value = value
|
|
|
|
|
+ result.scalar_one_or_none.return_value = value
|
|
|
|
|
+ else:
|
|
|
|
|
+ result.scalar_one_or_none.return_value = None
|
|
|
|
|
+ result.scalar.return_value = None
|
|
|
|
|
+ return result
|
|
|
|
|
+
|
|
|
|
|
+ db.execute = mock_execute
|
|
|
|
|
+
|
|
|
|
|
+ filament_usage = [{"slot_id": 1, "used_g": 20.0, "type": "PLA", "color": "#FF0000"}]
|
|
|
|
|
+
|
|
|
|
|
+ with (
|
|
|
|
|
+ patch("backend.app.core.config.settings") as mock_settings,
|
|
|
|
|
+ patch("backend.app.api.routes.settings.get_setting", return_value="15.0"),
|
|
|
|
|
+ patch("backend.app.utils.threemf_tools.extract_filament_usage_from_3mf", return_value=filament_usage),
|
|
|
|
|
+ ):
|
|
|
|
|
+ mock_settings.base_dir = MagicMock()
|
|
|
|
|
+ mock_path = MagicMock()
|
|
|
|
|
+ mock_path.exists.return_value = True
|
|
|
|
|
+ mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
|
|
|
|
|
+
|
|
|
|
|
+ results = await on_print_complete(
|
|
|
|
|
+ printer_id=1,
|
|
|
|
|
+ data={"status": "completed"},
|
|
|
|
|
+ printer_manager=printer_manager,
|
|
|
|
|
+ db=db,
|
|
|
|
|
+ archive_id=10,
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ # 20g at $25/kg = $0.50 -- no top-up because tracked >= archive grams
|
|
|
|
|
+ assert len(results) == 1
|
|
|
|
|
+ assert results[0]["cost"] == 0.50
|
|
|
|
|
+ assert archive.cost == 0.50
|
|
|
|
|
+
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.asyncio
|
|
|
async def test_cost_with_archive_id(self):
|
|
async def test_cost_with_archive_id(self):
|
|
|
"""Test cost aggregation using archive_id (3MF path)."""
|
|
"""Test cost aggregation using archive_id (3MF path)."""
|