|
|
@@ -9,6 +9,8 @@ Tests cost calculation scenarios:
|
|
|
- Cost aggregation to archives
|
|
|
"""
|
|
|
|
|
|
+import os
|
|
|
+import tempfile
|
|
|
from datetime import datetime, timezone
|
|
|
from types import SimpleNamespace
|
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
@@ -45,14 +47,84 @@ def _make_assignment(spool_id=1, printer_id=1, ams_id=0, tray_id=0):
|
|
|
return assignment
|
|
|
|
|
|
|
|
|
-def _make_archive(archive_id=1, file_path="archives/1/test.3mf"):
|
|
|
- """Create a mock PrintArchive object."""
|
|
|
+def _make_archive(archive_id=1, file_path=None):
|
|
|
+ """Create a mock PrintArchive object with a temp file, and register cleanup."""
|
|
|
+ if file_path is None:
|
|
|
+ with tempfile.NamedTemporaryFile(delete=False, suffix=".3mf", prefix="test_print_") as tmp:
|
|
|
+ file_path = tmp.name
|
|
|
+ # Register cleanup for this file after the test
|
|
|
+ import pytest
|
|
|
+
|
|
|
+ frame = None
|
|
|
+ try:
|
|
|
+ raise Exception
|
|
|
+ except Exception:
|
|
|
+ import sys
|
|
|
+
|
|
|
+ frame = sys._getframe(1)
|
|
|
+ request = frame.f_locals.get("request")
|
|
|
+ if request is not None:
|
|
|
+
|
|
|
+ def cleanup():
|
|
|
+ try:
|
|
|
+ os.remove(file_path)
|
|
|
+ except Exception:
|
|
|
+ pass
|
|
|
+
|
|
|
+ request.addfinalizer(cleanup)
|
|
|
archive = MagicMock()
|
|
|
archive.id = archive_id
|
|
|
archive.file_path = file_path
|
|
|
return archive
|
|
|
|
|
|
|
|
|
+@pytest.fixture(autouse=True)
|
|
|
+def cleanup_temp_archives():
|
|
|
+ yield
|
|
|
+ # Cleanup any temp .3mf files created by _make_archive
|
|
|
+ import glob
|
|
|
+
|
|
|
+ for f in glob.glob("test_print_*.3mf"):
|
|
|
+ try:
|
|
|
+ os.remove(f)
|
|
|
+ except Exception:
|
|
|
+ pass
|
|
|
+
|
|
|
+
|
|
|
+@pytest.fixture(autouse=True)
|
|
|
+def cleanup_test_print_gcode():
|
|
|
+ yield
|
|
|
+ import os
|
|
|
+
|
|
|
+ path = "archives/test/test_print.gcode.3mf"
|
|
|
+ if os.path.exists(path):
|
|
|
+ try:
|
|
|
+ os.remove(path)
|
|
|
+ except Exception:
|
|
|
+ pass
|
|
|
+
|
|
|
+
|
|
|
+@pytest.fixture
|
|
|
+def archive_factory_temp():
|
|
|
+ import tempfile
|
|
|
+
|
|
|
+ def _factory(*args, **kwargs):
|
|
|
+ with tempfile.NamedTemporaryFile(delete=False, suffix=".3mf", prefix="test_print_", dir="archives/test") as tmp:
|
|
|
+ kwargs["file_path"] = tmp.name
|
|
|
+ return kwargs["file_path"]
|
|
|
+
|
|
|
+ yield _factory
|
|
|
+ # Cleanup
|
|
|
+ import glob
|
|
|
+ import os
|
|
|
+
|
|
|
+ for f in glob.glob("archives/test/test_print_*.3mf"):
|
|
|
+ try:
|
|
|
+ os.remove(f)
|
|
|
+ except Exception:
|
|
|
+ pass
|
|
|
+
|
|
|
+
|
|
|
def _mock_db_sequential(responses):
|
|
|
"""Create mock db that returns responses in order."""
|
|
|
db = AsyncMock()
|
|
|
@@ -450,3 +522,90 @@ class TestCostAggregation:
|
|
|
# Aggregation should handle None gracefully
|
|
|
total_cost = sum(r.get("cost", 0) or 0 for r in results)
|
|
|
assert total_cost == 0.75 # Only spools 1 and 3
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ async def test_cost_with_archive_id(self):
|
|
|
+ """Test cost aggregation using archive_id (3MF path)."""
|
|
|
+ spool_new = _make_spool(spool_id=1, label_weight=1000, cost_per_kg=25.0)
|
|
|
+ assignment_new = _make_assignment(spool_id=1)
|
|
|
+ archive_new = _make_archive(archive_id=20)
|
|
|
+ filament_usage_new = [{"slot_id": 1, "used_g": 20.0, "type": "PLA", "color": "#FF0000"}]
|
|
|
+
|
|
|
+ printer_manager = MagicMock()
|
|
|
+ printer_manager.get_status.return_value = SimpleNamespace(
|
|
|
+ raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
|
|
|
+ progress=100,
|
|
|
+ layer_num=50,
|
|
|
+ tray_now=0,
|
|
|
+ )
|
|
|
+
|
|
|
+ db = _mock_db_sequential([archive_new, None, assignment_new, spool_new])
|
|
|
+
|
|
|
+ with (
|
|
|
+ patch("backend.app.core.config.settings") as mock_settings,
|
|
|
+ patch("backend.app.api.routes.settings.get_setting", return_value="15.0"),
|
|
|
+ patch("backend.app.utils.threemf_tools.extract_filament_usage_from_3mf", return_value=filament_usage_new),
|
|
|
+ ):
|
|
|
+ mock_settings.base_dir = MagicMock()
|
|
|
+ mock_path = MagicMock()
|
|
|
+ mock_path.exists.return_value = True
|
|
|
+ mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
|
|
|
+
|
|
|
+ results_new = await on_print_complete(
|
|
|
+ printer_id=1,
|
|
|
+ data={"status": "completed"},
|
|
|
+ printer_manager=printer_manager,
|
|
|
+ db=db,
|
|
|
+ archive_id=20,
|
|
|
+ )
|
|
|
+
|
|
|
+ assert len(results_new) == 1
|
|
|
+ assert results_new[0]["spool_id"] == 1
|
|
|
+ assert results_new[0]["cost"] == 0.50 # 20g / 1000 * 25.0
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ async def test_cost_with_print_name_ams_fallback(self):
|
|
|
+ """Test cost aggregation using print_name (AMS fallback, legacy path)."""
|
|
|
+ spool_old = _make_spool(spool_id=2, label_weight=1000, cost_per_kg=15.0)
|
|
|
+ assignment_old = _make_assignment(spool_id=2, ams_id=0, tray_id=0)
|
|
|
+ legacy_print_name = "LegacyPrint"
|
|
|
+
|
|
|
+ _active_sessions[1] = PrintSession(
|
|
|
+ printer_id=1,
|
|
|
+ print_name=legacy_print_name,
|
|
|
+ started_at=datetime.now(timezone.utc),
|
|
|
+ tray_remain_start={(0, 0): 80},
|
|
|
+ tray_now_at_start=0,
|
|
|
+ )
|
|
|
+
|
|
|
+ printer_manager = MagicMock()
|
|
|
+ printer_manager.get_status.return_value = SimpleNamespace(
|
|
|
+ raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
|
|
|
+ progress=100,
|
|
|
+ layer_num=50,
|
|
|
+ tray_now=0,
|
|
|
+ )
|
|
|
+
|
|
|
+ db = _mock_db_sequential([assignment_old, spool_old])
|
|
|
+
|
|
|
+ with (
|
|
|
+ patch("backend.app.core.config.settings") as mock_settings,
|
|
|
+ patch("backend.app.api.routes.settings.get_setting", return_value="15.0"),
|
|
|
+ patch("backend.app.utils.threemf_tools.extract_filament_usage_from_3mf", return_value=None),
|
|
|
+ ):
|
|
|
+ mock_settings.base_dir = MagicMock()
|
|
|
+ mock_path = MagicMock()
|
|
|
+ mock_path.exists.return_value = True
|
|
|
+ mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
|
|
|
+
|
|
|
+ results_old = await on_print_complete(
|
|
|
+ printer_id=1,
|
|
|
+ data={"status": "completed", "subtask_name": legacy_print_name, "filename": legacy_print_name},
|
|
|
+ printer_manager=printer_manager,
|
|
|
+ db=db,
|
|
|
+ archive_id=None,
|
|
|
+ )
|
|
|
+
|
|
|
+ assert len(results_old) == 1
|
|
|
+ assert results_old[0]["spool_id"] == 2
|
|
|
+ assert results_old[0]["cost"] == 1.5 # 100g / 1000 * 15.0
|