test_slice_preview.py 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. """Unit tests for the preview-slice cache.
  2. The preview-slice runs the sidecar's `slice_without_profiles` on an unsliced
  3. project file to extract the per-plate filament list. Results are cached by
  4. ``(kind, source_id, plate_id, content_hash)`` with LRU eviction so repeat
  5. modal opens on the same plate are instant.
  6. """
  7. from __future__ import annotations
  8. import asyncio
  9. import io
  10. import zipfile
  11. from typing import Any
  12. from unittest.mock import patch
  13. import pytest
  14. from backend.app.services import slice_preview
  15. from backend.app.services.slice_preview import (
  16. _PREVIEW_CACHE_MAX,
  17. _parse_filaments_from_sliced_3mf,
  18. get_preview_filaments,
  19. )
  20. from backend.app.services.slicer_api import (
  21. SlicerApiUnavailableError,
  22. SliceResult,
  23. )
  24. def _make_sliced_3mf(plate_id: int, filaments: list[dict[str, str]]) -> bytes:
  25. """Build a fake sliced-3MF zip whose Metadata/slice_info.config has one
  26. plate matching ``plate_id`` with the given filament rows."""
  27. fil_xml = "".join(
  28. f'<filament id="{f["id"]}" type="{f["type"]}" color="{f["color"]}"'
  29. f' used_g="{f.get("used_g", "0")}" used_m="{f.get("used_m", "0")}"'
  30. f' tray_info_idx="{f.get("tray_info_idx", "")}"/>'
  31. for f in filaments
  32. )
  33. slice_info = (
  34. f'<?xml version="1.0"?><config><plate><metadata key="index" value="{plate_id}"/>{fil_xml}</plate></config>'
  35. )
  36. buf = io.BytesIO()
  37. with zipfile.ZipFile(buf, "w") as zf:
  38. zf.writestr("Metadata/slice_info.config", slice_info)
  39. return buf.getvalue()
  40. @pytest.fixture(autouse=True)
  41. def _reset_cache():
  42. """Each test gets an empty cache + lock dict to keep them independent."""
  43. slice_preview._preview_cache.clear()
  44. slice_preview._preview_locks.clear()
  45. yield
  46. slice_preview._preview_cache.clear()
  47. slice_preview._preview_locks.clear()
  48. class _StubService:
  49. """Mimics SlicerApiService just enough for these tests. Records every
  50. `slice_without_profiles` call so we can assert call counts."""
  51. def __init__(self, response_bytes: bytes | None = None, raise_exc: BaseException | None = None) -> None:
  52. self.response_bytes = response_bytes
  53. self.raise_exc = raise_exc
  54. self.calls: list[dict[str, Any]] = []
  55. async def __aenter__(self):
  56. return self
  57. async def __aexit__(self, *exc):
  58. return False
  59. async def slice_without_profiles(self, **kw):
  60. self.calls.append(kw)
  61. if self.raise_exc is not None:
  62. raise self.raise_exc
  63. return SliceResult(
  64. content=self.response_bytes or b"",
  65. print_time_seconds=0,
  66. filament_used_g=0.0,
  67. filament_used_mm=0.0,
  68. )
  69. # ---------------------------------------------------------------------------
  70. # _parse_filaments_from_sliced_3mf — pure-function parsing tests.
  71. # ---------------------------------------------------------------------------
  72. class TestParseFilamentsFromSliced3mf:
  73. def test_happy_path(self):
  74. body = _make_sliced_3mf(
  75. plate_id=22,
  76. filaments=[
  77. {"id": "1", "type": "PLA", "color": "#FFFFFF", "used_g": "33.9"},
  78. {"id": "6", "type": "PLA", "color": "#FF0000", "used_g": "37.7"},
  79. ],
  80. )
  81. result = _parse_filaments_from_sliced_3mf(body, 22)
  82. assert result is not None
  83. assert [(f["slot_id"], f["color"]) for f in result] == [(1, "#FFFFFF"), (6, "#FF0000")]
  84. assert result[0]["used_grams"] == 33.9
  85. def test_missing_slice_info_returns_none(self):
  86. empty_zip = io.BytesIO()
  87. with zipfile.ZipFile(empty_zip, "w") as zf:
  88. zf.writestr("placeholder.txt", "x")
  89. assert _parse_filaments_from_sliced_3mf(empty_zip.getvalue(), 1) is None
  90. def test_plate_not_in_slice_info_returns_none(self):
  91. body = _make_sliced_3mf(plate_id=1, filaments=[{"id": "1", "type": "PLA", "color": "#000"}])
  92. assert _parse_filaments_from_sliced_3mf(body, plate_id=99) is None
  93. def test_corrupt_zip_returns_none(self):
  94. assert _parse_filaments_from_sliced_3mf(b"not a zip file", 1) is None
  95. # ---------------------------------------------------------------------------
  96. # get_preview_filaments — cache + concurrency behaviour.
  97. # ---------------------------------------------------------------------------
  98. class TestGetPreviewFilaments:
  99. @pytest.mark.asyncio
  100. async def test_happy_path_caches_result(self):
  101. body = _make_sliced_3mf(plate_id=1, filaments=[{"id": "1", "type": "PLA", "color": "#000"}])
  102. stub = _StubService(response_bytes=body)
  103. with patch.object(slice_preview, "SlicerApiService", lambda **kw: stub):
  104. first = await get_preview_filaments(
  105. kind="archive",
  106. source_id=1,
  107. plate_id=1,
  108. file_bytes=b"abc",
  109. file_name="x.3mf",
  110. api_url="http://sidecar",
  111. )
  112. second = await get_preview_filaments(
  113. kind="archive",
  114. source_id=1,
  115. plate_id=1,
  116. file_bytes=b"abc",
  117. file_name="x.3mf",
  118. api_url="http://sidecar",
  119. )
  120. assert first is not None
  121. assert first[0]["slot_id"] == 1
  122. assert second == first
  123. # Cache hit — only one slice was actually run.
  124. assert len(stub.calls) == 1
  125. @pytest.mark.asyncio
  126. async def test_different_content_hash_misses_cache(self):
  127. body = _make_sliced_3mf(plate_id=1, filaments=[{"id": "1", "type": "PLA", "color": "#000"}])
  128. stub = _StubService(response_bytes=body)
  129. with patch.object(slice_preview, "SlicerApiService", lambda **kw: stub):
  130. await get_preview_filaments(
  131. kind="archive",
  132. source_id=1,
  133. plate_id=1,
  134. file_bytes=b"v1",
  135. file_name="x.3mf",
  136. api_url="http://sidecar",
  137. )
  138. await get_preview_filaments(
  139. kind="archive",
  140. source_id=1,
  141. plate_id=1,
  142. file_bytes=b"v2", # Same archive, but content changed
  143. file_name="x.3mf",
  144. api_url="http://sidecar",
  145. )
  146. # Hash differs → cache miss → fresh slice.
  147. assert len(stub.calls) == 2
  148. @pytest.mark.asyncio
  149. async def test_sidecar_unavailable_returns_none_no_cache(self):
  150. # Transient sidecar failure must NOT poison the cache — the next
  151. # request retries cleanly.
  152. stub = _StubService(raise_exc=SlicerApiUnavailableError("boom"))
  153. with patch.object(slice_preview, "SlicerApiService", lambda **kw: stub):
  154. first = await get_preview_filaments(
  155. kind="archive",
  156. source_id=1,
  157. plate_id=1,
  158. file_bytes=b"abc",
  159. file_name="x.3mf",
  160. api_url="http://sidecar",
  161. )
  162. assert first is None
  163. # Second call hits the sidecar again (no cached failure).
  164. await get_preview_filaments(
  165. kind="archive",
  166. source_id=1,
  167. plate_id=1,
  168. file_bytes=b"abc",
  169. file_name="x.3mf",
  170. api_url="http://sidecar",
  171. )
  172. assert len(stub.calls) == 2
  173. @pytest.mark.asyncio
  174. async def test_concurrent_calls_share_one_slice(self):
  175. body = _make_sliced_3mf(plate_id=1, filaments=[{"id": "1", "type": "PLA", "color": "#000"}])
  176. # Slow stub so we can observe N coroutines piling up on the lock.
  177. class _SlowStub(_StubService):
  178. async def slice_without_profiles(self, **kw):
  179. self.calls.append(kw)
  180. await asyncio.sleep(0.05)
  181. return SliceResult(
  182. content=self.response_bytes or b"",
  183. print_time_seconds=0,
  184. filament_used_g=0.0,
  185. filament_used_mm=0.0,
  186. )
  187. stub = _SlowStub(response_bytes=body)
  188. with patch.object(slice_preview, "SlicerApiService", lambda **kw: stub):
  189. results = await asyncio.gather(
  190. *(
  191. get_preview_filaments(
  192. kind="archive",
  193. source_id=1,
  194. plate_id=1,
  195. file_bytes=b"abc",
  196. file_name="x.3mf",
  197. api_url="http://sidecar",
  198. )
  199. for _ in range(8)
  200. ),
  201. )
  202. # All 8 callers got the same result, but only ONE slice ran.
  203. assert all(r == results[0] for r in results)
  204. assert len(stub.calls) == 1
  205. @pytest.mark.asyncio
  206. async def test_lru_eviction_drops_lock(self):
  207. # Fill cache past the bound; oldest should evict, including its lock.
  208. body = _make_sliced_3mf(plate_id=1, filaments=[{"id": "1", "type": "PLA", "color": "#000"}])
  209. stub = _StubService(response_bytes=body)
  210. with patch.object(slice_preview, "SlicerApiService", lambda **kw: stub):
  211. # Each call has a unique source_id → unique cache key.
  212. for i in range(_PREVIEW_CACHE_MAX + 5):
  213. await get_preview_filaments(
  214. kind="archive",
  215. source_id=i,
  216. plate_id=1,
  217. file_bytes=b"abc",
  218. file_name="x.3mf",
  219. api_url="http://sidecar",
  220. )
  221. # Cache is bounded — older entries fell off.
  222. assert len(slice_preview._preview_cache) == _PREVIEW_CACHE_MAX
  223. # Lock dict is also pruned (no leak): same size as cache.
  224. assert len(slice_preview._preview_locks) == _PREVIEW_CACHE_MAX