|
|
@@ -0,0 +1,2526 @@
|
|
|
+"""Integration tests for the Spoolman inventory proxy endpoints.
|
|
|
+
|
|
|
+These tests verify that /api/v1/spoolman/inventory/spools/* correctly
|
|
|
+translates between Spoolman's data model and Bambuddy's InventorySpool format.
|
|
|
+"""
|
|
|
+
|
|
|
+from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
+
|
|
|
+import pytest
|
|
|
+from fastapi import HTTPException
|
|
|
+from httpx import AsyncClient
|
|
|
+
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# Shared fixtures
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+
|
|
|
+SAMPLE_SPOOLMAN_SPOOL = {
|
|
|
+ "id": 42,
|
|
|
+ "filament": {
|
|
|
+ "id": 7,
|
|
|
+ "name": "PLA Basic",
|
|
|
+ "material": "PLA",
|
|
|
+ "color_hex": "FF0000",
|
|
|
+ "weight": 1000,
|
|
|
+ "vendor": {"id": 3, "name": "Bambu Lab"},
|
|
|
+ },
|
|
|
+ "remaining_weight": 750.0,
|
|
|
+ "used_weight": 250.0,
|
|
|
+ "location": "Printer1 - AMS A1",
|
|
|
+ "comment": "test note",
|
|
|
+ "first_used": "2024-01-01T00:00:00+00:00",
|
|
|
+ "last_used": "2024-02-01T00:00:00+00:00",
|
|
|
+ "registered": "2024-01-01T00:00:00+00:00",
|
|
|
+ "archived": False,
|
|
|
+ "price": None,
|
|
|
+ "extra": {"tag": '"AABBCCDDEEFF0011AABBCCDDEEFF0011"'},
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+@pytest.fixture
|
|
|
+async def spoolman_settings(db_session):
|
|
|
+ """Create Spoolman settings in the database (enabled with URL)."""
|
|
|
+ from backend.app.models.settings import Settings
|
|
|
+
|
|
|
+ enabled_setting = Settings(key="spoolman_enabled", value="true")
|
|
|
+ url_setting = Settings(key="spoolman_url", value="http://localhost:7912")
|
|
|
+ db_session.add(enabled_setting)
|
|
|
+ db_session.add(url_setting)
|
|
|
+ await db_session.commit()
|
|
|
+ return {"enabled": enabled_setting, "url": url_setting}
|
|
|
+
|
|
|
+
|
|
|
+@pytest.fixture
|
|
|
+def mock_spoolman_client():
|
|
|
+ """Mock the Spoolman client with a sample spool."""
|
|
|
+ mock_client = MagicMock()
|
|
|
+ mock_client.base_url = "http://localhost:7912"
|
|
|
+ mock_client.health_check = AsyncMock(return_value=True)
|
|
|
+ mock_client.get_all_spools = AsyncMock(return_value=[SAMPLE_SPOOLMAN_SPOOL])
|
|
|
+ mock_client.get_spool = AsyncMock(return_value=SAMPLE_SPOOLMAN_SPOOL)
|
|
|
+ mock_client.create_spool = AsyncMock(return_value=SAMPLE_SPOOLMAN_SPOOL)
|
|
|
+ mock_client.delete_spool = AsyncMock(return_value=True)
|
|
|
+ mock_client.set_spool_archived = AsyncMock(
|
|
|
+ side_effect=lambda spool_id, archived: {**SAMPLE_SPOOLMAN_SPOOL, "archived": archived}
|
|
|
+ )
|
|
|
+ mock_client.update_spool_full = AsyncMock(return_value=SAMPLE_SPOOLMAN_SPOOL)
|
|
|
+ mock_client.merge_spool_extra = AsyncMock(return_value=SAMPLE_SPOOLMAN_SPOOL)
|
|
|
+ mock_client.find_or_create_filament = AsyncMock(return_value=7)
|
|
|
+
|
|
|
+ with (
|
|
|
+ patch(
|
|
|
+ "backend.app.api.routes.spoolman_inventory.get_spoolman_client",
|
|
|
+ AsyncMock(return_value=mock_client),
|
|
|
+ ),
|
|
|
+ patch(
|
|
|
+ "backend.app.api.routes.spoolman_inventory.init_spoolman_client",
|
|
|
+ AsyncMock(return_value=mock_client),
|
|
|
+ ),
|
|
|
+ ):
|
|
|
+ yield mock_client
|
|
|
+
|
|
|
+
|
|
|
+class TestSpoolmanInventoryMapping:
|
|
|
+ """Tests for the Spoolman → InventorySpool data mapping."""
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_list_spools_returns_inventory_format(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """GET /spoolman/inventory/spools returns spools in InventorySpool format."""
|
|
|
+ response = await async_client.get("/api/v1/spoolman/inventory/spools")
|
|
|
+
|
|
|
+ assert response.status_code == 200
|
|
|
+ spools = response.json()
|
|
|
+ assert isinstance(spools, list)
|
|
|
+ assert len(spools) == 1
|
|
|
+
|
|
|
+ spool = spools[0]
|
|
|
+ assert spool["id"] == 42
|
|
|
+ assert spool["material"] == "PLA"
|
|
|
+ assert spool["subtype"] == "Basic"
|
|
|
+ assert spool["brand"] == "Bambu Lab"
|
|
|
+ assert spool["label_weight"] == 1000
|
|
|
+ assert spool["weight_used"] == 250.0
|
|
|
+ assert spool["note"] == "test note"
|
|
|
+ assert spool["data_origin"] == "spoolman"
|
|
|
+ assert spool["tag_type"] == "spoolman"
|
|
|
+ # RRGGBB + FF alpha
|
|
|
+ assert spool["rgba"] == "FF0000FF"
|
|
|
+ # Spoolman location mapped to storage_location
|
|
|
+ assert spool["storage_location"] == "Printer1 - AMS A1"
|
|
|
+ # RFID tag: 32-char → tray_uuid
|
|
|
+ assert spool["tray_uuid"] == "AABBCCDDEEFF0011AABBCCDDEEFF0011"
|
|
|
+ assert spool["tag_uid"] is None
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_get_single_spool(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """GET /spoolman/inventory/spools/{id} returns a single spool."""
|
|
|
+ response = await async_client.get("/api/v1/spoolman/inventory/spools/42")
|
|
|
+
|
|
|
+ assert response.status_code == 200
|
|
|
+ spool = response.json()
|
|
|
+ assert spool["id"] == 42
|
|
|
+ assert spool["material"] == "PLA"
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_list_includes_archived_when_requested(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """GET /spoolman/inventory/spools?include_archived=true calls Spoolman with allow_archived."""
|
|
|
+ await async_client.get("/api/v1/spoolman/inventory/spools?include_archived=true")
|
|
|
+ mock_spoolman_client.get_all_spools.assert_called_once_with(allow_archived=True)
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_archived_spool_has_archived_at(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """An archived Spoolman spool maps to archived_at != None."""
|
|
|
+ archived_spool = {
|
|
|
+ **SAMPLE_SPOOLMAN_SPOOL,
|
|
|
+ "archived": True,
|
|
|
+ }
|
|
|
+ mock_spoolman_client.get_all_spools.return_value = [archived_spool]
|
|
|
+
|
|
|
+ response = await async_client.get("/api/v1/spoolman/inventory/spools?include_archived=true")
|
|
|
+ spool = response.json()[0]
|
|
|
+ assert spool["archived_at"] is not None
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_malformed_spool_skipped_in_list(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """A spool with an invalid id (e.g. 0) is silently skipped; others still appear."""
|
|
|
+ bad_spool = {**SAMPLE_SPOOLMAN_SPOOL, "id": 0}
|
|
|
+ mock_spoolman_client.get_all_spools.return_value = [bad_spool, SAMPLE_SPOOLMAN_SPOOL]
|
|
|
+
|
|
|
+ response = await async_client.get("/api/v1/spoolman/inventory/spools")
|
|
|
+ assert response.status_code == 200
|
|
|
+ spools = response.json()
|
|
|
+ # bad_spool is dropped; the valid one survives
|
|
|
+ assert len(spools) == 1
|
|
|
+ assert spools[0]["id"] == 42
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_list_spools_returns_503_when_spoolman_unavailable(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """GET /spoolman/inventory/spools returns 503 when Spoolman is unreachable (H10)."""
|
|
|
+ from backend.app.services.spoolman import SpoolmanUnavailableError
|
|
|
+
|
|
|
+ mock_spoolman_client.get_all_spools.side_effect = SpoolmanUnavailableError("down")
|
|
|
+
|
|
|
+ response = await async_client.get("/api/v1/spoolman/inventory/spools")
|
|
|
+ assert response.status_code == 503
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_tag_uid_16char_maps_correctly(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """A 16-char tag maps to tag_uid, not tray_uuid."""
|
|
|
+ spool_with_short_tag = {
|
|
|
+ **SAMPLE_SPOOLMAN_SPOOL,
|
|
|
+ "extra": {"tag": '"AABBCCDDEEFF0011"'},
|
|
|
+ }
|
|
|
+ mock_spoolman_client.get_all_spools.return_value = [spool_with_short_tag]
|
|
|
+
|
|
|
+ response = await async_client.get("/api/v1/spoolman/inventory/spools")
|
|
|
+ spool = response.json()[0]
|
|
|
+ assert spool["tag_uid"] == "AABBCCDDEEFF0011"
|
|
|
+ assert spool["tray_uuid"] is None
|
|
|
+
|
|
|
+
|
|
|
+class TestSpoolmanInventoryCRUD:
|
|
|
+ """Tests for create, update, delete, archive, restore operations."""
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_not_enabled_returns_400(self, async_client: AsyncClient):
|
|
|
+ """All endpoints return 400 when Spoolman is not enabled."""
|
|
|
+ response = await async_client.get("/api/v1/spoolman/inventory/spools")
|
|
|
+ assert response.status_code == 400
|
|
|
+ assert "not enabled" in response.json()["detail"].lower()
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_create_spool(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """POST /spoolman/inventory/spools creates a spool via Spoolman."""
|
|
|
+ payload = {
|
|
|
+ "material": "PLA",
|
|
|
+ "subtype": "Basic",
|
|
|
+ "brand": "Bambu Lab",
|
|
|
+ "rgba": "FF0000FF",
|
|
|
+ "label_weight": 1000,
|
|
|
+ "weight_used": 0,
|
|
|
+ }
|
|
|
+ response = await async_client.post("/api/v1/spoolman/inventory/spools", json=payload)
|
|
|
+
|
|
|
+ assert response.status_code == 200
|
|
|
+ mock_spoolman_client.find_or_create_filament.assert_called_once()
|
|
|
+ mock_spoolman_client.create_spool.assert_called_once()
|
|
|
+ data = response.json()
|
|
|
+ assert data["material"] == "PLA"
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_bulk_create_spools(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """POST /spoolman/inventory/spools/bulk creates multiple spools."""
|
|
|
+ payload = {
|
|
|
+ "spool": {"material": "PETG", "label_weight": 1000, "weight_used": 0},
|
|
|
+ "quantity": 3,
|
|
|
+ }
|
|
|
+ response = await async_client.post("/api/v1/spoolman/inventory/spools/bulk", json=payload)
|
|
|
+
|
|
|
+ assert response.status_code == 200
|
|
|
+ assert mock_spoolman_client.create_spool.call_count == 3
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_bulk_create_quantity_out_of_range_returns_422(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """Bulk create quantity outside 1-50 is rejected with 422 (not silently clamped)."""
|
|
|
+ payload = {
|
|
|
+ "spool": {"material": "ABS", "label_weight": 1000, "weight_used": 0},
|
|
|
+ "quantity": 999,
|
|
|
+ }
|
|
|
+ response = await async_client.post("/api/v1/spoolman/inventory/spools/bulk", json=payload)
|
|
|
+ assert response.status_code == 422
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_bulk_create_quantity_zero_returns_422(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """Bulk create quantity of 0 is rejected with 422."""
|
|
|
+ payload = {
|
|
|
+ "spool": {"material": "ABS", "label_weight": 1000, "weight_used": 0},
|
|
|
+ "quantity": 0,
|
|
|
+ }
|
|
|
+ response = await async_client.post("/api/v1/spoolman/inventory/spools/bulk", json=payload)
|
|
|
+ assert response.status_code == 422
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_update_spool(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """PATCH /spoolman/inventory/spools/{id} updates a spool."""
|
|
|
+ payload = {"note": "updated note", "weight_used": 100.0}
|
|
|
+ response = await async_client.patch("/api/v1/spoolman/inventory/spools/42", json=payload)
|
|
|
+
|
|
|
+ assert response.status_code == 200
|
|
|
+ mock_spoolman_client.update_spool_full.assert_called_once()
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_update_spool_not_found(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """PATCH returns 404 when Spoolman spool does not exist."""
|
|
|
+ from backend.app.services.spoolman import SpoolmanNotFoundError
|
|
|
+
|
|
|
+ mock_spoolman_client.get_spool.side_effect = SpoolmanNotFoundError("spool not found")
|
|
|
+ response = await async_client.patch("/api/v1/spoolman/inventory/spools/999", json={"note": "x"})
|
|
|
+ assert response.status_code == 404
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_delete_spool(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """DELETE /spoolman/inventory/spools/{id} deletes a spool."""
|
|
|
+ response = await async_client.delete("/api/v1/spoolman/inventory/spools/42")
|
|
|
+
|
|
|
+ assert response.status_code == 200
|
|
|
+ assert response.json()["status"] == "deleted"
|
|
|
+ mock_spoolman_client.delete_spool.assert_called_once_with(42)
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_delete_spool_failure(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """DELETE returns 503 when Spoolman is unreachable."""
|
|
|
+ from backend.app.services.spoolman import SpoolmanUnavailableError
|
|
|
+
|
|
|
+ mock_spoolman_client.delete_spool.side_effect = SpoolmanUnavailableError("unreachable")
|
|
|
+ response = await async_client.delete("/api/v1/spoolman/inventory/spools/42")
|
|
|
+ assert response.status_code == 503
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_delete_spool_not_found(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """DELETE returns 404 when Spoolman reports the spool does not exist."""
|
|
|
+ from backend.app.services.spoolman import SpoolmanNotFoundError
|
|
|
+
|
|
|
+ mock_spoolman_client.delete_spool.side_effect = SpoolmanNotFoundError("gone")
|
|
|
+ response = await async_client.delete("/api/v1/spoolman/inventory/spools/42")
|
|
|
+ assert response.status_code == 404
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_archive_spool_not_found(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """POST /archive returns 404 when Spoolman reports the spool does not exist."""
|
|
|
+ from backend.app.services.spoolman import SpoolmanNotFoundError
|
|
|
+
|
|
|
+ mock_spoolman_client.set_spool_archived.side_effect = SpoolmanNotFoundError("gone")
|
|
|
+ response = await async_client.post("/api/v1/spoolman/inventory/spools/42/archive")
|
|
|
+ assert response.status_code == 404
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_restore_spool_not_found(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """POST /restore returns 404 when Spoolman reports the spool does not exist."""
|
|
|
+ from backend.app.services.spoolman import SpoolmanNotFoundError
|
|
|
+
|
|
|
+ mock_spoolman_client.set_spool_archived.side_effect = SpoolmanNotFoundError("gone")
|
|
|
+ response = await async_client.post("/api/v1/spoolman/inventory/spools/42/restore")
|
|
|
+ assert response.status_code == 404
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_archive_spool(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """POST /spoolman/inventory/spools/{id}/archive archives a spool."""
|
|
|
+ response = await async_client.post("/api/v1/spoolman/inventory/spools/42/archive")
|
|
|
+
|
|
|
+ assert response.status_code == 200
|
|
|
+ mock_spoolman_client.set_spool_archived.assert_called_once_with(42, archived=True)
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_restore_spool(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """POST /spoolman/inventory/spools/{id}/restore restores an archived spool."""
|
|
|
+ response = await async_client.post("/api/v1/spoolman/inventory/spools/42/restore")
|
|
|
+
|
|
|
+ assert response.status_code == 200
|
|
|
+ mock_spoolman_client.set_spool_archived.assert_called_once_with(42, archived=False)
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_sync_weight(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """PATCH /spoolman/inventory/spools/{id}/weight updates remaining weight."""
|
|
|
+ payload = {"weight_grams": 850.0}
|
|
|
+ response = await async_client.patch("/api/v1/spoolman/inventory/spools/42/weight", json=payload)
|
|
|
+
|
|
|
+ assert response.status_code == 200
|
|
|
+ result = response.json()
|
|
|
+ assert result["status"] == "ok"
|
|
|
+ # remaining = 850 - 250 core = 600; weight_used = 1000 - 600 = 400
|
|
|
+ assert result["weight_used"] == 400.0
|
|
|
+ mock_spoolman_client.update_spool_full.assert_called_once_with(spool_id=42, remaining_weight=600.0)
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_update_spool_returns_404_on_not_found(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """PATCH returns 404 when update_spool_full raises SpoolmanNotFoundError (I2)."""
|
|
|
+ from backend.app.services.spoolman import SpoolmanNotFoundError
|
|
|
+
|
|
|
+ mock_spoolman_client.update_spool_full.side_effect = SpoolmanNotFoundError("gone")
|
|
|
+ response = await async_client.patch("/api/v1/spoolman/inventory/spools/42", json={"note": "x"})
|
|
|
+ assert response.status_code == 404
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_update_spool_returns_503_on_unavailable(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """PATCH returns 503 when update_spool_full raises SpoolmanUnavailableError (I2)."""
|
|
|
+ from backend.app.services.spoolman import SpoolmanUnavailableError
|
|
|
+
|
|
|
+ mock_spoolman_client.update_spool_full.side_effect = SpoolmanUnavailableError("down")
|
|
|
+ response = await async_client.patch("/api/v1/spoolman/inventory/spools/42", json={"note": "x"})
|
|
|
+ assert response.status_code == 503
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_sync_weight_returns_404_on_not_found(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """PATCH /weight returns 404 when update_spool_full raises SpoolmanNotFoundError (I2)."""
|
|
|
+ from backend.app.services.spoolman import SpoolmanNotFoundError
|
|
|
+
|
|
|
+ mock_spoolman_client.update_spool_full.side_effect = SpoolmanNotFoundError("gone")
|
|
|
+ response = await async_client.patch("/api/v1/spoolman/inventory/spools/42/weight", json={"weight_grams": 500.0})
|
|
|
+ assert response.status_code == 404
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_sync_weight_returns_503_on_unavailable(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """PATCH /weight returns 503 when update_spool_full raises SpoolmanUnavailableError (I2)."""
|
|
|
+ from backend.app.services.spoolman import SpoolmanUnavailableError
|
|
|
+
|
|
|
+ mock_spoolman_client.update_spool_full.side_effect = SpoolmanUnavailableError("down")
|
|
|
+ response = await async_client.patch("/api/v1/spoolman/inventory/spools/42/weight", json={"weight_grams": 500.0})
|
|
|
+ assert response.status_code == 503
|
|
|
+
|
|
|
+
|
|
|
+class TestSpoolmanInventorySlicerFilament:
|
|
|
+ """slicer_filament persistence via Spoolman extra dict.
|
|
|
+
|
|
|
+ Spoolman has no native slicer_filament field — Bambuddy persists the
|
|
|
+ BambuStudio preset under bambu_slicer_filament[_name] keys in the
|
|
|
+ spool's extra dict and unwraps them in _map_spoolman_spool. Without
|
|
|
+ this round-trip the user's slicer-preset selection on the spool form
|
|
|
+ is silently dropped (#1114).
|
|
|
+ """
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_update_persists_slicer_filament_to_extra(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """PATCH with slicer_filament writes bambu_slicer_filament to extra.
|
|
|
+
|
|
|
+ Spoolman's PATCH MERGES extra keys, so we send via merge_spool_extra
|
|
|
+ not update_spool_full. Values are JSON-encoded strings.
|
|
|
+ """
|
|
|
+ import json as _json
|
|
|
+
|
|
|
+ mock_spoolman_client.ensure_extra_field = AsyncMock(return_value=True)
|
|
|
+
|
|
|
+ response = await async_client.patch(
|
|
|
+ "/api/v1/spoolman/inventory/spools/42",
|
|
|
+ json={
|
|
|
+ "slicer_filament": "PFUSf543b298f8ea66",
|
|
|
+ "slicer_filament_name": "Devil Design PLA Basic @Bambu Lab H2D 0.4 nozzle (Custom)",
|
|
|
+ },
|
|
|
+ )
|
|
|
+ assert response.status_code == 200
|
|
|
+ # Field registration is idempotent — must be called for each key
|
|
|
+ ensure_calls = [c.args[0] for c in mock_spoolman_client.ensure_extra_field.call_args_list]
|
|
|
+ assert "bambu_slicer_filament" in ensure_calls
|
|
|
+ assert "bambu_slicer_filament_name" in ensure_calls
|
|
|
+ # Values must be JSON-encoded so read-side can json.loads + .strip('"')
|
|
|
+ mock_spoolman_client.merge_spool_extra.assert_called_once_with(
|
|
|
+ 42,
|
|
|
+ {
|
|
|
+ "bambu_slicer_filament": _json.dumps("PFUSf543b298f8ea66"),
|
|
|
+ "bambu_slicer_filament_name": _json.dumps("Devil Design PLA Basic @Bambu Lab H2D 0.4 nozzle (Custom)"),
|
|
|
+ },
|
|
|
+ )
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_update_without_slicer_filament_skips_merge(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """PATCH without slicer_filament fields must not call merge_spool_extra.
|
|
|
+
|
|
|
+ Avoids overwriting an existing preset with empty/null when the user
|
|
|
+ just changed an unrelated field (e.g. note, weight).
|
|
|
+ """
|
|
|
+ response = await async_client.patch(
|
|
|
+ "/api/v1/spoolman/inventory/spools/42",
|
|
|
+ json={"note": "just changing the note"},
|
|
|
+ )
|
|
|
+ assert response.status_code == 200
|
|
|
+ mock_spoolman_client.merge_spool_extra.assert_not_called()
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_update_clears_slicer_filament_with_empty_string(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """Empty-string slicer_filament writes the JSON-encoded "" sentinel.
|
|
|
+
|
|
|
+ The read-side strip('"') resolves it to an empty string and falls
|
|
|
+ back to filament.name — matches the user-facing "clear preset" flow.
|
|
|
+ """
|
|
|
+ import json as _json
|
|
|
+
|
|
|
+ mock_spoolman_client.ensure_extra_field = AsyncMock(return_value=True)
|
|
|
+
|
|
|
+ response = await async_client.patch(
|
|
|
+ "/api/v1/spoolman/inventory/spools/42",
|
|
|
+ json={"slicer_filament": "", "slicer_filament_name": ""},
|
|
|
+ )
|
|
|
+ assert response.status_code == 200
|
|
|
+ mock_spoolman_client.merge_spool_extra.assert_called_once_with(
|
|
|
+ 42,
|
|
|
+ {
|
|
|
+ "bambu_slicer_filament": _json.dumps(""),
|
|
|
+ "bambu_slicer_filament_name": _json.dumps(""),
|
|
|
+ },
|
|
|
+ )
|
|
|
+
|
|
|
+
|
|
|
+class TestSpoolmanInventoryCostPerKg:
|
|
|
+ """Tests for the two-step cost_per_kg create path (PT-C2)."""
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_create_spool_with_cost_per_kg_calls_price_update(
|
|
|
+ self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client
|
|
|
+ ):
|
|
|
+ """POST with cost_per_kg calls update_spool_full with price= after creation."""
|
|
|
+ from unittest.mock import AsyncMock
|
|
|
+
|
|
|
+ mock_spoolman_client.update_spool_full = AsyncMock(return_value=SAMPLE_SPOOLMAN_SPOOL)
|
|
|
+
|
|
|
+ payload = {
|
|
|
+ "material": "PLA",
|
|
|
+ "brand": "Bambu Lab",
|
|
|
+ "label_weight": 1000,
|
|
|
+ "cost_per_kg": 24.99,
|
|
|
+ }
|
|
|
+ resp = await async_client.post("/api/v1/spoolman/inventory/spools", json=payload)
|
|
|
+ assert resp.status_code == 200
|
|
|
+ # update_spool_full must have been called with price=24.99
|
|
|
+ calls = [
|
|
|
+ c
|
|
|
+ for c in mock_spoolman_client.update_spool_full.call_args_list
|
|
|
+ if c.kwargs.get("price") == 24.99 or (c.args and 24.99 in c.args)
|
|
|
+ ]
|
|
|
+ assert len(calls) >= 1
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_create_spool_without_cost_per_kg_skips_price_update(
|
|
|
+ self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client
|
|
|
+ ):
|
|
|
+ """POST without cost_per_kg does not call update_spool_full."""
|
|
|
+ from unittest.mock import AsyncMock
|
|
|
+
|
|
|
+ mock_spoolman_client.update_spool_full = AsyncMock(return_value=SAMPLE_SPOOLMAN_SPOOL)
|
|
|
+
|
|
|
+ payload = {"material": "PLA", "brand": "Bambu Lab", "label_weight": 1000}
|
|
|
+ resp = await async_client.post("/api/v1/spoolman/inventory/spools", json=payload)
|
|
|
+ assert resp.status_code == 200
|
|
|
+ mock_spoolman_client.update_spool_full.assert_not_called()
|
|
|
+
|
|
|
+
|
|
|
+class TestSpoolmanInventoryInputValidation:
|
|
|
+ """Tests for input validation added as security hardening."""
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_create_rejects_material_too_long(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """material longer than 64 chars is rejected with 422."""
|
|
|
+ payload = {"material": "A" * 65, "label_weight": 1000, "weight_used": 0}
|
|
|
+ response = await async_client.post("/api/v1/spoolman/inventory/spools", json=payload)
|
|
|
+ assert response.status_code == 422
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_create_rejects_note_too_long(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """note longer than 1000 chars is rejected with 422."""
|
|
|
+ payload = {
|
|
|
+ "material": "PLA",
|
|
|
+ "label_weight": 1000,
|
|
|
+ "weight_used": 0,
|
|
|
+ "note": "x" * 1001,
|
|
|
+ }
|
|
|
+ response = await async_client.post("/api/v1/spoolman/inventory/spools", json=payload)
|
|
|
+ assert response.status_code == 422
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_create_rejects_negative_weight_used(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """Negative weight_used is rejected with 422."""
|
|
|
+ payload = {"material": "PLA", "label_weight": 1000, "weight_used": -1.0}
|
|
|
+ response = await async_client.post("/api/v1/spoolman/inventory/spools", json=payload)
|
|
|
+ assert response.status_code == 422
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_create_rejects_zero_label_weight(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """label_weight of 0 is rejected (minimum is 1)."""
|
|
|
+ payload = {"material": "PLA", "label_weight": 0, "weight_used": 0}
|
|
|
+ response = await async_client.post("/api/v1/spoolman/inventory/spools", json=payload)
|
|
|
+ assert response.status_code == 422
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_create_rejects_invalid_rgba(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """Non-hex rgba string is rejected with 422."""
|
|
|
+ payload = {"material": "PLA", "label_weight": 1000, "weight_used": 0, "rgba": "GGGGGGFF"}
|
|
|
+ response = await async_client.post("/api/v1/spoolman/inventory/spools", json=payload)
|
|
|
+ assert response.status_code == 422
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_create_accepts_valid_6char_rgba(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """A valid 6-char hex rgba is accepted."""
|
|
|
+ payload = {"material": "PLA", "label_weight": 1000, "weight_used": 0, "rgba": "FF0000"}
|
|
|
+ response = await async_client.post("/api/v1/spoolman/inventory/spools", json=payload)
|
|
|
+ assert response.status_code == 200
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_weight_update_rejects_negative_grams(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """Negative weight_grams on weight sync endpoint is rejected with 422."""
|
|
|
+ response = await async_client.patch(
|
|
|
+ "/api/v1/spoolman/inventory/spools/42/weight",
|
|
|
+ json={"weight_grams": -50.0},
|
|
|
+ )
|
|
|
+ assert response.status_code == 422
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_update_rejects_tag_uid_too_long(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """tag_uid longer than 30 chars is rejected with 422 (NFC UID max 10 bytes = 20 hex chars, capped at 30)."""
|
|
|
+ payload = {"tag_uid": "A" * 65}
|
|
|
+ response = await async_client.patch("/api/v1/spoolman/inventory/spools/42", json=payload)
|
|
|
+ assert response.status_code == 422
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_update_rejects_tray_uuid_too_long(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """tray_uuid longer than 32 chars is rejected with 422."""
|
|
|
+ payload = {"tray_uuid": "B" * 65}
|
|
|
+ response = await async_client.patch("/api/v1/spoolman/inventory/spools/42", json=payload)
|
|
|
+ assert response.status_code == 422
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ @pytest.mark.parametrize("uuid_len", [16, 31])
|
|
|
+ async def test_update_rejects_tray_uuid_too_short(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ uuid_len: int,
|
|
|
+ ):
|
|
|
+ """tray_uuid shorter than 32 chars is rejected (min_length=max_length=32)."""
|
|
|
+ payload = {"tray_uuid": "A" * uuid_len}
|
|
|
+ response = await async_client.patch("/api/v1/spoolman/inventory/spools/42", json=payload)
|
|
|
+ assert response.status_code == 422
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_update_rejects_rgba_nine_chars(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """rgba must be max 8 hex chars; 9-char value is rejected with 422."""
|
|
|
+ payload = {"rgba": "FF0000FFA"} # 9 chars
|
|
|
+ response = await async_client.patch("/api/v1/spoolman/inventory/spools/42", json=payload)
|
|
|
+ assert response.status_code == 422
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_tag_uid_below_min_length_rejected(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """tag_uid shorter than 8 hex chars is rejected with 422 (PT-I5)."""
|
|
|
+ payload = {"tag_uid": "AABBCC"} # 6 chars, below min_length=8
|
|
|
+ resp = await async_client.patch("/api/v1/spoolman/inventory/spools/42", json=payload)
|
|
|
+ assert resp.status_code == 422
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_invalid_spoolman_url_scheme_returns_400(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ db_session,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """A spoolman_url with a non-http(s) scheme is rejected."""
|
|
|
+ from backend.app.models.settings import Settings
|
|
|
+
|
|
|
+ db_session.add(Settings(key="spoolman_enabled", value="true"))
|
|
|
+ db_session.add(Settings(key="spoolman_url", value="ftp://evil.internal/"))
|
|
|
+ await db_session.commit()
|
|
|
+
|
|
|
+ response = await async_client.get("/api/v1/spoolman/inventory/spools")
|
|
|
+ assert response.status_code == 400
|
|
|
+ assert "http" in response.json()["detail"].lower()
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ @pytest.mark.parametrize(
|
|
|
+ "evil_url",
|
|
|
+ [
|
|
|
+ "file:///etc/passwd",
|
|
|
+ "gopher://127.0.0.1:70/",
|
|
|
+ "dict://internal.corp/",
|
|
|
+ "javascript:alert(1)",
|
|
|
+ "http://169.254.169.254/latest/meta-data/", # AWS IMDS
|
|
|
+ "http://100.100.100.200/", # Alibaba Cloud metadata
|
|
|
+ "http://[fd00:ec2::254]/", # AWS IMDS IPv6
|
|
|
+ "http://0.0.0.0/", # unspecified
|
|
|
+ "http://224.0.0.1/", # IPv4 multicast
|
|
|
+ "http://[ff02::1]/", # IPv6 multicast
|
|
|
+ "http://[::ffff:169.254.169.254]/", # IPv4-mapped IPv6 IMDS bypass
|
|
|
+ "http://2130706433/", # decimal-encoded 127.0.0.1
|
|
|
+ "http://0x7f000001/", # hex-encoded 127.0.0.1
|
|
|
+ ],
|
|
|
+ )
|
|
|
+ async def test_ssrf_blocked_schemes_and_addresses(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ db_session,
|
|
|
+ mock_spoolman_client,
|
|
|
+ evil_url: str,
|
|
|
+ ):
|
|
|
+ """SSRF: dangerous schemes, cloud metadata IPs, multicast, unspecified,
|
|
|
+ and numeric-encoded IPs must be rejected with 400. Loopback and
|
|
|
+ RFC-1918 private ranges are allowed — they are legitimate Spoolman
|
|
|
+ topologies for self-hosted Bambuddy deployments."""
|
|
|
+ from backend.app.models.settings import Settings
|
|
|
+
|
|
|
+ db_session.add(Settings(key="spoolman_enabled", value="true"))
|
|
|
+ db_session.add(Settings(key="spoolman_url", value=evil_url))
|
|
|
+ await db_session.commit()
|
|
|
+
|
|
|
+ response = await async_client.get("/api/v1/spoolman/inventory/spools")
|
|
|
+ assert response.status_code == 400, (
|
|
|
+ f"Expected 400 for SSRF URL {evil_url!r} but got {response.status_code}: {response.json()}"
|
|
|
+ )
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ @pytest.mark.parametrize(
|
|
|
+ "lan_url",
|
|
|
+ [
|
|
|
+ "http://127.0.0.1:7912/", # loopback
|
|
|
+ "http://[::1]:7912/", # IPv6 loopback
|
|
|
+ "http://192.168.1.50:7912/", # RFC-1918 /16
|
|
|
+ "http://10.0.0.5:7912/", # RFC-1918 /8
|
|
|
+ "http://172.20.0.3:7912/", # RFC-1918 /12
|
|
|
+ ],
|
|
|
+ )
|
|
|
+ async def test_ssrf_allows_lan_spoolman_topologies(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ db_session,
|
|
|
+ mock_spoolman_client,
|
|
|
+ lan_url: str,
|
|
|
+ ):
|
|
|
+ """Regression: Bambuddy's normal deployment is LAN-local Spoolman.
|
|
|
+ Loopback and RFC-1918 private addresses must NOT be rejected as SSRF."""
|
|
|
+ from backend.app.models.settings import Settings
|
|
|
+
|
|
|
+ db_session.add(Settings(key="spoolman_enabled", value="true"))
|
|
|
+ db_session.add(Settings(key="spoolman_url", value=lan_url))
|
|
|
+ await db_session.commit()
|
|
|
+
|
|
|
+ response = await async_client.get("/api/v1/spoolman/inventory/spools")
|
|
|
+ assert response.status_code != 400, f"LAN URL {lan_url!r} was incorrectly blocked as SSRF: {response.json()}"
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_create_rejects_storage_location_too_long(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """storage_location longer than 255 chars is rejected with 422."""
|
|
|
+ payload = {
|
|
|
+ "material": "PLA",
|
|
|
+ "label_weight": 1000,
|
|
|
+ "weight_used": 0,
|
|
|
+ "storage_location": "x" * 256,
|
|
|
+ }
|
|
|
+ response = await async_client.post("/api/v1/spoolman/inventory/spools", json=payload)
|
|
|
+ assert response.status_code == 422
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_update_rejects_storage_location_too_long(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """storage_location longer than 255 chars on PATCH is rejected with 422."""
|
|
|
+ payload = {"storage_location": "y" * 256}
|
|
|
+ response = await async_client.patch("/api/v1/spoolman/inventory/spools/42", json=payload)
|
|
|
+ assert response.status_code == 422
|
|
|
+
|
|
|
+
|
|
|
+class TestStorageLocationPassthrough:
|
|
|
+ """Tests that storage_location is correctly passed to and from Spoolman."""
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_list_spools_maps_spoolman_location_to_storage_location(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """Spoolman's location field is exposed as storage_location in the response."""
|
|
|
+ response = await async_client.get("/api/v1/spoolman/inventory/spools")
|
|
|
+ spool = response.json()[0]
|
|
|
+ assert spool["storage_location"] == "Printer1 - AMS A1"
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_list_spools_null_location_gives_null_storage_location(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """A Spoolman spool with no location gives null storage_location."""
|
|
|
+ spool_no_loc = {**SAMPLE_SPOOLMAN_SPOOL, "location": None}
|
|
|
+ mock_spoolman_client.get_all_spools.return_value = [spool_no_loc]
|
|
|
+ response = await async_client.get("/api/v1/spoolman/inventory/spools")
|
|
|
+ spool = response.json()[0]
|
|
|
+ assert spool["storage_location"] is None
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_create_passes_storage_location_to_spoolman(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """storage_location is forwarded as location when creating a Spoolman spool."""
|
|
|
+ payload = {
|
|
|
+ "material": "PLA",
|
|
|
+ "label_weight": 1000,
|
|
|
+ "weight_used": 0,
|
|
|
+ "storage_location": "Shelf B",
|
|
|
+ }
|
|
|
+ response = await async_client.post("/api/v1/spoolman/inventory/spools", json=payload)
|
|
|
+ assert response.status_code == 200
|
|
|
+ mock_spoolman_client.create_spool.assert_called_once()
|
|
|
+ _, kwargs = mock_spoolman_client.create_spool.call_args
|
|
|
+ assert kwargs.get("location") == "Shelf B"
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_update_passes_storage_location_to_spoolman(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """storage_location is forwarded as location when updating a Spoolman spool."""
|
|
|
+ payload = {"storage_location": "Drawer 3"}
|
|
|
+ response = await async_client.patch("/api/v1/spoolman/inventory/spools/42", json=payload)
|
|
|
+ assert response.status_code == 200
|
|
|
+ mock_spoolman_client.update_spool_full.assert_called_once()
|
|
|
+ _, kwargs = mock_spoolman_client.update_spool_full.call_args
|
|
|
+ assert kwargs.get("location") == "Drawer 3"
|
|
|
+ assert kwargs.get("clear_location") is False
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_update_clears_storage_location_when_null_sent(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """Explicitly sending null storage_location clears the Spoolman location."""
|
|
|
+ payload = {"storage_location": None}
|
|
|
+ response = await async_client.patch("/api/v1/spoolman/inventory/spools/42", json=payload)
|
|
|
+ assert response.status_code == 200
|
|
|
+ _, kwargs = mock_spoolman_client.update_spool_full.call_args
|
|
|
+ assert kwargs.get("clear_location") is True
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_update_clears_storage_location_when_empty_string_sent(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """Sending an empty string for storage_location also clears the Spoolman location."""
|
|
|
+ payload = {"storage_location": ""}
|
|
|
+ response = await async_client.patch("/api/v1/spoolman/inventory/spools/42", json=payload)
|
|
|
+ assert response.status_code == 200
|
|
|
+ _, kwargs = mock_spoolman_client.update_spool_full.call_args
|
|
|
+ assert kwargs.get("clear_location") is True
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_update_omitting_storage_location_does_not_write_location_to_spoolman(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """PATCH without storage_location in the payload must not touch Spoolman's location field.
|
|
|
+
|
|
|
+ Regression test for the round-trip bug: opening the edit modal and saving without
|
|
|
+ changing the location would previously echo the current Spoolman value back
|
|
|
+ (storage_location_changed=False branch used current.get("location") instead of None).
|
|
|
+ """
|
|
|
+ # Payload deliberately omits storage_location — simulates saving the modal
|
|
|
+ # without touching that field.
|
|
|
+ payload = {"note": "just updating the note"}
|
|
|
+ response = await async_client.patch("/api/v1/spoolman/inventory/spools/42", json=payload)
|
|
|
+ assert response.status_code == 200
|
|
|
+ mock_spoolman_client.update_spool_full.assert_called_once()
|
|
|
+ _, kwargs = mock_spoolman_client.update_spool_full.call_args
|
|
|
+ # location must be None so update_spool_full skips writing the field entirely
|
|
|
+ assert kwargs.get("location") is None
|
|
|
+ # clear_location must also be False — we are not explicitly clearing it either
|
|
|
+ assert kwargs.get("clear_location") is False
|
|
|
+
|
|
|
+
|
|
|
+class TestColorNamePassthrough:
|
|
|
+ """color_name is forwarded to find_or_create_filament on create and update (B6 / T5)."""
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_create_passes_color_name_to_filament(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """color_name from the create payload is forwarded to find_or_create_filament."""
|
|
|
+ payload = {
|
|
|
+ "material": "PLA",
|
|
|
+ "label_weight": 1000,
|
|
|
+ "weight_used": 0,
|
|
|
+ "color_name": "Bambu Green",
|
|
|
+ }
|
|
|
+ response = await async_client.post("/api/v1/spoolman/inventory/spools", json=payload)
|
|
|
+ assert response.status_code == 200
|
|
|
+ mock_spoolman_client.find_or_create_filament.assert_called_once()
|
|
|
+ _, kwargs = mock_spoolman_client.find_or_create_filament.call_args
|
|
|
+ assert kwargs.get("color_name") == "Bambu Green"
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_update_passes_color_name_to_filament(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """color_name from the update payload is forwarded to find_or_create_filament."""
|
|
|
+ payload = {"color_name": "Jade White"}
|
|
|
+ response = await async_client.patch("/api/v1/spoolman/inventory/spools/42", json=payload)
|
|
|
+ assert response.status_code == 200
|
|
|
+ mock_spoolman_client.find_or_create_filament.assert_called_once()
|
|
|
+ _, kwargs = mock_spoolman_client.find_or_create_filament.call_args
|
|
|
+ assert kwargs.get("color_name") == "Jade White"
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_update_omits_color_name_when_not_provided(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """When color_name is not in the PATCH payload, the existing filament color_name is used."""
|
|
|
+ payload = {"note": "no color_name here"}
|
|
|
+ response = await async_client.patch("/api/v1/spoolman/inventory/spools/42", json=payload)
|
|
|
+ assert response.status_code == 200
|
|
|
+ _, kwargs = mock_spoolman_client.find_or_create_filament.call_args
|
|
|
+ # color_name falls back to current filament's color_name (which is None in test fixture)
|
|
|
+ assert kwargs.get("color_name") is None
|
|
|
+
|
|
|
+
|
|
|
+class TestSpoolmanInventoryAuth:
|
|
|
+ """Write/delete endpoints require INVENTORY_UPDATE when auth is enabled."""
|
|
|
+
|
|
|
+ @pytest.fixture
|
|
|
+ async def auth_and_spoolman_settings(self, db_session):
|
|
|
+ """Enable both Spoolman and auth."""
|
|
|
+ from backend.app.models.settings import Settings
|
|
|
+
|
|
|
+ db_session.add(Settings(key="spoolman_enabled", value="true"))
|
|
|
+ db_session.add(Settings(key="spoolman_url", value="http://localhost:7912"))
|
|
|
+ db_session.add(Settings(key="auth_enabled", value="true"))
|
|
|
+ await db_session.commit()
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ @pytest.mark.parametrize(
|
|
|
+ "method,path,payload",
|
|
|
+ [
|
|
|
+ ("POST", "/api/v1/spoolman/inventory/spools", {"material": "PLA", "label_weight": 1000, "weight_used": 0}),
|
|
|
+ (
|
|
|
+ "POST",
|
|
|
+ "/api/v1/spoolman/inventory/spools/bulk",
|
|
|
+ {"spool": {"material": "PLA", "label_weight": 1000, "weight_used": 0}, "quantity": 1},
|
|
|
+ ),
|
|
|
+ ("PATCH", "/api/v1/spoolman/inventory/spools/42", {"note": "x"}),
|
|
|
+ ("DELETE", "/api/v1/spoolman/inventory/spools/42", None),
|
|
|
+ ("POST", "/api/v1/spoolman/inventory/spools/42/archive", None),
|
|
|
+ ("POST", "/api/v1/spoolman/inventory/spools/42/restore", None),
|
|
|
+ ("PATCH", "/api/v1/spoolman/inventory/spools/42/weight", {"weight_grams": 100.0}),
|
|
|
+ ],
|
|
|
+ )
|
|
|
+ async def test_write_endpoints_require_auth(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ auth_and_spoolman_settings,
|
|
|
+ method: str,
|
|
|
+ path: str,
|
|
|
+ payload: dict | None,
|
|
|
+ ):
|
|
|
+ """All write/delete endpoints return 401 when auth is enabled and no token is provided."""
|
|
|
+ response = await async_client.request(method, path, json=payload)
|
|
|
+ assert response.status_code == 401, (
|
|
|
+ f"{method} {path} should require auth but got {response.status_code}: {response.json()}"
|
|
|
+ )
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ @pytest.mark.parametrize(
|
|
|
+ "method,path",
|
|
|
+ [
|
|
|
+ ("GET", "/api/v1/spoolman/inventory/spools"),
|
|
|
+ ("GET", "/api/v1/spoolman/inventory/spools/42"),
|
|
|
+ ],
|
|
|
+ )
|
|
|
+ async def test_read_endpoints_require_auth(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ auth_and_spoolman_settings,
|
|
|
+ method: str,
|
|
|
+ path: str,
|
|
|
+ ):
|
|
|
+ """Read endpoints also require auth when auth is enabled."""
|
|
|
+ response = await async_client.request(method, path)
|
|
|
+ assert response.status_code == 401, (
|
|
|
+ f"{method} {path} should require auth but got {response.status_code}: {response.json()}"
|
|
|
+ )
|
|
|
+
|
|
|
+ @pytest.fixture
|
|
|
+ async def viewer_token(self, db_session):
|
|
|
+ """Create a Viewer-group user (INVENTORY_READ only, no INVENTORY_UPDATE)."""
|
|
|
+ from sqlalchemy import select
|
|
|
+
|
|
|
+ from backend.app.core.auth import create_access_token, get_password_hash
|
|
|
+ from backend.app.models.group import Group
|
|
|
+ from backend.app.models.settings import Settings
|
|
|
+ from backend.app.models.user import User
|
|
|
+
|
|
|
+ db_session.add(Settings(key="spoolman_enabled", value="true"))
|
|
|
+ db_session.add(Settings(key="spoolman_url", value="http://localhost:7912"))
|
|
|
+ db_session.add(Settings(key="auth_enabled", value="true"))
|
|
|
+ await db_session.commit()
|
|
|
+
|
|
|
+ viewer_group = (await db_session.execute(select(Group).where(Group.name == "Viewers"))).scalar_one()
|
|
|
+ viewer = User(
|
|
|
+ username="sm_inv_viewer",
|
|
|
+ password_hash=get_password_hash("pw"),
|
|
|
+ is_active=True,
|
|
|
+ )
|
|
|
+ viewer.groups.append(viewer_group)
|
|
|
+ db_session.add(viewer)
|
|
|
+ await db_session.commit()
|
|
|
+ return create_access_token(data={"sub": viewer.username})
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ @pytest.mark.parametrize(
|
|
|
+ "method,path,payload",
|
|
|
+ [
|
|
|
+ ("POST", "/api/v1/spoolman/inventory/spools", {"material": "PLA", "label_weight": 1000, "weight_used": 0}),
|
|
|
+ (
|
|
|
+ "POST",
|
|
|
+ "/api/v1/spoolman/inventory/spools/bulk",
|
|
|
+ {"spool": {"material": "PLA", "label_weight": 1000, "weight_used": 0}, "quantity": 1},
|
|
|
+ ),
|
|
|
+ ("PATCH", "/api/v1/spoolman/inventory/spools/42", {"note": "x"}),
|
|
|
+ ("DELETE", "/api/v1/spoolman/inventory/spools/42", None),
|
|
|
+ ("POST", "/api/v1/spoolman/inventory/spools/42/archive", None),
|
|
|
+ ("POST", "/api/v1/spoolman/inventory/spools/42/restore", None),
|
|
|
+ ("PATCH", "/api/v1/spoolman/inventory/spools/42/weight", {"weight_grams": 100.0}),
|
|
|
+ ],
|
|
|
+ )
|
|
|
+ async def test_write_endpoints_return_403_for_viewer(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ viewer_token,
|
|
|
+ method: str,
|
|
|
+ path: str,
|
|
|
+ payload: dict | None,
|
|
|
+ ):
|
|
|
+ """Viewer-group users (INVENTORY_READ, no INVENTORY_UPDATE) get 403 on write endpoints."""
|
|
|
+ response = await async_client.request(
|
|
|
+ method,
|
|
|
+ path,
|
|
|
+ json=payload,
|
|
|
+ headers={"Authorization": f"Bearer {viewer_token}"},
|
|
|
+ )
|
|
|
+ assert response.status_code == 403, (
|
|
|
+ f"{method} {path} should return 403 for read-only user but got {response.status_code}: {response.json()}"
|
|
|
+ )
|
|
|
+ # Error body must mention the permission string so a "banned-user middleware"
|
|
|
+ # regression (generic 403 with no permission context) doesn't pass silently.
|
|
|
+ detail = response.json().get("detail", "")
|
|
|
+ assert "inventory:update" in detail, f"Expected 'inventory:update' in 403 detail but got: {detail!r}"
|
|
|
+
|
|
|
+
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# Additional regression tests for second-round review items
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+
|
|
|
+
|
|
|
+class TestSpoolmanInventorySecurityExtras:
|
|
|
+ """Additional security/validation tests added in second review round."""
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_create_rejects_double_hash_rgba(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """SEC-3: rgba like '##FF0000' (double hash) must be rejected with 422."""
|
|
|
+ payload = {"material": "PLA", "label_weight": 1000, "weight_used": 0, "rgba": "##FF0000"}
|
|
|
+ response = await async_client.post("/api/v1/spoolman/inventory/spools", json=payload)
|
|
|
+ assert response.status_code == 422
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ @pytest.mark.parametrize("spool_id", [0, -1])
|
|
|
+ async def test_path_param_non_positive_spool_id_returns_422(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ spool_id: int,
|
|
|
+ ):
|
|
|
+ """SEC-5: /spools/0 and /spools/-1 must be rejected with 422 (Path gt=0)."""
|
|
|
+ response = await async_client.get(f"/api/v1/spoolman/inventory/spools/{spool_id}")
|
|
|
+ assert response.status_code == 422, f"Expected 422 for spool_id={spool_id} but got {response.status_code}"
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ @pytest.mark.parametrize(
|
|
|
+ "tag_uid,expected_status",
|
|
|
+ [
|
|
|
+ # After B1 fix: non-null tag_uid on PATCH /spools/{id} is rejected (use /tag endpoint)
|
|
|
+ ("A" * 30, 422), # non-null → 422 (use /tag endpoint instead)
|
|
|
+ ("DEADBEEF12345678", 422), # non-null → 422 regardless of length
|
|
|
+ ("A" * 31, 422), # exceeds max_length — also 422
|
|
|
+ ("A" * 32, 422), # tray_uuid-length value — also 422
|
|
|
+ ],
|
|
|
+ )
|
|
|
+ async def test_tag_uid_length_boundary(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ tag_uid: str,
|
|
|
+ expected_status: int,
|
|
|
+ ):
|
|
|
+ """tag_uid on PATCH /spools/{id} — all non-null values are rejected (B1 fix; use /tag endpoint)."""
|
|
|
+ payload = {"tag_uid": tag_uid}
|
|
|
+ response = await async_client.patch("/api/v1/spoolman/inventory/spools/42", json=payload)
|
|
|
+ assert response.status_code == expected_status, (
|
|
|
+ f"tag_uid len={len(tag_uid)}: expected {expected_status} but got {response.status_code}"
|
|
|
+ )
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_bulk_create_partial_failure_returns_207(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """I9: bulk create with quantity=3 where middle call fails → 207 Multi-Status."""
|
|
|
+ from backend.app.services.spoolman import SpoolmanUnavailableError
|
|
|
+
|
|
|
+ results = [SAMPLE_SPOOLMAN_SPOOL, SpoolmanUnavailableError("Spoolman down"), SAMPLE_SPOOLMAN_SPOOL]
|
|
|
+ mock_spoolman_client.create_spool.side_effect = results
|
|
|
+
|
|
|
+ payload = {
|
|
|
+ "spool": {"material": "PLA", "label_weight": 1000, "weight_used": 0},
|
|
|
+ "quantity": 3,
|
|
|
+ }
|
|
|
+ response = await async_client.post("/api/v1/spoolman/inventory/spools/bulk", json=payload)
|
|
|
+ assert response.status_code == 207, (
|
|
|
+ f"Expected 207 Multi-Status for partial failure but got {response.status_code}"
|
|
|
+ )
|
|
|
+ body = response.json()
|
|
|
+ assert isinstance(body, dict)
|
|
|
+ assert body["requested_count"] == 3
|
|
|
+ assert body["failed_count"] == 1
|
|
|
+ assert len(body["created"]) == 2
|
|
|
+
|
|
|
+
|
|
|
+class TestTagClearPreservesExtraKeys:
|
|
|
+ """Regression test: clearing tag_uid must not wipe unrelated Spoolman extra fields."""
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_tag_clear_preserves_custom_extra_key(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """PATCH tag_uid=None clears tag without dropping unrelated extra keys.
|
|
|
+
|
|
|
+ Spoolman PATCHes the extra dict by MERGING — popping a key from the
|
|
|
+ dict and sending the rest doesn't actually clear it. The endpoint
|
|
|
+ sets tag = json.dumps("") explicitly; read-side filters strip the
|
|
|
+ wrapping quotes and treat the empty string as "no tag" (#1114).
|
|
|
+ """
|
|
|
+ import json as _json
|
|
|
+
|
|
|
+ spool_with_extra = {
|
|
|
+ **SAMPLE_SPOOLMAN_SPOOL,
|
|
|
+ "extra": {"tag": '"AABBCCDDEEFF0011AABBCCDDEEFF0011"', "custom_key": "keep_me"},
|
|
|
+ }
|
|
|
+ mock_spoolman_client.get_spool = AsyncMock(return_value=spool_with_extra)
|
|
|
+ mock_spoolman_client.update_spool_full = AsyncMock(return_value=spool_with_extra)
|
|
|
+
|
|
|
+ response = await async_client.patch(
|
|
|
+ "/api/v1/spoolman/inventory/spools/42",
|
|
|
+ json={"tag_uid": None},
|
|
|
+ )
|
|
|
+ assert response.status_code == 200
|
|
|
+
|
|
|
+ mock_spoolman_client.update_spool_full.assert_called_once()
|
|
|
+ _, kwargs = mock_spoolman_client.update_spool_full.call_args
|
|
|
+ sent_extra = kwargs.get("extra")
|
|
|
+ assert sent_extra is not None, "extra must be sent when tag is cleared"
|
|
|
+ assert sent_extra.get("tag") == _json.dumps(""), (
|
|
|
+ "tag must be set to JSON empty-string sentinel (Spoolman PATCH merges; "
|
|
|
+ "popping the key would leave the previous value in place)"
|
|
|
+ )
|
|
|
+ assert sent_extra.get("custom_key") == "keep_me", "unrelated extra keys must survive"
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_tag_clear_refetches_spool_inside_lock(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """B7: tag-clear does a fresh get_spool() re-fetch inside the lock, not the stale one.
|
|
|
+
|
|
|
+ Simulates a write that changes extra between the initial get_spool (used for
|
|
|
+ other field resolution) and the lock acquisition. The extra sent to
|
|
|
+ update_spool_full must come from the second (in-lock) fetch, not the first.
|
|
|
+ """
|
|
|
+ stale_extra = {"tag": '"AABBCCDD"', "custom_key": "stale_value"}
|
|
|
+ fresh_extra = {"tag": '"AABBCCDD"', "custom_key": "fresh_value"}
|
|
|
+
|
|
|
+ stale_spool = {**SAMPLE_SPOOLMAN_SPOOL, "extra": stale_extra}
|
|
|
+ fresh_spool = {**SAMPLE_SPOOLMAN_SPOOL, "extra": fresh_extra}
|
|
|
+
|
|
|
+ # First call returns stale; second call (inside lock) returns fresh
|
|
|
+ mock_spoolman_client.get_spool = AsyncMock(side_effect=[stale_spool, fresh_spool])
|
|
|
+ mock_spoolman_client.update_spool_full = AsyncMock(return_value=fresh_spool)
|
|
|
+
|
|
|
+ response = await async_client.patch(
|
|
|
+ "/api/v1/spoolman/inventory/spools/42",
|
|
|
+ json={"tag_uid": None, "tray_uuid": None},
|
|
|
+ )
|
|
|
+ assert response.status_code == 200
|
|
|
+
|
|
|
+ # get_spool called twice: once for field resolution, once for fresh extra fetch
|
|
|
+ assert mock_spoolman_client.get_spool.call_count == 2
|
|
|
+
|
|
|
+ import json as _json
|
|
|
+
|
|
|
+ _, kwargs = mock_spoolman_client.update_spool_full.call_args
|
|
|
+ sent_extra = kwargs.get("extra")
|
|
|
+ assert sent_extra is not None
|
|
|
+ # Tag is set to the JSON empty-string sentinel (not popped) — Spoolman
|
|
|
+ # PATCH merges, so popping the key would leave the previous value.
|
|
|
+ assert sent_extra.get("tag") == _json.dumps("")
|
|
|
+ # custom_key must come from the fresh re-fetch, not the stale first fetch
|
|
|
+ assert sent_extra.get("custom_key") == "fresh_value"
|
|
|
+
|
|
|
+
|
|
|
+class TestSpoolmanInventorySSRFSpoolBuddyPath:
|
|
|
+ """SSRF tests for _get_spoolman_client_or_none (nfc/* and scale/ endpoints)."""
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ @pytest.mark.parametrize(
|
|
|
+ "evil_url",
|
|
|
+ [
|
|
|
+ "file:///etc/passwd",
|
|
|
+ "http://169.254.169.254/latest/meta-data/", # AWS IMDS
|
|
|
+ "http://0.0.0.0/", # unspecified
|
|
|
+ "http://[::ffff:169.254.169.254]/", # IPv4-mapped IMDS bypass
|
|
|
+ ],
|
|
|
+ )
|
|
|
+ async def test_nfc_tag_scanned_with_ssrf_url_ignores_spoolman(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ db_session,
|
|
|
+ evil_url: str,
|
|
|
+ ):
|
|
|
+ """SSRF: _get_spoolman_client_or_none silently disables Spoolman for unsafe URLs
|
|
|
+ on the SpoolBuddy NFC path (tag-scanned broadcasts unknown_tag, not 400)."""
|
|
|
+ from backend.app.models.settings import Settings
|
|
|
+
|
|
|
+ db_session.add(Settings(key="spoolman_enabled", value="true"))
|
|
|
+ db_session.add(Settings(key="spoolman_url", value=evil_url))
|
|
|
+ await db_session.commit()
|
|
|
+
|
|
|
+ from unittest.mock import AsyncMock, patch
|
|
|
+
|
|
|
+ with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
|
|
|
+ mock_ws.broadcast = AsyncMock()
|
|
|
+ resp = await async_client.post(
|
|
|
+ "/api/v1/spoolbuddy/nfc/tag-scanned",
|
|
|
+ json={"device_id": "sb-ssrf", "tag_uid": "AABBCCDD"},
|
|
|
+ )
|
|
|
+
|
|
|
+ # Must not crash or proxy the SSRF URL — unknown_tag is the safe degraded response
|
|
|
+ assert resp.status_code == 200
|
|
|
+ if mock_ws.broadcast.called:
|
|
|
+ msg = mock_ws.broadcast.call_args[0][0]
|
|
|
+ assert msg["type"] == "spoolbuddy_unknown_tag"
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ @pytest.mark.parametrize(
|
|
|
+ "evil_url",
|
|
|
+ [
|
|
|
+ "http://169.254.169.254/latest/meta-data/", # AWS IMDS
|
|
|
+ "http://[::ffff:169.254.169.254]/", # IPv4-mapped IMDS bypass
|
|
|
+ ],
|
|
|
+ )
|
|
|
+ async def test_nfc_write_result_with_ssrf_url_degrades_gracefully(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ db_session,
|
|
|
+ evil_url: str,
|
|
|
+ ):
|
|
|
+ """SSRF: write-result with unsafe Spoolman URL must not proxy to the evil host.
|
|
|
+
|
|
|
+ write-result calls Spoolman to write-back the tag UID when data_origin='spoolman'.
|
|
|
+ With an SSRF URL, _get_spoolman_client_or_none returns None so the call is skipped
|
|
|
+ and the route returns 502 (tag written but link not persisted — not a server crash).
|
|
|
+ """
|
|
|
+ import json as _json
|
|
|
+
|
|
|
+ from backend.app.models.settings import Settings
|
|
|
+ from backend.app.models.spoolbuddy_device import SpoolBuddyDevice
|
|
|
+
|
|
|
+ db_session.add(Settings(key="spoolman_enabled", value="true"))
|
|
|
+ db_session.add(Settings(key="spoolman_url", value=evil_url))
|
|
|
+ # Register the device so the route doesn't 404 before reaching the SSRF guard.
|
|
|
+ db_session.add(
|
|
|
+ SpoolBuddyDevice(
|
|
|
+ device_id="sb-ssrf-wr",
|
|
|
+ hostname="sb-ssrf-wr.local",
|
|
|
+ ip_address="127.0.0.1",
|
|
|
+ pending_command="write_tag",
|
|
|
+ pending_write_payload=_json.dumps({"spool_id": 99, "ndef_data_hex": "DEAD", "data_origin": "spoolman"}),
|
|
|
+ )
|
|
|
+ )
|
|
|
+ await db_session.commit()
|
|
|
+
|
|
|
+ from unittest.mock import AsyncMock, patch
|
|
|
+
|
|
|
+ with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
|
|
|
+ mock_ws.broadcast = AsyncMock()
|
|
|
+ resp = await async_client.post(
|
|
|
+ "/api/v1/spoolbuddy/nfc/write-result",
|
|
|
+ json={
|
|
|
+ "device_id": "sb-ssrf-wr",
|
|
|
+ "spool_id": 99,
|
|
|
+ "tag_uid": "AABBCCDD",
|
|
|
+ "success": True,
|
|
|
+ },
|
|
|
+ )
|
|
|
+
|
|
|
+ # 502 = tag written to NFC but Spoolman link not persisted (SSRF guard blocked it).
|
|
|
+ # Must not be 500 (crash) and must not have proxied to the evil host.
|
|
|
+ assert resp.status_code == 502
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ @pytest.mark.parametrize(
|
|
|
+ "evil_url",
|
|
|
+ [
|
|
|
+ "http://169.254.169.254/latest/meta-data/", # AWS IMDS
|
|
|
+ ],
|
|
|
+ )
|
|
|
+ async def test_scale_update_weight_with_ssrf_url_degrades_gracefully(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ db_session,
|
|
|
+ evil_url: str,
|
|
|
+ ):
|
|
|
+ """SSRF: scale weight update with unsafe Spoolman URL must not proxy to the evil host."""
|
|
|
+ from backend.app.models.settings import Settings
|
|
|
+
|
|
|
+ db_session.add(Settings(key="spoolman_enabled", value="true"))
|
|
|
+ db_session.add(Settings(key="spoolman_url", value=evil_url))
|
|
|
+ await db_session.commit()
|
|
|
+
|
|
|
+ from unittest.mock import AsyncMock, patch
|
|
|
+
|
|
|
+ with patch("backend.app.api.routes.spoolbuddy.ws_manager") as mock_ws:
|
|
|
+ mock_ws.broadcast = AsyncMock()
|
|
|
+ resp = await async_client.post(
|
|
|
+ "/api/v1/spoolbuddy/scale/update-spool-weight",
|
|
|
+ json={"device_id": "sb-ssrf-scale", "spool_id": 1, "weight_grams": 500.0},
|
|
|
+ )
|
|
|
+
|
|
|
+ # Must not crash or proxy to an SSRF host
|
|
|
+ assert resp.status_code in (200, 404, 422)
|
|
|
+
|
|
|
+
|
|
|
+class TestMergeSpoolExtraPreservesKeys:
|
|
|
+ """Unit-level test for merge_spool_extra key preservation (via mocked Spoolman)."""
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_merge_preserves_unrelated_extra_keys(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_spoolman_client,
|
|
|
+ ):
|
|
|
+ """merge_spool_extra must deep-merge rather than overwrite the extra dict.
|
|
|
+
|
|
|
+ Seed extra={"custom_key": "keep_me", "tag": "old"}.
|
|
|
+ After merging {"tag": "new"}, the PATCH payload must still contain custom_key.
|
|
|
+ """
|
|
|
+ from unittest.mock import AsyncMock, patch
|
|
|
+
|
|
|
+ existing_spool = {
|
|
|
+ **SAMPLE_SPOOLMAN_SPOOL,
|
|
|
+ "extra": {"custom_key": "keep_me", "tag": '"old"'},
|
|
|
+ }
|
|
|
+ updated_spool = {**existing_spool, "extra": {"custom_key": "keep_me", "tag": '"new"'}}
|
|
|
+
|
|
|
+ mock_client = mock_spoolman_client
|
|
|
+ mock_client.get_spool = AsyncMock(return_value=existing_spool)
|
|
|
+ mock_client.update_spool_full = AsyncMock(return_value=updated_spool)
|
|
|
+
|
|
|
+ # Call merge_spool_extra directly through the service
|
|
|
+ from backend.app.services.spoolman import SpoolmanClient
|
|
|
+
|
|
|
+ client = SpoolmanClient.__new__(SpoolmanClient)
|
|
|
+ client.base_url = "http://localhost:7912"
|
|
|
+ client.api_url = "http://localhost:7912/api/v1"
|
|
|
+ client._extra_locks = {}
|
|
|
+
|
|
|
+ async def _mock_get(spool_id):
|
|
|
+ return existing_spool
|
|
|
+
|
|
|
+ async def _mock_update(spool_id, **kwargs):
|
|
|
+ # Capture what was actually sent
|
|
|
+ _mock_update.captured_extra = kwargs.get("extra")
|
|
|
+ return updated_spool
|
|
|
+
|
|
|
+ _mock_update.captured_extra = None
|
|
|
+ client.get_spool = _mock_get
|
|
|
+ client.update_spool_full = _mock_update
|
|
|
+
|
|
|
+ result = await client.merge_spool_extra(42, {"tag": '"new"'})
|
|
|
+
|
|
|
+ # The merged extra must include the unrelated key
|
|
|
+ assert _mock_update.captured_extra is not None
|
|
|
+ assert _mock_update.captured_extra.get("custom_key") == "keep_me"
|
|
|
+ assert _mock_update.captured_extra.get("tag") == '"new"'
|
|
|
+ assert result is not None
|
|
|
+
|
|
|
+
|
|
|
+class TestGetClientValueError:
|
|
|
+ """Test the ValueError branch in _get_client when init_spoolman_client fails (Gap 5)."""
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_returns_400_when_init_spoolman_client_raises_value_error(
|
|
|
+ self, async_client: AsyncClient, spoolman_settings
|
|
|
+ ):
|
|
|
+ """If init_spoolman_client raises ValueError after SSRF check passes, return HTTP 400."""
|
|
|
+ with (
|
|
|
+ patch(
|
|
|
+ "backend.app.api.routes.spoolman_inventory.get_spoolman_client",
|
|
|
+ AsyncMock(return_value=None),
|
|
|
+ ),
|
|
|
+ patch(
|
|
|
+ "backend.app.api.routes.spoolman_inventory.init_spoolman_client",
|
|
|
+ AsyncMock(side_effect=ValueError("unsupported scheme")),
|
|
|
+ ),
|
|
|
+ ):
|
|
|
+ resp = await async_client.get("/api/v1/spoolman/inventory/spools")
|
|
|
+ assert resp.status_code == 400
|
|
|
+ assert "unsupported scheme" in resp.json()["detail"]
|
|
|
+
|
|
|
+
|
|
|
+class TestBulkCreateWithPriceFailure:
|
|
|
+ """Test that bulk create handles price-update failures per C1/C8 semantics."""
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_bulk_create_price_503_moves_spool_to_failures(
|
|
|
+ self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client
|
|
|
+ ):
|
|
|
+ """When price update fails (503), the spool goes to failures — overall returns 207 if at least one succeeds."""
|
|
|
+ from backend.app.services.spoolman import SpoolmanUnavailableError
|
|
|
+
|
|
|
+ # First price update fails (SpoolmanUnavailableError → 503), second succeeds
|
|
|
+ mock_spoolman_client.update_spool_full = AsyncMock(
|
|
|
+ side_effect=[SpoolmanUnavailableError("price server down"), SAMPLE_SPOOLMAN_SPOOL]
|
|
|
+ )
|
|
|
+ mock_spoolman_client.create_spool = AsyncMock(return_value=SAMPLE_SPOOLMAN_SPOOL)
|
|
|
+
|
|
|
+ payload = {
|
|
|
+ "spool": {
|
|
|
+ "material": "PLA",
|
|
|
+ "brand": "Bambu Lab",
|
|
|
+ "label_weight": 1000,
|
|
|
+ "cost_per_kg": 19.99,
|
|
|
+ },
|
|
|
+ "quantity": 2,
|
|
|
+ }
|
|
|
+ resp = await async_client.post("/api/v1/spoolman/inventory/spools/bulk", json=payload)
|
|
|
+ # One spool succeeded, one failed (price 503) → 207 Partial
|
|
|
+ assert resp.status_code == 207
|
|
|
+ data = resp.json()
|
|
|
+ assert len(data["created"]) == 1
|
|
|
+ assert data["failed_count"] == 1
|
|
|
+ # Both Spoolman creates were attempted
|
|
|
+ assert mock_spoolman_client.create_spool.call_count == 2
|
|
|
+ # Both price updates were attempted
|
|
|
+ assert mock_spoolman_client.update_spool_full.call_count == 2
|
|
|
+
|
|
|
+
|
|
|
+class TestSpoolTagLinkValidation:
|
|
|
+ """NEW-B1: /spools/{id}/tag endpoint validates tag_uid length and content."""
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_tag_uid_6_chars_rejected(self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client):
|
|
|
+ """tag_uid with 6 hex chars is rejected — minimum is 8 chars (4-byte UID)."""
|
|
|
+ resp = await async_client.patch(
|
|
|
+ "/api/v1/spoolman/inventory/spools/42/tag",
|
|
|
+ json={"tag_uid": "AABBCC"}, # 6 chars — below new minimum
|
|
|
+ )
|
|
|
+ assert resp.status_code == 422
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_tag_uid_all_zeros_rejected(self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client):
|
|
|
+ """tag_uid that is all-zero bytes is rejected as an unwritten/blank tag."""
|
|
|
+ resp = await async_client.patch(
|
|
|
+ "/api/v1/spoolman/inventory/spools/42/tag",
|
|
|
+ json={"tag_uid": "00000000000000"}, # 14 zeros
|
|
|
+ )
|
|
|
+ assert resp.status_code == 422
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_tag_uid_valid_14_chars_accepted(
|
|
|
+ self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client
|
|
|
+ ):
|
|
|
+ """tag_uid with 14 valid hex chars (7-byte UID) is accepted."""
|
|
|
+ # This tag is not in SAMPLE_SPOOLMAN_SPOOL so no duplicate conflict.
|
|
|
+ resp = await async_client.patch(
|
|
|
+ "/api/v1/spoolman/inventory/spools/42/tag",
|
|
|
+ json={"tag_uid": "AABBCCDD112233"}, # 14 chars, valid, not all-zeros
|
|
|
+ )
|
|
|
+ assert resp.status_code == 200
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_tag_uid_8_chars_accepted(self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client):
|
|
|
+ """tag_uid with 8 hex chars (4-byte Bambu Lab NFC UID) is accepted after min_length fix."""
|
|
|
+ resp = await async_client.patch(
|
|
|
+ "/api/v1/spoolman/inventory/spools/42/tag",
|
|
|
+ json={"tag_uid": "2728C17B"}, # 8 chars — real Bambu Lab 4-byte hardware UID
|
|
|
+ )
|
|
|
+ assert resp.status_code == 200
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_tag_uid_8_zeros_rejected(self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client):
|
|
|
+ """tag_uid with 8 zero chars is rejected — all-zeros validator applies at the new minimum."""
|
|
|
+ resp = await async_client.patch(
|
|
|
+ "/api/v1/spoolman/inventory/spools/42/tag",
|
|
|
+ json={"tag_uid": "00000000"}, # 8 zeros — meets min_length but is a blank/unwritten tag
|
|
|
+ )
|
|
|
+ assert resp.status_code == 422
|
|
|
+
|
|
|
+
|
|
|
+class TestLinkTagDuplicate:
|
|
|
+ """NEW-I1: /spools/{id}/tag returns 409 when another spool already has the same tag."""
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_link_tag_returns_200_when_tag_not_on_another_spool(
|
|
|
+ self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client
|
|
|
+ ):
|
|
|
+ """Linking a fresh tag to spool 42 returns 200 — no duplicate in Spoolman."""
|
|
|
+ resp = await async_client.patch(
|
|
|
+ "/api/v1/spoolman/inventory/spools/42/tag",
|
|
|
+ json={"tag_uid": "AABBCCDD112233"}, # not in SAMPLE_SPOOLMAN_SPOOL
|
|
|
+ )
|
|
|
+ assert resp.status_code == 200
|
|
|
+ mock_spoolman_client.update_spool_full.assert_called_once()
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_link_tag_returns_409_when_same_tag_on_different_spool(
|
|
|
+ self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client
|
|
|
+ ):
|
|
|
+ """Linking spool 99 to a tag that spool 42 already carries must return 409."""
|
|
|
+ # SAMPLE_SPOOLMAN_SPOOL (id=42) has extra.tag = '"AABBCCDDEEFF0011AABBCCDDEEFF0011"'.
|
|
|
+ # Attempting to assign the same tag to spool 99 must be rejected.
|
|
|
+ resp = await async_client.patch(
|
|
|
+ "/api/v1/spoolman/inventory/spools/99/tag",
|
|
|
+ json={"tray_uuid": "AABBCCDDEEFF0011AABBCCDDEEFF0011"}, # 32-char tray UUID
|
|
|
+ )
|
|
|
+ assert resp.status_code == 409
|
|
|
+ detail = resp.json()["detail"]
|
|
|
+ assert "42" in str(detail)
|
|
|
+
|
|
|
+
|
|
|
+class TestSpoolmanInventoryUpdateCoreWeight:
|
|
|
+ """core_weight is accepted for schema parity but not persisted — any value should be accepted."""
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_patch_core_weight_other_than_250_accepted(
|
|
|
+ self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client
|
|
|
+ ):
|
|
|
+ """PATCH with core_weight != 250 is accepted (field is ignored server-side, not rejected)."""
|
|
|
+ resp = await async_client.patch(
|
|
|
+ "/api/v1/spoolman/inventory/spools/42",
|
|
|
+ json={"core_weight": 100},
|
|
|
+ )
|
|
|
+ assert resp.status_code == 200
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_patch_core_weight_250_explicitly_is_accepted(
|
|
|
+ self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client
|
|
|
+ ):
|
|
|
+ """PATCH with core_weight=250 (the default) is valid and returns 200."""
|
|
|
+ resp = await async_client.patch(
|
|
|
+ "/api/v1/spoolman/inventory/spools/42",
|
|
|
+ json={"core_weight": 250},
|
|
|
+ )
|
|
|
+ assert resp.status_code == 200
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_patch_without_core_weight_is_accepted(
|
|
|
+ self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client
|
|
|
+ ):
|
|
|
+ """PATCH without core_weight (omitted) must not trigger the validator — returns 200."""
|
|
|
+ resp = await async_client.patch(
|
|
|
+ "/api/v1/spoolman/inventory/spools/42",
|
|
|
+ json={"note": "no core_weight key"},
|
|
|
+ )
|
|
|
+ assert resp.status_code == 200
|
|
|
+
|
|
|
+
|
|
|
+class TestUnlinkSpool:
|
|
|
+ """POST /spoolman/spools/{id}/unlink clears Spoolman tag without re-entrant lock deadlock.
|
|
|
+
|
|
|
+ Spoolman PATCHes the extra dict by MERGING — popping a key + sending the
|
|
|
+ rest doesn't clear the popped key. The endpoint sends the JSON empty-string
|
|
|
+ sentinel ('""') which the read-side filters strip. (#1114)
|
|
|
+
|
|
|
+ The endpoint uses merge_spool_extra (not update_spool_full directly)
|
|
|
+ because (a) merge_spool_extra owns the per-spool extra_lock for atomic
|
|
|
+ read-modify-write semantics, and (b) wrapping it in another extra_lock
|
|
|
+ would deadlock — asyncio.Lock is not re-entrant.
|
|
|
+ """
|
|
|
+
|
|
|
+ @pytest.fixture
|
|
|
+ def mock_unlink_client(self):
|
|
|
+ """Mock Spoolman client for the spoolman.py (non-inventory) route."""
|
|
|
+ spool_with_tag = {
|
|
|
+ **SAMPLE_SPOOLMAN_SPOOL,
|
|
|
+ "extra": {"tag": '"AABBCCDDEEFF0011AABBCCDDEEFF0011"', "custom": "keep"},
|
|
|
+ }
|
|
|
+ mock_client = MagicMock()
|
|
|
+ mock_client.base_url = "http://localhost:7912"
|
|
|
+ mock_client.health_check = AsyncMock(return_value=True)
|
|
|
+ mock_client.get_spool = AsyncMock(return_value=spool_with_tag)
|
|
|
+ # merge_spool_extra returns the spool with the tag cleared (and custom
|
|
|
+ # preserved) — that's what the read-side will see after the fix.
|
|
|
+ mock_client.merge_spool_extra = AsyncMock(
|
|
|
+ return_value={**spool_with_tag, "extra": {"tag": '""', "custom": "keep"}}
|
|
|
+ )
|
|
|
+
|
|
|
+ with (
|
|
|
+ patch(
|
|
|
+ "backend.app.api.routes.spoolman.get_spoolman_client",
|
|
|
+ AsyncMock(return_value=mock_client),
|
|
|
+ ),
|
|
|
+ patch(
|
|
|
+ "backend.app.api.routes.spoolman.init_spoolman_client",
|
|
|
+ AsyncMock(return_value=mock_client),
|
|
|
+ ),
|
|
|
+ ):
|
|
|
+ yield mock_client
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_unlink_sets_tag_to_json_empty_string(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_unlink_client,
|
|
|
+ ):
|
|
|
+ """Unlink calls merge_spool_extra with the JSON-empty-string sentinel.
|
|
|
+
|
|
|
+ Pre-fix the endpoint did `cur_extra.pop("tag")` then PATCHed the rest.
|
|
|
+ Spoolman silently kept the previous tag because the key wasn't in the
|
|
|
+ payload (PATCH merges). Now the endpoint sends `{"tag": '""'}` and
|
|
|
+ the read-side .strip('"') resolves it to "" → spool drops out of
|
|
|
+ get_linked_spools.
|
|
|
+ """
|
|
|
+ import json as _json
|
|
|
+
|
|
|
+ resp = await async_client.post("/api/v1/spoolman/spools/42/unlink")
|
|
|
+ assert resp.status_code == 200
|
|
|
+
|
|
|
+ mock_unlink_client.merge_spool_extra.assert_called_once_with(42, {"tag": _json.dumps("")})
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_unlink_preserves_other_extra_keys(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ spoolman_settings,
|
|
|
+ mock_unlink_client,
|
|
|
+ ):
|
|
|
+ """Unrelated extra keys must survive unlink.
|
|
|
+
|
|
|
+ merge_spool_extra is responsible for the merge (read current → merge
|
|
|
+ new fields → PATCH). The unlink endpoint only sends `{"tag": ...}`,
|
|
|
+ so any other extra key on the spool is automatically preserved by
|
|
|
+ merge_spool_extra's read-merge-write semantics.
|
|
|
+ """
|
|
|
+ resp = await async_client.post("/api/v1/spoolman/spools/42/unlink")
|
|
|
+ assert resp.status_code == 200
|
|
|
+
|
|
|
+ # The endpoint passes only the tag key — merge_spool_extra does the
|
|
|
+ # rest. We don't assert anything about `custom` on the call args
|
|
|
+ # because the route doesn't see / pass it.
|
|
|
+ _, args, _ = mock_unlink_client.merge_spool_extra.mock_calls[0]
|
|
|
+ sent_fields = args[1] if len(args) >= 2 else {}
|
|
|
+ assert sent_fields == {"tag": '""'}, "unlink should only send the tag key — merge_spool_extra does the merge"
|
|
|
+
|
|
|
+
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# B1: GET /spoolman/inventory/filaments
|
|
|
+# B2: POST /spools with spoolman_filament_id bypasses find_or_create_filament
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+
|
|
|
+SAMPLE_FILAMENT_DICT = {
|
|
|
+ "id": 7,
|
|
|
+ "name": "PLA Basic",
|
|
|
+ "material": "PLA",
|
|
|
+ "color_hex": "FF0000",
|
|
|
+ "color_name": "Red",
|
|
|
+ "weight": 1000,
|
|
|
+ "spool_weight": 196,
|
|
|
+ "vendor": {"id": 3, "name": "Bambu Lab"},
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+class TestListSpoolmanFilaments:
|
|
|
+ """Tests for GET /api/v1/spoolman/inventory/filaments (B1)."""
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_list_filaments_disabled_returns_400(self, async_client: AsyncClient):
|
|
|
+ """Without Spoolman enabled the endpoint returns 400."""
|
|
|
+ resp = await async_client.get("/api/v1/spoolman/inventory/filaments")
|
|
|
+ assert resp.status_code == 400
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_list_filaments_unreachable_returns_503(self, async_client: AsyncClient, spoolman_settings):
|
|
|
+ """503 is returned when _get_client raises HTTPException(503)."""
|
|
|
+ with patch(
|
|
|
+ "backend.app.api.routes.spoolman_inventory._get_client",
|
|
|
+ AsyncMock(side_effect=HTTPException(status_code=503, detail="Spoolman server is not reachable")),
|
|
|
+ ):
|
|
|
+ resp = await async_client.get("/api/v1/spoolman/inventory/filaments")
|
|
|
+ assert resp.status_code == 503
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_list_filaments_success(self, async_client: AsyncClient, spoolman_settings):
|
|
|
+ """Success path returns normalised filament list including spool_weight."""
|
|
|
+ mock_client = MagicMock()
|
|
|
+ mock_client.get_filaments = AsyncMock(return_value=[SAMPLE_FILAMENT_DICT])
|
|
|
+ with patch(
|
|
|
+ "backend.app.api.routes.spoolman_inventory._get_client",
|
|
|
+ AsyncMock(return_value=mock_client),
|
|
|
+ ):
|
|
|
+ resp = await async_client.get("/api/v1/spoolman/inventory/filaments")
|
|
|
+
|
|
|
+ assert resp.status_code == 200
|
|
|
+ data = resp.json()
|
|
|
+ assert isinstance(data, list)
|
|
|
+ assert len(data) == 1
|
|
|
+ entry = data[0]
|
|
|
+ assert entry["id"] == 7
|
|
|
+ assert entry["material"] == "PLA"
|
|
|
+ assert entry["spool_weight"] == 196
|
|
|
+ assert entry["vendor"]["name"] == "Bambu Lab"
|
|
|
+
|
|
|
+
|
|
|
+class TestCreateSpoolWithFilamentId:
|
|
|
+ """Tests for POST /api/v1/spoolman/inventory/spools with spoolman_filament_id (B2)."""
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_create_with_filament_id_skips_find_or_create(self, async_client: AsyncClient, spoolman_settings):
|
|
|
+ """When spoolman_filament_id is provided, find_or_create_filament must NOT be called."""
|
|
|
+ mock_client = MagicMock()
|
|
|
+ mock_client.find_or_create_filament = AsyncMock(return_value=7)
|
|
|
+ mock_client.create_spool = AsyncMock(return_value=SAMPLE_SPOOLMAN_SPOOL)
|
|
|
+ mock_client.update_spool_full = AsyncMock(return_value=SAMPLE_SPOOLMAN_SPOOL)
|
|
|
+ with patch(
|
|
|
+ "backend.app.api.routes.spoolman_inventory._get_client",
|
|
|
+ AsyncMock(return_value=mock_client),
|
|
|
+ ):
|
|
|
+ resp = await async_client.post(
|
|
|
+ "/api/v1/spoolman/inventory/spools",
|
|
|
+ json={"spoolman_filament_id": 7},
|
|
|
+ )
|
|
|
+
|
|
|
+ assert resp.status_code == 200
|
|
|
+ mock_client.find_or_create_filament.assert_not_called()
|
|
|
+ mock_client.create_spool.assert_called_once()
|
|
|
+ _, kwargs = mock_client.create_spool.call_args
|
|
|
+ assert kwargs.get("filament_id") == 7
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_create_with_invalid_filament_id_returns_404(self, async_client: AsyncClient, spoolman_settings):
|
|
|
+ """An invalid spoolman_filament_id (not in Spoolman) must return 404."""
|
|
|
+ from backend.app.services.spoolman import SpoolmanNotFoundError
|
|
|
+
|
|
|
+ mock_client = MagicMock()
|
|
|
+ mock_client.create_spool = AsyncMock(side_effect=SpoolmanNotFoundError("filament not found"))
|
|
|
+ with patch(
|
|
|
+ "backend.app.api.routes.spoolman_inventory._get_client",
|
|
|
+ AsyncMock(return_value=mock_client),
|
|
|
+ ):
|
|
|
+ resp = await async_client.post(
|
|
|
+ "/api/v1/spoolman/inventory/spools",
|
|
|
+ json={"spoolman_filament_id": 9999},
|
|
|
+ )
|
|
|
+
|
|
|
+ assert resp.status_code == 404
|
|
|
+ assert "9999" in resp.json()["detail"]
|
|
|
+
|
|
|
+
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# WICHTIG-12: Additional edge-case tests
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+
|
|
|
+
|
|
|
+class TestBulkCreateWithFilamentId:
|
|
|
+ """Bulk create with spoolman_filament_id skips find_or_create_filament."""
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_bulk_create_with_filament_id_skips_find_or_create(
|
|
|
+ self, async_client: AsyncClient, spoolman_settings
|
|
|
+ ):
|
|
|
+ """Bulk POST with spoolman_filament_id must NOT call find_or_create_filament."""
|
|
|
+ mock_client = MagicMock()
|
|
|
+ mock_client.find_or_create_filament = AsyncMock(return_value=7)
|
|
|
+ mock_client.create_spool = AsyncMock(return_value=SAMPLE_SPOOLMAN_SPOOL)
|
|
|
+ mock_client.update_spool_full = AsyncMock(return_value=SAMPLE_SPOOLMAN_SPOOL)
|
|
|
+ with patch(
|
|
|
+ "backend.app.api.routes.spoolman_inventory._get_client",
|
|
|
+ AsyncMock(return_value=mock_client),
|
|
|
+ ):
|
|
|
+ resp = await async_client.post(
|
|
|
+ "/api/v1/spoolman/inventory/spools/bulk",
|
|
|
+ json={"spool": {"spoolman_filament_id": 7}, "quantity": 2},
|
|
|
+ )
|
|
|
+
|
|
|
+ assert resp.status_code == 200
|
|
|
+ mock_client.find_or_create_filament.assert_not_called()
|
|
|
+ assert mock_client.create_spool.call_count == 2
|
|
|
+ for call in mock_client.create_spool.call_args_list:
|
|
|
+ _, kwargs = call
|
|
|
+ assert kwargs.get("filament_id") == 7
|
|
|
+
|
|
|
+
|
|
|
+class TestCreateSpoolValidation:
|
|
|
+ """Validation edge cases for SpoolmanInventoryCreate."""
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_create_spool_filament_id_zero_returns_422(self, async_client: AsyncClient, spoolman_settings):
|
|
|
+ """spoolman_filament_id=0 must fail Field(gt=0) validation → 422."""
|
|
|
+ resp = await async_client.post(
|
|
|
+ "/api/v1/spoolman/inventory/spools",
|
|
|
+ json={"spoolman_filament_id": 0},
|
|
|
+ )
|
|
|
+ assert resp.status_code == 422
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_create_spool_without_material_or_filament_id_returns_422(
|
|
|
+ self, async_client: AsyncClient, spoolman_settings
|
|
|
+ ):
|
|
|
+ """Neither material nor spoolman_filament_id → model_validator must reject → 422."""
|
|
|
+ resp = await async_client.post(
|
|
|
+ "/api/v1/spoolman/inventory/spools",
|
|
|
+ json={"label_weight": 1000},
|
|
|
+ )
|
|
|
+ assert resp.status_code == 422
|
|
|
+
|
|
|
+
|
|
|
+class TestNormalizeFilament:
|
|
|
+ """Unit-style tests for _normalize_filament helper (imported directly)."""
|
|
|
+
|
|
|
+ def test_normalize_filament_null_vendor(self):
|
|
|
+ from backend.app.api.routes.spoolman_inventory import _normalize_filament
|
|
|
+
|
|
|
+ result = _normalize_filament({"id": 5, "name": "PLA", "vendor": None})
|
|
|
+ assert result is not None
|
|
|
+ assert result["vendor"] is None
|
|
|
+
|
|
|
+ def test_normalize_filament_null_id_returns_none(self):
|
|
|
+ from backend.app.api.routes.spoolman_inventory import _normalize_filament
|
|
|
+
|
|
|
+ result = _normalize_filament({"id": None, "name": "PLA"})
|
|
|
+ assert result is None
|
|
|
+
|
|
|
+ def test_normalize_filament_zero_id_returns_none(self):
|
|
|
+ from backend.app.api.routes.spoolman_inventory import _normalize_filament
|
|
|
+
|
|
|
+ result = _normalize_filament({"id": 0, "name": "PLA"})
|
|
|
+ assert result is None
|
|
|
+
|
|
|
+
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# F1: TestTranslateSpoolmanErrors — 502/404/503 paths through _translate_spoolman_errors
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+
|
|
|
+
|
|
|
+class TestTranslateSpoolmanErrors:
|
|
|
+ """F1: _translate_spoolman_errors() maps Spoolman exceptions to HTTP codes."""
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_spoolman_not_found_returns_404(
|
|
|
+ self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client
|
|
|
+ ):
|
|
|
+ """SpoolmanNotFoundError from get_spool → 404."""
|
|
|
+ from backend.app.services.spoolman import SpoolmanNotFoundError
|
|
|
+
|
|
|
+ mock_spoolman_client.get_spool.side_effect = SpoolmanNotFoundError("spool 999 not found")
|
|
|
+ resp = await async_client.get("/api/v1/spoolman/inventory/spools/999")
|
|
|
+ assert resp.status_code == 404
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_spoolman_unavailable_returns_503(
|
|
|
+ self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client
|
|
|
+ ):
|
|
|
+ """SpoolmanUnavailableError from get_spool → 503."""
|
|
|
+ from backend.app.services.spoolman import SpoolmanUnavailableError
|
|
|
+
|
|
|
+ mock_spoolman_client.get_spool.side_effect = SpoolmanUnavailableError("network error")
|
|
|
+ resp = await async_client.get("/api/v1/spoolman/inventory/spools/42")
|
|
|
+ assert resp.status_code == 503
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_spoolman_client_error_returns_502_with_upstream_status(
|
|
|
+ self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client
|
|
|
+ ):
|
|
|
+ """SpoolmanClientError from get_spool → 502 with upstream_status in body."""
|
|
|
+ from backend.app.services.spoolman import SpoolmanClientError
|
|
|
+
|
|
|
+ mock_spoolman_client.get_spool.side_effect = SpoolmanClientError("Spoolman rejected", 422, "filament not found")
|
|
|
+ resp = await async_client.get("/api/v1/spoolman/inventory/spools/42")
|
|
|
+ assert resp.status_code == 502
|
|
|
+ body = resp.json()
|
|
|
+ assert body["detail"]["upstream_status"] == 422
|
|
|
+ assert body["detail"]["upstream_body"] == "filament not found"
|
|
|
+
|
|
|
+
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# F2: _get_client health_check returns False → 503
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+
|
|
|
+
|
|
|
+class TestGetClientHealthCheckFalse:
|
|
|
+ """F2: _get_client raises 503 when health_check() returns False."""
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_returns_503_when_health_check_returns_false(
|
|
|
+ self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client
|
|
|
+ ):
|
|
|
+ """health_check() → False should produce 503 on any inventory call."""
|
|
|
+ import time
|
|
|
+
|
|
|
+ import backend.app.api.routes.spoolman_inventory as inv_module
|
|
|
+
|
|
|
+ mock_spoolman_client.health_check = AsyncMock(return_value=False)
|
|
|
+ # Clear the TTL cache so health_check is actually called
|
|
|
+ inv_module._health_check_cache.clear()
|
|
|
+ resp = await async_client.get("/api/v1/spoolman/inventory/spools")
|
|
|
+ assert resp.status_code == 503
|
|
|
+
|
|
|
+
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# F3: SpoolTagLinkRequest both fields null → 422
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+
|
|
|
+
|
|
|
+class TestSpoolTagLinkBothNull:
|
|
|
+ """F3: /spools/{id}/tag with both tag_uid and tray_uuid null → 422."""
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_both_null_returns_422(self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client):
|
|
|
+ """Sending {} (both fields absent) → at_least_one validator → 422."""
|
|
|
+ resp = await async_client.patch(
|
|
|
+ "/api/v1/spoolman/inventory/spools/42/tag",
|
|
|
+ json={},
|
|
|
+ )
|
|
|
+ assert resp.status_code == 422
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_both_explicitly_null_returns_422(
|
|
|
+ self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client
|
|
|
+ ):
|
|
|
+ """Sending {tag_uid: null, tray_uuid: null} → at_least_one validator → 422."""
|
|
|
+ resp = await async_client.patch(
|
|
|
+ "/api/v1/spoolman/inventory/spools/42/tag",
|
|
|
+ json={"tag_uid": None, "tray_uuid": None},
|
|
|
+ )
|
|
|
+ assert resp.status_code == 422
|
|
|
+
|
|
|
+
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# F5: RBAC lists — missing endpoints
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+
|
|
|
+
|
|
|
+class TestSpoolmanInventoryAuthExtended:
|
|
|
+ """F5: Additional endpoints in RBAC auth/403 parametrize lists."""
|
|
|
+
|
|
|
+ @pytest.fixture
|
|
|
+ async def auth_and_spoolman_settings(self, db_session):
|
|
|
+ from backend.app.models.settings import Settings
|
|
|
+
|
|
|
+ db_session.add(Settings(key="spoolman_enabled", value="true"))
|
|
|
+ db_session.add(Settings(key="spoolman_url", value="http://localhost:7912"))
|
|
|
+ db_session.add(Settings(key="auth_enabled", value="true"))
|
|
|
+ await db_session.commit()
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ @pytest.mark.parametrize(
|
|
|
+ "method,path,payload",
|
|
|
+ [
|
|
|
+ ("PATCH", "/api/v1/spoolman/inventory/spools/42/tag", {"tag_uid": "AABBCCDDEE112233"}),
|
|
|
+ ("POST", "/api/v1/spoolman/inventory/sync-ams-weights", {"printer_id": 1, "ams_data": []}),
|
|
|
+ ("PATCH", "/api/v1/spoolman/inventory/filaments/7", {"spool_weight": 196.0}),
|
|
|
+ ],
|
|
|
+ )
|
|
|
+ async def test_extended_write_endpoints_require_auth(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ auth_and_spoolman_settings,
|
|
|
+ method: str,
|
|
|
+ path: str,
|
|
|
+ payload: dict | None,
|
|
|
+ ):
|
|
|
+ """Additional write endpoints return 401 when auth is enabled and no token is provided."""
|
|
|
+ resp = await async_client.request(method, path, json=payload)
|
|
|
+ assert resp.status_code == 401, f"{method} {path} should require auth but got {resp.status_code}: {resp.json()}"
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ @pytest.mark.parametrize(
|
|
|
+ "method,path",
|
|
|
+ [
|
|
|
+ ("GET", "/api/v1/spoolman/inventory/filaments"),
|
|
|
+ ],
|
|
|
+ )
|
|
|
+ async def test_extended_read_endpoints_require_auth(
|
|
|
+ self,
|
|
|
+ async_client: AsyncClient,
|
|
|
+ auth_and_spoolman_settings,
|
|
|
+ method: str,
|
|
|
+ path: str,
|
|
|
+ ):
|
|
|
+ """Additional read endpoints return 401 when auth is enabled and no token is provided."""
|
|
|
+ resp = await async_client.request(method, path)
|
|
|
+ assert resp.status_code == 401, f"{method} {path} should require auth but got {resp.status_code}: {resp.json()}"
|
|
|
+
|
|
|
+
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# F8: _normalize_filament negative ID returns None
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+
|
|
|
+
|
|
|
+class TestNormalizeFilamentNegativeId:
|
|
|
+ """F8: _normalize_filament with negative id → None (was only checking == 0)."""
|
|
|
+
|
|
|
+ def test_normalize_filament_negative_id_returns_none(self):
|
|
|
+ from backend.app.api.routes.spoolman_inventory import _normalize_filament
|
|
|
+
|
|
|
+ result = _normalize_filament({"id": -1, "name": "PLA"})
|
|
|
+ assert result is None
|
|
|
+
|
|
|
+ def test_normalize_filament_large_negative_id_returns_none(self):
|
|
|
+ from backend.app.api.routes.spoolman_inventory import _normalize_filament
|
|
|
+
|
|
|
+ result = _normalize_filament({"id": -999, "name": "PLA"})
|
|
|
+ assert result is None
|
|
|
+
|
|
|
+
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# F9: weight_used > label_weight cross-field validator integration test
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+
|
|
|
+
|
|
|
+class TestCreateSpoolWeightValidation:
|
|
|
+ """F9: SpoolmanInventoryCreate.validate_weight_consistency cross-field validator."""
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_weight_used_exceeds_label_weight_returns_422(self, async_client: AsyncClient, spoolman_settings):
|
|
|
+ """weight_used > label_weight → cross-field validator → 422."""
|
|
|
+ resp = await async_client.post(
|
|
|
+ "/api/v1/spoolman/inventory/spools",
|
|
|
+ json={"material": "PLA", "label_weight": 500, "weight_used": 600},
|
|
|
+ )
|
|
|
+ assert resp.status_code == 422
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_weight_used_equals_label_weight_accepted(
|
|
|
+ self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client
|
|
|
+ ):
|
|
|
+ """weight_used == label_weight is exactly at the boundary → should pass (201)."""
|
|
|
+ resp = await async_client.post(
|
|
|
+ "/api/v1/spoolman/inventory/spools",
|
|
|
+ json={"material": "PLA", "label_weight": 1000, "weight_used": 1000},
|
|
|
+ )
|
|
|
+ # 201 or 200 (spool created)
|
|
|
+ assert resp.status_code in (200, 201)
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_create_spool_with_non_default_core_weight_accepted(
|
|
|
+ self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client
|
|
|
+ ):
|
|
|
+ """A3: core_weight != 250 must no longer be rejected → 201."""
|
|
|
+ resp = await async_client.post(
|
|
|
+ "/api/v1/spoolman/inventory/spools",
|
|
|
+ json={"material": "PLA", "label_weight": 1000, "weight_used": 0, "core_weight": 196},
|
|
|
+ )
|
|
|
+ assert resp.status_code in (200, 201)
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_update_spool_with_non_default_core_weight_accepted(
|
|
|
+ self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client
|
|
|
+ ):
|
|
|
+ """A3: PATCH with core_weight != 250 must no longer return 422."""
|
|
|
+ resp = await async_client.patch(
|
|
|
+ "/api/v1/spoolman/inventory/spools/42",
|
|
|
+ json={"core_weight": 300},
|
|
|
+ )
|
|
|
+ assert resp.status_code == 200
|
|
|
+
|
|
|
+
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# P8-T1: /slot-assignments/all enriches with printer_name + ams_label
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+
|
|
|
+
|
|
|
+class TestGetAllSlotAssignmentsEnriched:
|
|
|
+ """P8-T1: /slot-assignments/all enriches with printer_name + ams_label.
|
|
|
+
|
|
|
+ Regression for InventoryPage LOCATION column showing '-' for Spoolman
|
|
|
+ spools because the endpoint only returned 4 raw fields without the
|
|
|
+ printer_name + ams_label needed by the UI.
|
|
|
+ """
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_returns_printer_name_for_existing_printer(
|
|
|
+ self, async_client: AsyncClient, db_session, spoolman_settings
|
|
|
+ ):
|
|
|
+ """printer_name is enriched from the joined Printer relationship."""
|
|
|
+ from backend.app.models.printer import Printer
|
|
|
+ from backend.app.models.spoolman_slot_assignment import SpoolmanSlotAssignment
|
|
|
+
|
|
|
+ db_session.add(
|
|
|
+ Printer(
|
|
|
+ id=1,
|
|
|
+ name="Sully",
|
|
|
+ model="X1C",
|
|
|
+ serial_number="SN1",
|
|
|
+ ip_address="1.2.3.4",
|
|
|
+ access_code="",
|
|
|
+ )
|
|
|
+ )
|
|
|
+ db_session.add(
|
|
|
+ SpoolmanSlotAssignment(
|
|
|
+ printer_id=1,
|
|
|
+ ams_id=0,
|
|
|
+ tray_id=2,
|
|
|
+ spoolman_spool_id=216,
|
|
|
+ )
|
|
|
+ )
|
|
|
+ await db_session.commit()
|
|
|
+
|
|
|
+ with patch("backend.app.api.routes.spoolman_inventory.printer_manager") as mock_pm:
|
|
|
+ mock_pm.get_all_statuses.return_value = {}
|
|
|
+ resp = await async_client.get("/api/v1/spoolman/inventory/slot-assignments/all")
|
|
|
+
|
|
|
+ assert resp.status_code == 200
|
|
|
+ data = resp.json()
|
|
|
+ assert len(data) == 1
|
|
|
+ assert data[0]["printer_name"] == "Sully"
|
|
|
+ assert data[0]["spoolman_spool_id"] == 216
|
|
|
+ assert data[0]["ams_id"] == 0
|
|
|
+ assert data[0]["tray_id"] == 2
|
|
|
+ assert data[0]["ams_label"] is None
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_returns_ams_label_when_label_configured(
|
|
|
+ self, async_client: AsyncClient, db_session, spoolman_settings
|
|
|
+ ):
|
|
|
+ """ams_label is enriched from AmsLabel via printer MQTT serial map."""
|
|
|
+ from backend.app.models.ams_label import AmsLabel
|
|
|
+ from backend.app.models.printer import Printer
|
|
|
+ from backend.app.models.spoolman_slot_assignment import SpoolmanSlotAssignment
|
|
|
+
|
|
|
+ db_session.add(
|
|
|
+ Printer(
|
|
|
+ id=1,
|
|
|
+ name="Sully",
|
|
|
+ model="X1C",
|
|
|
+ serial_number="SN1",
|
|
|
+ ip_address="1.2.3.4",
|
|
|
+ access_code="",
|
|
|
+ )
|
|
|
+ )
|
|
|
+ db_session.add(AmsLabel(ams_serial_number="ABC123", label="Top Shelf"))
|
|
|
+ db_session.add(
|
|
|
+ SpoolmanSlotAssignment(
|
|
|
+ printer_id=1,
|
|
|
+ ams_id=0,
|
|
|
+ tray_id=2,
|
|
|
+ spoolman_spool_id=216,
|
|
|
+ )
|
|
|
+ )
|
|
|
+ await db_session.commit()
|
|
|
+
|
|
|
+ mock_state = MagicMock(raw_data={"ams": [{"id": 0, "sn": "ABC123"}]})
|
|
|
+ with patch("backend.app.api.routes.spoolman_inventory.printer_manager") as mock_pm:
|
|
|
+ mock_pm.get_all_statuses.return_value = {1: mock_state}
|
|
|
+ resp = await async_client.get("/api/v1/spoolman/inventory/slot-assignments/all")
|
|
|
+
|
|
|
+ assert resp.status_code == 200
|
|
|
+ assert resp.json()[0]["ams_label"] == "Top Shelf"
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_synthetic_ams_label_fallback(self, async_client: AsyncClient, db_session, spoolman_settings):
|
|
|
+ """Falls back to synthetic 'p{pid}a{ams_id}' key when no MQTT serial available."""
|
|
|
+ from backend.app.models.ams_label import AmsLabel
|
|
|
+ from backend.app.models.printer import Printer
|
|
|
+ from backend.app.models.spoolman_slot_assignment import SpoolmanSlotAssignment
|
|
|
+
|
|
|
+ db_session.add(
|
|
|
+ Printer(
|
|
|
+ id=1,
|
|
|
+ name="Sully",
|
|
|
+ model="X1C",
|
|
|
+ serial_number="SN1",
|
|
|
+ ip_address="1.2.3.4",
|
|
|
+ access_code="",
|
|
|
+ )
|
|
|
+ )
|
|
|
+ db_session.add(AmsLabel(ams_serial_number="p1a0", label="Synthetic Label"))
|
|
|
+ db_session.add(
|
|
|
+ SpoolmanSlotAssignment(
|
|
|
+ printer_id=1,
|
|
|
+ ams_id=0,
|
|
|
+ tray_id=2,
|
|
|
+ spoolman_spool_id=216,
|
|
|
+ )
|
|
|
+ )
|
|
|
+ await db_session.commit()
|
|
|
+
|
|
|
+ with patch("backend.app.api.routes.spoolman_inventory.printer_manager") as mock_pm:
|
|
|
+ mock_pm.get_all_statuses.return_value = {} # No live state -> synthetic key
|
|
|
+ resp = await async_client.get("/api/v1/spoolman/inventory/slot-assignments/all")
|
|
|
+
|
|
|
+ assert resp.json()[0]["ams_label"] == "Synthetic Label"
|
|
|
+
|
|
|
+ @pytest.mark.asyncio
|
|
|
+ @pytest.mark.integration
|
|
|
+ async def test_filter_by_printer_id_still_works(self, async_client: AsyncClient, db_session, spoolman_settings):
|
|
|
+ """Regression: ?printer_id=N still filters and enriches."""
|
|
|
+ from backend.app.models.printer import Printer
|
|
|
+ from backend.app.models.spoolman_slot_assignment import SpoolmanSlotAssignment
|
|
|
+
|
|
|
+ for pid in (1, 2):
|
|
|
+ db_session.add(
|
|
|
+ Printer(
|
|
|
+ id=pid,
|
|
|
+ name=f"P{pid}",
|
|
|
+ model="X1C",
|
|
|
+ serial_number=f"SN{pid}",
|
|
|
+ ip_address=f"1.2.3.{pid}",
|
|
|
+ access_code="",
|
|
|
+ )
|
|
|
+ )
|
|
|
+ db_session.add(
|
|
|
+ SpoolmanSlotAssignment(
|
|
|
+ printer_id=pid,
|
|
|
+ ams_id=0,
|
|
|
+ tray_id=0,
|
|
|
+ spoolman_spool_id=200 + pid,
|
|
|
+ )
|
|
|
+ )
|
|
|
+ await db_session.commit()
|
|
|
+
|
|
|
+ with patch("backend.app.api.routes.spoolman_inventory.printer_manager") as mock_pm:
|
|
|
+ mock_pm.get_all_statuses.return_value = {}
|
|
|
+ resp = await async_client.get("/api/v1/spoolman/inventory/slot-assignments/all?printer_id=1")
|
|
|
+
|
|
|
+ data = resp.json()
|
|
|
+ assert len(data) == 1
|
|
|
+ assert data[0]["printer_id"] == 1
|
|
|
+ assert data[0]["printer_name"] == "P1"
|
|
|
+ assert data[0]["spoolman_spool_id"] == 201
|