"""Unit tests for the filament-deficit pre-dispatch check (#1496). The check is the single source of truth that both ``POST /queue/{id}/start`` and the dispatch scheduler call before sending a print to the printer. Pin the contract for the cases that matter: * Internal-inventory mode: shortfall + sufficient + no assignment. * AMS-mapping gating: a missing mapping means "not yet decided, skip". * Disabled-warnings setting + missing printer (model-based item) + no source 3MF all short-circuit to "no deficit". """ from __future__ import annotations import json import zipfile from pathlib import Path from unittest.mock import patch import pytest from backend.app.models.archive import PrintArchive from backend.app.models.print_queue import PrintQueueItem from backend.app.models.settings import Settings from backend.app.models.spool import Spool from backend.app.models.spool_assignment import SpoolAssignment from backend.app.services.filament_deficit import ( FilamentDeficit, compute_deficit_for_queue_item, ) def _write_3mf(file_path: Path, filaments: list[dict]) -> None: """Minimal 3MF that ``extract_filament_requirements`` can parse (flat shape).""" body = "".join( f'' for f in filaments ) config = f'{body}' with zipfile.ZipFile(file_path, "w") as zf: zf.writestr("Metadata/slice_info.config", config) async def _setup_archive_3mf(db_session, tmp_path: Path, filaments: list[dict]) -> PrintArchive: """Create a 3MF on disk and a PrintArchive row pointing at it.""" file_name = "model.3mf" file_path = tmp_path / file_name _write_3mf(file_path, filaments) archive = PrintArchive( filename=file_name, print_name="Test", # The helper resolves via app_settings.base_dir / file_path, but # storing the absolute path on the model also works because # ``Path / abs`` collapses to the absolute side. file_path=str(file_path), file_size=file_path.stat().st_size, status="completed", ) db_session.add(archive) await db_session.commit() await db_session.refresh(archive) return archive async def _spool(db_session, *, label_weight: int, weight_used: float, color: str = "#000000") -> Spool: spool = Spool( material="PLA", label_weight=label_weight, weight_used=weight_used, rgba=color, ) db_session.add(spool) await db_session.commit() await db_session.refresh(spool) return spool async def _assign(db_session, *, printer_id: int, spool_id: int, ams_id: int = 0, tray_id: int = 0) -> None: db_session.add( SpoolAssignment( spool_id=spool_id, printer_id=printer_id, ams_id=ams_id, tray_id=tray_id, ) ) await db_session.commit() async def _queue_item( db_session, *, printer_id: int | None, archive: PrintArchive | None, ams_mapping: list[int] | None, plate_id: int | None = None, ) -> PrintQueueItem: item = PrintQueueItem( printer_id=printer_id, archive_id=archive.id if archive else None, ams_mapping=json.dumps(ams_mapping) if ams_mapping is not None else None, plate_id=plate_id, status="pending", manual_start=True, ) db_session.add(item) await db_session.commit() await db_session.refresh(item, ["archive", "library_file"]) return item class TestFilamentDeficit: @pytest.mark.asyncio async def test_returns_deficit_when_spool_too_light(self, db_session, printer_factory, tmp_path): """Spool with 30g remaining for a 100g print → one deficit row.""" printer = await printer_factory() archive = await _setup_archive_3mf( db_session, tmp_path, [{"id": "1", "type": "PLA", "color": "#FFFFFF", "used_g": "100.0"}], ) spool = await _spool(db_session, label_weight=1000, weight_used=970.0) # 30g left await _assign(db_session, printer_id=printer.id, spool_id=spool.id, ams_id=0, tray_id=0) item = await _queue_item(db_session, printer_id=printer.id, archive=archive, ams_mapping=[0]) with patch("backend.app.services.filament_deficit.app_settings.base_dir", Path("/")): deficit = await compute_deficit_for_queue_item(db_session, item) assert len(deficit) == 1 assert isinstance(deficit[0], FilamentDeficit) assert deficit[0].slot_id == 1 assert deficit[0].required_grams == 100.0 assert deficit[0].remaining_grams == 30.0 assert deficit[0].filament_type == "PLA" @pytest.mark.asyncio async def test_returns_empty_when_spool_has_enough(self, db_session, printer_factory, tmp_path): printer = await printer_factory() archive = await _setup_archive_3mf( db_session, tmp_path, [{"id": "1", "type": "PLA", "color": "#FFFFFF", "used_g": "100.0"}], ) spool = await _spool(db_session, label_weight=1000, weight_used=200.0) # 800g left await _assign(db_session, printer_id=printer.id, spool_id=spool.id, ams_id=0, tray_id=0) item = await _queue_item(db_session, printer_id=printer.id, archive=archive, ams_mapping=[0]) with patch("backend.app.services.filament_deficit.app_settings.base_dir", Path("/")): deficit = await compute_deficit_for_queue_item(db_session, item) assert deficit == [] @pytest.mark.asyncio async def test_returns_empty_when_ams_mapping_missing(self, db_session, printer_factory, tmp_path): """No mapping yet = scheduler hasn't decided which slot maps where.""" printer = await printer_factory() archive = await _setup_archive_3mf( db_session, tmp_path, [{"id": "1", "type": "PLA", "color": "#FFFFFF", "used_g": "100.0"}], ) item = await _queue_item(db_session, printer_id=printer.id, archive=archive, ams_mapping=None) with patch("backend.app.services.filament_deficit.app_settings.base_dir", Path("/")): deficit = await compute_deficit_for_queue_item(db_session, item) assert deficit == [] @pytest.mark.asyncio async def test_returns_empty_when_no_printer_assigned(self, db_session, tmp_path): """Model-based queue items with no resolved printer_id can't be checked.""" archive = await _setup_archive_3mf( db_session, tmp_path, [{"id": "1", "type": "PLA", "color": "#FFFFFF", "used_g": "100.0"}], ) item = await _queue_item(db_session, printer_id=None, archive=archive, ams_mapping=[0]) with patch("backend.app.services.filament_deficit.app_settings.base_dir", Path("/")): deficit = await compute_deficit_for_queue_item(db_session, item) assert deficit == [] @pytest.mark.asyncio async def test_returns_empty_when_warnings_disabled(self, db_session, printer_factory, tmp_path): """Honour the disable_filament_warnings setting (#720 toggle).""" printer = await printer_factory() archive = await _setup_archive_3mf( db_session, tmp_path, [{"id": "1", "type": "PLA", "color": "#FFFFFF", "used_g": "100.0"}], ) spool = await _spool(db_session, label_weight=1000, weight_used=970.0) await _assign(db_session, printer_id=printer.id, spool_id=spool.id) item = await _queue_item(db_session, printer_id=printer.id, archive=archive, ams_mapping=[0]) db_session.add(Settings(key="disable_filament_warnings", value="true")) await db_session.commit() with patch("backend.app.services.filament_deficit.app_settings.base_dir", Path("/")): deficit = await compute_deficit_for_queue_item(db_session, item) assert deficit == [] @pytest.mark.asyncio async def test_returns_empty_when_no_assignment(self, db_session, printer_factory, tmp_path): """Mapping points at a slot with no spool assigned → silent, not blocked.""" printer = await printer_factory() archive = await _setup_archive_3mf( db_session, tmp_path, [{"id": "1", "type": "PLA", "color": "#FFFFFF", "used_g": "100.0"}], ) item = await _queue_item(db_session, printer_id=printer.id, archive=archive, ams_mapping=[0]) with patch("backend.app.services.filament_deficit.app_settings.base_dir", Path("/")): deficit = await compute_deficit_for_queue_item(db_session, item) assert deficit == [] @pytest.mark.asyncio async def test_returns_empty_when_3mf_missing(self, db_session, printer_factory): printer = await printer_factory() archive = PrintArchive( filename="ghost.3mf", file_path="/nonexistent/ghost.3mf", file_size=0, status="completed", ) db_session.add(archive) await db_session.commit() await db_session.refresh(archive) item = await _queue_item(db_session, printer_id=printer.id, archive=archive, ams_mapping=[0]) deficit = await compute_deficit_for_queue_item(db_session, item) assert deficit == [] @pytest.mark.asyncio async def test_multi_slot_only_shorted_slot_returned(self, db_session, printer_factory, tmp_path): """One slot fine, one short — only the short slot is in the result.""" printer = await printer_factory() archive = await _setup_archive_3mf( db_session, tmp_path, [ {"id": "1", "type": "PLA", "color": "#FFFFFF", "used_g": "100.0"}, {"id": "2", "type": "PETG", "color": "#000000", "used_g": "80.0"}, ], ) plenty = await _spool(db_session, label_weight=1000, weight_used=100.0) # 900g shorted = await _spool(db_session, label_weight=1000, weight_used=950.0) # 50g await _assign(db_session, printer_id=printer.id, spool_id=plenty.id, ams_id=0, tray_id=0) await _assign(db_session, printer_id=printer.id, spool_id=shorted.id, ams_id=0, tray_id=1) item = await _queue_item( db_session, printer_id=printer.id, archive=archive, ams_mapping=[0, 1], # slot 1 -> tray 0, slot 2 -> tray 1 ) with patch("backend.app.services.filament_deficit.app_settings.base_dir", Path("/")): deficit = await compute_deficit_for_queue_item(db_session, item) assert [d.slot_id for d in deficit] == [2] assert deficit[0].remaining_grams == 50.0 assert deficit[0].required_grams == 80.0