"""Unit tests for the preview-slice cache. The preview-slice runs the sidecar's `slice_without_profiles` on an unsliced project file to extract the per-plate filament list. Results are cached by ``(kind, source_id, plate_id, content_hash)`` with LRU eviction so repeat modal opens on the same plate are instant. """ from __future__ import annotations import asyncio import io import zipfile from typing import Any from unittest.mock import patch import pytest from backend.app.services import slice_preview from backend.app.services.slice_preview import ( _PREVIEW_CACHE_MAX, _parse_filaments_from_sliced_3mf, get_preview_filaments, ) from backend.app.services.slicer_api import ( SlicerApiUnavailableError, SliceResult, ) def _make_sliced_3mf(plate_id: int, filaments: list[dict[str, str]]) -> bytes: """Build a fake sliced-3MF zip whose Metadata/slice_info.config has one plate matching ``plate_id`` with the given filament rows.""" fil_xml = "".join( f'' for f in filaments ) slice_info = ( f'{fil_xml}' ) buf = io.BytesIO() with zipfile.ZipFile(buf, "w") as zf: zf.writestr("Metadata/slice_info.config", slice_info) return buf.getvalue() @pytest.fixture(autouse=True) def _reset_cache(): """Each test gets an empty cache + lock dict to keep them independent.""" slice_preview._preview_cache.clear() slice_preview._preview_locks.clear() yield slice_preview._preview_cache.clear() slice_preview._preview_locks.clear() class _StubService: """Mimics SlicerApiService just enough for these tests. 
Records every `slice_without_profiles` call so we can assert call counts.""" def __init__(self, response_bytes: bytes | None = None, raise_exc: BaseException | None = None) -> None: self.response_bytes = response_bytes self.raise_exc = raise_exc self.calls: list[dict[str, Any]] = [] async def __aenter__(self): return self async def __aexit__(self, *exc): return False async def slice_without_profiles(self, **kw): self.calls.append(kw) if self.raise_exc is not None: raise self.raise_exc return SliceResult( content=self.response_bytes or b"", print_time_seconds=0, filament_used_g=0.0, filament_used_mm=0.0, ) # --------------------------------------------------------------------------- # _parse_filaments_from_sliced_3mf — pure-function parsing tests. # --------------------------------------------------------------------------- class TestParseFilamentsFromSliced3mf: def test_happy_path(self): body = _make_sliced_3mf( plate_id=22, filaments=[ {"id": "1", "type": "PLA", "color": "#FFFFFF", "used_g": "33.9"}, {"id": "6", "type": "PLA", "color": "#FF0000", "used_g": "37.7"}, ], ) result = _parse_filaments_from_sliced_3mf(body, 22) assert result is not None assert [(f["slot_id"], f["color"]) for f in result] == [(1, "#FFFFFF"), (6, "#FF0000")] assert result[0]["used_grams"] == 33.9 def test_missing_slice_info_returns_none(self): empty_zip = io.BytesIO() with zipfile.ZipFile(empty_zip, "w") as zf: zf.writestr("placeholder.txt", "x") assert _parse_filaments_from_sliced_3mf(empty_zip.getvalue(), 1) is None def test_plate_not_in_slice_info_returns_none(self): body = _make_sliced_3mf(plate_id=1, filaments=[{"id": "1", "type": "PLA", "color": "#000"}]) assert _parse_filaments_from_sliced_3mf(body, plate_id=99) is None def test_corrupt_zip_returns_none(self): assert _parse_filaments_from_sliced_3mf(b"not a zip file", 1) is None # --------------------------------------------------------------------------- # get_preview_filaments — cache + concurrency behaviour. 
# ---------------------------------------------------------------------------


class TestGetPreviewFilaments:
    """Cache-hit, cache-miss, failure, concurrency and eviction tests for
    ``get_preview_filaments``, run against a stubbed slicer service."""

    @staticmethod
    def _one_filament_body() -> bytes:
        """Return a fake sliced 3MF for plate 1 with a single PLA filament."""
        return _make_sliced_3mf(plate_id=1, filaments=[{"id": "1", "type": "PLA", "color": "#000"}])

    @staticmethod
    def _use_service(service):
        """Patch ``slice_preview.SlicerApiService`` so it always yields ``service``."""
        return patch.object(slice_preview, "SlicerApiService", lambda **kw: service)

    @staticmethod
    async def _preview(**overrides: Any):
        """Call ``get_preview_filaments`` with the default request, applying
        any keyword ``overrides`` on top."""
        request: dict[str, Any] = {
            "kind": "archive",
            "source_id": 1,
            "plate_id": 1,
            "file_bytes": b"abc",
            "file_name": "x.3mf",
            "api_url": "http://sidecar",
        }
        request.update(overrides)
        return await get_preview_filaments(**request)

    @pytest.mark.asyncio
    async def test_happy_path_caches_result(self):
        service = _StubService(response_bytes=self._one_filament_body())
        with self._use_service(service):
            first = await self._preview()
            second = await self._preview()
        assert first is not None
        assert first[0]["slot_id"] == 1
        assert second == first
        # Cache hit — only one slice was actually run.
        assert len(service.calls) == 1

    @pytest.mark.asyncio
    async def test_different_content_hash_misses_cache(self):
        service = _StubService(response_bytes=self._one_filament_body())
        with self._use_service(service):
            await self._preview(file_bytes=b"v1")
            # Same archive, but content changed.
            await self._preview(file_bytes=b"v2")
        # Hash differs → cache miss → fresh slice.
        assert len(service.calls) == 2

    @pytest.mark.asyncio
    async def test_sidecar_unavailable_returns_none_no_cache(self):
        # Transient sidecar failure must NOT poison the cache — the next
        # request retries cleanly.
        service = _StubService(raise_exc=SlicerApiUnavailableError("boom"))
        with self._use_service(service):
            first = await self._preview()
            assert first is None
            # Second call hits the sidecar again (no cached failure).
            await self._preview()
        assert len(service.calls) == 2

    @pytest.mark.asyncio
    async def test_concurrent_calls_share_one_slice(self):
        # Slow stub so we can observe N coroutines piling up on the lock.
        class _SlowStub(_StubService):
            async def slice_without_profiles(self, **kw):
                self.calls.append(kw)
                await asyncio.sleep(0.05)
                return SliceResult(
                    content=self.response_bytes or b"",
                    print_time_seconds=0,
                    filament_used_g=0.0,
                    filament_used_mm=0.0,
                )

        service = _SlowStub(response_bytes=self._one_filament_body())
        with self._use_service(service):
            pending = [self._preview() for _ in range(8)]
            results = await asyncio.gather(*pending)
        # All 8 callers got the same result, but only ONE slice ran.
        reference = results[0]
        assert all(outcome == reference for outcome in results)
        assert len(service.calls) == 1

    @pytest.mark.asyncio
    async def test_lru_eviction_drops_lock(self):
        # Fill cache past the bound; oldest should evict, including its lock.
        service = _StubService(response_bytes=self._one_filament_body())
        overfill = _PREVIEW_CACHE_MAX + 5
        with self._use_service(service):
            # Each call has a unique source_id → unique cache key.
            for source_id in range(overfill):
                await self._preview(source_id=source_id)
        # Cache is bounded — older entries fell off.
        assert len(slice_preview._preview_cache) == _PREVIEW_CACHE_MAX
        # Lock dict is also pruned (no leak): same size as cache.
        assert len(slice_preview._preview_locks) == _PREVIEW_CACHE_MAX