"""Unit tests for the preview-slice cache.
The preview-slice runs the sidecar's `slice_without_profiles` on an unsliced
project file to extract the per-plate filament list. Results are cached by
``(kind, source_id, plate_id, content_hash)`` with LRU eviction so repeat
modal opens on the same plate are instant.
"""
from __future__ import annotations
import asyncio
import io
import zipfile
from typing import Any
from unittest.mock import patch
import pytest
from backend.app.services import slice_preview
from backend.app.services.slice_preview import (
_PREVIEW_CACHE_MAX,
_parse_filaments_from_sliced_3mf,
get_preview_filaments,
)
from backend.app.services.slicer_api import (
SlicerApiUnavailableError,
SliceResult,
)
def _make_sliced_3mf(plate_id: int, filaments: list[dict[str, str]]) -> bytes:
"""Build a fake sliced-3MF zip whose Metadata/slice_info.config has one
plate matching ``plate_id`` with the given filament rows."""
fil_xml = "".join(
f''
for f in filaments
)
slice_info = (
f'{fil_xml}'
)
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
zf.writestr("Metadata/slice_info.config", slice_info)
return buf.getvalue()
@pytest.fixture(autouse=True)
def _reset_cache():
"""Each test gets an empty cache + lock dict to keep them independent."""
slice_preview._preview_cache.clear()
slice_preview._preview_locks.clear()
yield
slice_preview._preview_cache.clear()
slice_preview._preview_locks.clear()
class _StubService:
"""Mimics SlicerApiService just enough for these tests. Records every
`slice_without_profiles` call so we can assert call counts."""
def __init__(self, response_bytes: bytes | None = None, raise_exc: BaseException | None = None) -> None:
self.response_bytes = response_bytes
self.raise_exc = raise_exc
self.calls: list[dict[str, Any]] = []
async def __aenter__(self):
return self
async def __aexit__(self, *exc):
return False
async def slice_without_profiles(self, **kw):
self.calls.append({"method": "slice_without_profiles", **kw})
if self.raise_exc is not None:
raise self.raise_exc
return SliceResult(
content=self.response_bytes or b"",
print_time_seconds=0,
filament_used_g=0.0,
filament_used_mm=0.0,
)
async def slice_with_bundle(self, **kw):
self.calls.append({"method": "slice_with_bundle", **kw})
if self.raise_exc is not None:
raise self.raise_exc
return SliceResult(
content=self.response_bytes or b"",
print_time_seconds=0,
filament_used_g=0.0,
filament_used_mm=0.0,
)
# ---------------------------------------------------------------------------
# _parse_filaments_from_sliced_3mf — pure-function parsing tests.
# ---------------------------------------------------------------------------
class TestParseFilamentsFromSliced3mf:
def test_happy_path(self):
body = _make_sliced_3mf(
plate_id=22,
filaments=[
{"id": "1", "type": "PLA", "color": "#FFFFFF", "used_g": "33.9"},
{"id": "6", "type": "PLA", "color": "#FF0000", "used_g": "37.7"},
],
)
result = _parse_filaments_from_sliced_3mf(body, 22)
assert result is not None
assert [(f["slot_id"], f["color"]) for f in result] == [(1, "#FFFFFF"), (6, "#FF0000")]
assert result[0]["used_grams"] == 33.9
def test_missing_slice_info_returns_none(self):
empty_zip = io.BytesIO()
with zipfile.ZipFile(empty_zip, "w") as zf:
zf.writestr("placeholder.txt", "x")
assert _parse_filaments_from_sliced_3mf(empty_zip.getvalue(), 1) is None
def test_plate_not_in_slice_info_returns_none(self):
body = _make_sliced_3mf(plate_id=1, filaments=[{"id": "1", "type": "PLA", "color": "#000"}])
assert _parse_filaments_from_sliced_3mf(body, plate_id=99) is None
def test_corrupt_zip_returns_none(self):
assert _parse_filaments_from_sliced_3mf(b"not a zip file", 1) is None
# ---------------------------------------------------------------------------
# get_preview_filaments — cache + concurrency behaviour.
# ---------------------------------------------------------------------------
class TestGetPreviewFilaments:
@pytest.mark.asyncio
async def test_happy_path_caches_result(self):
body = _make_sliced_3mf(plate_id=1, filaments=[{"id": "1", "type": "PLA", "color": "#000"}])
stub = _StubService(response_bytes=body)
with patch.object(slice_preview, "SlicerApiService", lambda **kw: stub):
first = await get_preview_filaments(
kind="archive",
source_id=1,
plate_id=1,
file_bytes=b"abc",
file_name="x.3mf",
api_url="http://sidecar",
)
second = await get_preview_filaments(
kind="archive",
source_id=1,
plate_id=1,
file_bytes=b"abc",
file_name="x.3mf",
api_url="http://sidecar",
)
assert first is not None
assert first[0]["slot_id"] == 1
assert second == first
# Cache hit — only one slice was actually run.
assert len(stub.calls) == 1
@pytest.mark.asyncio
async def test_different_content_hash_misses_cache(self):
body = _make_sliced_3mf(plate_id=1, filaments=[{"id": "1", "type": "PLA", "color": "#000"}])
stub = _StubService(response_bytes=body)
with patch.object(slice_preview, "SlicerApiService", lambda **kw: stub):
await get_preview_filaments(
kind="archive",
source_id=1,
plate_id=1,
file_bytes=b"v1",
file_name="x.3mf",
api_url="http://sidecar",
)
await get_preview_filaments(
kind="archive",
source_id=1,
plate_id=1,
file_bytes=b"v2", # Same archive, but content changed
file_name="x.3mf",
api_url="http://sidecar",
)
# Hash differs → cache miss → fresh slice.
assert len(stub.calls) == 2
@pytest.mark.asyncio
async def test_sidecar_unavailable_returns_none_no_cache(self):
# Transient sidecar failure must NOT poison the cache — the next
# request retries cleanly.
stub = _StubService(raise_exc=SlicerApiUnavailableError("boom"))
with patch.object(slice_preview, "SlicerApiService", lambda **kw: stub):
first = await get_preview_filaments(
kind="archive",
source_id=1,
plate_id=1,
file_bytes=b"abc",
file_name="x.3mf",
api_url="http://sidecar",
)
assert first is None
# Second call hits the sidecar again (no cached failure).
await get_preview_filaments(
kind="archive",
source_id=1,
plate_id=1,
file_bytes=b"abc",
file_name="x.3mf",
api_url="http://sidecar",
)
assert len(stub.calls) == 2
@pytest.mark.asyncio
async def test_concurrent_calls_share_one_slice(self):
body = _make_sliced_3mf(plate_id=1, filaments=[{"id": "1", "type": "PLA", "color": "#000"}])
# Slow stub so we can observe N coroutines piling up on the lock.
class _SlowStub(_StubService):
async def slice_without_profiles(self, **kw):
self.calls.append(kw)
await asyncio.sleep(0.05)
return SliceResult(
content=self.response_bytes or b"",
print_time_seconds=0,
filament_used_g=0.0,
filament_used_mm=0.0,
)
stub = _SlowStub(response_bytes=body)
with patch.object(slice_preview, "SlicerApiService", lambda **kw: stub):
results = await asyncio.gather(
*(
get_preview_filaments(
kind="archive",
source_id=1,
plate_id=1,
file_bytes=b"abc",
file_name="x.3mf",
api_url="http://sidecar",
)
for _ in range(8)
),
)
# All 8 callers got the same result, but only ONE slice ran.
assert all(r == results[0] for r in results)
assert len(stub.calls) == 1
@pytest.mark.asyncio
async def test_lru_eviction_drops_lock(self):
# Fill cache past the bound; oldest should evict, including its lock.
body = _make_sliced_3mf(plate_id=1, filaments=[{"id": "1", "type": "PLA", "color": "#000"}])
stub = _StubService(response_bytes=body)
with patch.object(slice_preview, "SlicerApiService", lambda **kw: stub):
# Each call has a unique source_id → unique cache key.
for i in range(_PREVIEW_CACHE_MAX + 5):
await get_preview_filaments(
kind="archive",
source_id=i,
plate_id=1,
file_bytes=b"abc",
file_name="x.3mf",
api_url="http://sidecar",
)
# Cache is bounded — older entries fell off.
assert len(slice_preview._preview_cache) == _PREVIEW_CACHE_MAX
# Lock dict is also pruned (no leak): same size as cache.
assert len(slice_preview._preview_locks) == _PREVIEW_CACHE_MAX
# ---------------------------------------------------------------------------
# Bundle-aware preview path — when bundle context is supplied, the preview
# routes through `slice_with_bundle` so its gram numbers reflect the same
# triplet the real print will use. Cache must distinguish between bundle
# picks so a fresh selection doesn't re-serve a prior preview's output.
# ---------------------------------------------------------------------------
class TestBundleAwarePreview:
@pytest.mark.asyncio
async def test_full_bundle_context_uses_slice_with_bundle(self):
body = _make_sliced_3mf(plate_id=1, filaments=[{"id": "1", "type": "PLA", "color": "#000"}])
stub = _StubService(response_bytes=body)
with patch.object(slice_preview, "SlicerApiService", lambda **kw: stub):
result = await get_preview_filaments(
kind="library_file",
source_id=42,
plate_id=1,
file_bytes=b"abc",
file_name="x.3mf",
api_url="http://sidecar",
bundle_id="abc123",
printer_name="# Bambu Lab H2D 0.4 nozzle",
process_name="# 0.20mm Standard @BBL H2D",
filament_names=["# Bambu PLA Basic @BBL H2D"],
)
assert result is not None
assert result[0]["slot_id"] == 1
# The bundle path engaged — slice_with_bundle was called, not the
# embedded-settings fallback.
assert len(stub.calls) == 1
assert stub.calls[0]["method"] == "slice_with_bundle"
assert stub.calls[0]["bundle_id"] == "abc123"
assert stub.calls[0]["filament_names"] == ["# Bambu PLA Basic @BBL H2D"]
@pytest.mark.asyncio
async def test_partial_bundle_context_falls_back_to_embedded(self):
# Modal-in-progress case: user picked a bundle id but hasn't yet
# picked the filament. Falling back to embedded settings keeps
# the preview's slot mapping fresh while gram numbers will firm
# up once the selection completes.
body = _make_sliced_3mf(plate_id=1, filaments=[{"id": "1", "type": "PLA", "color": "#000"}])
stub = _StubService(response_bytes=body)
with patch.object(slice_preview, "SlicerApiService", lambda **kw: stub):
await get_preview_filaments(
kind="library_file",
source_id=42,
plate_id=1,
file_bytes=b"abc",
file_name="x.3mf",
api_url="http://sidecar",
bundle_id="abc123",
printer_name="# Bambu Lab H2D 0.4 nozzle",
process_name="# 0.20mm Standard @BBL H2D",
# filament_names missing
)
assert len(stub.calls) == 1
assert stub.calls[0]["method"] == "slice_without_profiles"
@pytest.mark.asyncio
async def test_empty_filament_names_list_falls_back(self):
# Empty list (vs None) is treated as "incomplete context" since
# passing `[]` to slice_with_bundle would yield no
# --load-filaments arg and confuse the CLI.
body = _make_sliced_3mf(plate_id=1, filaments=[{"id": "1", "type": "PLA", "color": "#000"}])
stub = _StubService(response_bytes=body)
with patch.object(slice_preview, "SlicerApiService", lambda **kw: stub):
await get_preview_filaments(
kind="library_file",
source_id=42,
plate_id=1,
file_bytes=b"abc",
file_name="x.3mf",
api_url="http://sidecar",
bundle_id="abc123",
printer_name="P",
process_name="Q",
filament_names=[],
)
assert stub.calls[0]["method"] == "slice_without_profiles"
@pytest.mark.asyncio
async def test_cache_separates_bundle_picks(self):
# Same file/plate, two different bundle picks → two distinct cache
# entries → two slices run. Without the bundle-fingerprint cache key,
# the second call would erroneously serve the first's output.
body = _make_sliced_3mf(plate_id=1, filaments=[{"id": "1", "type": "PLA", "color": "#000"}])
stub = _StubService(response_bytes=body)
with patch.object(slice_preview, "SlicerApiService", lambda **kw: stub):
await get_preview_filaments(
kind="library_file",
source_id=42,
plate_id=1,
file_bytes=b"abc",
file_name="x.3mf",
api_url="http://sidecar",
bundle_id="bundleA",
printer_name="P",
process_name="Q",
filament_names=["F"],
)
await get_preview_filaments(
kind="library_file",
source_id=42,
plate_id=1,
file_bytes=b"abc",
file_name="x.3mf",
api_url="http://sidecar",
bundle_id="bundleB",
printer_name="P",
process_name="Q",
filament_names=["F"],
)
assert len(stub.calls) == 2
assert stub.calls[0]["bundle_id"] == "bundleA"
assert stub.calls[1]["bundle_id"] == "bundleB"
@pytest.mark.asyncio
async def test_cache_separates_bundle_vs_embedded(self):
# Same file/plate, one call without bundle and one with bundle →
# both must run. The embedded-settings cache entry must NOT be
# served as the bundle-picked result (gram numbers would be wrong).
body = _make_sliced_3mf(plate_id=1, filaments=[{"id": "1", "type": "PLA", "color": "#000"}])
stub = _StubService(response_bytes=body)
with patch.object(slice_preview, "SlicerApiService", lambda **kw: stub):
await get_preview_filaments(
kind="library_file",
source_id=42,
plate_id=1,
file_bytes=b"abc",
file_name="x.3mf",
api_url="http://sidecar",
)
await get_preview_filaments(
kind="library_file",
source_id=42,
plate_id=1,
file_bytes=b"abc",
file_name="x.3mf",
api_url="http://sidecar",
bundle_id="bundleA",
printer_name="P",
process_name="Q",
filament_names=["F"],
)
methods = [c["method"] for c in stub.calls]
assert methods == ["slice_without_profiles", "slice_with_bundle"]
@pytest.mark.asyncio
async def test_bundle_repeat_call_hits_cache(self):
# Sanity check that the new cache key is otherwise stable: same
# bundle pick on the same file → cache hit on second call.
body = _make_sliced_3mf(plate_id=1, filaments=[{"id": "1", "type": "PLA", "color": "#000"}])
stub = _StubService(response_bytes=body)
with patch.object(slice_preview, "SlicerApiService", lambda **kw: stub):
for _ in range(2):
await get_preview_filaments(
kind="library_file",
source_id=42,
plate_id=1,
file_bytes=b"abc",
file_name="x.3mf",
api_url="http://sidecar",
bundle_id="bundleA",
printer_name="P",
process_name="Q",
filament_names=["F"],
)
assert len(stub.calls) == 1