"""Integration tests for Spoolman API endpoints.""" from unittest.mock import AsyncMock, MagicMock, patch import pytest from httpx import AsyncClient class TestSpoolmanAPI: """Integration tests for /api/v1/spoolman/ endpoints.""" @pytest.fixture async def spoolman_settings(self, db_session): """Create Spoolman settings in the database (enabled with URL).""" from backend.app.models.settings import Settings # Both settings are required for Spoolman to work enabled_setting = Settings(key="spoolman_enabled", value="true") url_setting = Settings(key="spoolman_url", value="http://localhost:7912") db_session.add(enabled_setting) db_session.add(url_setting) await db_session.commit() return {"enabled": enabled_setting, "url": url_setting} @pytest.fixture async def spoolman_url_only(self, db_session): """Create only the URL setting (not enabled).""" from backend.app.models.settings import Settings setting = Settings(key="spoolman_url", value="http://localhost:7912") db_session.add(setting) await db_session.commit() return setting @pytest.fixture def mock_spoolman_client(self): """Mock the Spoolman client functions.""" mock_client = MagicMock() mock_client.is_connected = True mock_client.base_url = "http://localhost:7912" mock_client.health_check = AsyncMock(return_value=True) mock_client.ensure_tag_extra_field = AsyncMock(return_value=True) mock_client.ensure_extra_field = AsyncMock(return_value=True) mock_client.get_spools = AsyncMock(return_value=[]) mock_client.get_filaments = AsyncMock(return_value=[]) mock_client.create_spool = AsyncMock(return_value={"id": 1}) mock_client.update_spool = AsyncMock(return_value={"id": 1}) mock_client.merge_spool_extra = AsyncMock(return_value={"id": 1, "extra": {}}) mock_client.close = AsyncMock() with ( patch( "backend.app.api.routes.spoolman.get_spoolman_client", AsyncMock(return_value=mock_client), ), patch( "backend.app.api.routes.spoolman.init_spoolman_client", AsyncMock(return_value=mock_client), ), patch( "backend.app.api.routes.spoolman.close_spoolman_client", AsyncMock(), ), ): yield mock_client @pytest.fixture def mock_spoolman_disconnected(self): """Mock the Spoolman client as disconnected (returns None).""" with ( patch( "backend.app.api.routes.spoolman.get_spoolman_client", AsyncMock(return_value=None), ), patch( "backend.app.api.routes.spoolman.init_spoolman_client", AsyncMock(return_value=None), ), ): yield # ========================================================================= # Status Endpoint Tests # ========================================================================= @pytest.mark.asyncio @pytest.mark.integration async def test_get_status_not_configured(self, async_client: AsyncClient): """Verify status shows not enabled when no settings exist.""" response = await async_client.get("/api/v1/spoolman/status") assert response.status_code == 200 data = response.json() assert data["enabled"] is False assert data["connected"] is False @pytest.mark.asyncio @pytest.mark.integration async def test_get_status_url_only_not_enabled(self, async_client: AsyncClient, spoolman_url_only): """Verify status shows not enabled when only URL is set.""" response = await async_client.get("/api/v1/spoolman/status") assert response.status_code == 200 data = response.json() assert data["enabled"] is False assert data["url"] == "http://localhost:7912" @pytest.mark.asyncio @pytest.mark.integration async def test_get_status_enabled_and_connected( self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client ): """Verify status shows enabled and connected when properly configured.""" response = await async_client.get("/api/v1/spoolman/status") assert response.status_code == 200 data = response.json() assert data["enabled"] is True assert data["connected"] is True assert data["url"] == "http://localhost:7912" # ========================================================================= # Connect/Disconnect Tests # ========================================================================= @pytest.mark.asyncio @pytest.mark.integration async def test_connect_not_enabled(self, async_client: AsyncClient): """Verify connect fails when not enabled.""" response = await async_client.post("/api/v1/spoolman/connect") assert response.status_code == 400 assert "not enabled" in response.json()["detail"].lower() @pytest.mark.asyncio @pytest.mark.integration async def test_connect_success(self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client): """Verify successful connection to Spoolman.""" response = await async_client.post("/api/v1/spoolman/connect") assert response.status_code == 200 data = response.json() assert data["success"] is True assert "connected" in data["message"].lower() @pytest.mark.asyncio @pytest.mark.integration async def test_disconnect(self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client): """Verify disconnect works.""" response = await async_client.post("/api/v1/spoolman/disconnect") assert response.status_code == 200 assert "disconnected" in response.json()["message"].lower() # ========================================================================= # Spools Endpoint Tests # ========================================================================= @pytest.mark.asyncio @pytest.mark.integration async def test_get_spools_not_enabled(self, async_client: AsyncClient): """Verify get spools fails when not enabled.""" response = await async_client.get("/api/v1/spoolman/spools") assert response.status_code == 400 assert "not enabled" in response.json()["detail"].lower() @pytest.mark.asyncio @pytest.mark.integration async def test_get_spools_success(self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client): """Verify get spools returns data in expected format.""" mock_spool = { "id": 1, "remaining_weight": 500, "used_weight": 500, "filament": { "id": 1, "name": "PLA Basic", "material": "PLA", "color_hex": "FF0000", }, "first_used": "2024-01-01", "last_used": "2024-01-15", "location": "AMS1", "lot_nr": "LOT123", "comment": "Test spool", "extra": {"tag": '"ABC123"'}, } mock_spoolman_client.get_spools = AsyncMock(return_value=[mock_spool]) response = await async_client.get("/api/v1/spoolman/spools") assert response.status_code == 200 data = response.json() assert "spools" in data assert isinstance(data["spools"], list) assert len(data["spools"]) == 1 assert data["spools"][0]["id"] == 1 # ========================================================================= # Unlinked Spools Tests # ========================================================================= @pytest.mark.asyncio @pytest.mark.integration async def test_get_unlinked_spools_not_enabled(self, async_client: AsyncClient): """Verify get unlinked spools fails when not enabled.""" response = await async_client.get("/api/v1/spoolman/spools/unlinked") assert response.status_code == 400 @pytest.mark.asyncio @pytest.mark.integration async def test_get_unlinked_spools_success( self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client ): """Verify get unlinked spools returns spools without tags.""" # Mock spool without extra.tag (unlinked) mock_spool = { "id": 1, "remaining_weight": 800, "used_weight": 200, "extra": {}, # No tag = unlinked "filament": { "id": 1, "name": "PLA Basic", "material": "PLA", "color_hex": "FF0000", }, "location": "Shelf A", } mock_spoolman_client.get_spools = AsyncMock(return_value=[mock_spool]) response = await async_client.get("/api/v1/spoolman/spools/unlinked") assert response.status_code == 200 data = response.json() assert isinstance(data, list) assert len(data) == 1 assert data[0]["id"] == 1 assert data[0]["filament_name"] == "PLA Basic" @pytest.mark.asyncio @pytest.mark.integration async def test_get_unlinked_spools_excludes_linked( self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client ): """Verify linked spools (with tag) are excluded.""" # Mock spool with extra.tag (linked) mock_spool_linked = { "id": 1, "remaining_weight": 800, "used_weight": 200, "extra": {"tag": '"ABC123"'}, # Has tag = linked "filament": {"id": 1, "name": "PLA Red", "material": "PLA", "color_hex": "FF0000"}, } # Mock spool without tag (unlinked) mock_spool_unlinked = { "id": 2, "remaining_weight": 900, "used_weight": 100, "extra": {}, # No tag = unlinked "filament": {"id": 2, "name": "PLA Blue", "material": "PLA", "color_hex": "0000FF"}, } mock_spoolman_client.get_spools = AsyncMock(return_value=[mock_spool_linked, mock_spool_unlinked]) response = await async_client.get("/api/v1/spoolman/spools/unlinked") assert response.status_code == 200 data = response.json() assert len(data) == 1 assert data[0]["id"] == 2 # Only unlinked spool # ========================================================================= # Linked Spools Tests # ========================================================================= @pytest.mark.asyncio @pytest.mark.integration async def test_get_linked_spools_not_enabled(self, async_client: AsyncClient): """Verify get linked spools fails when not enabled.""" response = await async_client.get("/api/v1/spoolman/spools/linked") assert response.status_code == 400 @pytest.mark.asyncio @pytest.mark.integration async def test_get_linked_spools_success(self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client): """Verify get linked spools returns map of tag -> spool_id.""" # Mock spool with extra.tag (linked) mock_spool = { "id": 42, "remaining_weight": 800, "extra": {"tag": '"A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4"'}, "filament": {"id": 1, "name": "PLA Red", "material": "PLA", "weight": 1000}, } mock_spoolman_client.get_spools = AsyncMock(return_value=[mock_spool]) response = await async_client.get("/api/v1/spoolman/spools/linked") assert response.status_code == 200 data = response.json() assert "linked" in data assert isinstance(data["linked"], dict) # Tag should be uppercase and stripped of quotes assert "A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4" in data["linked"] linked_info = data["linked"]["A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4"] assert linked_info["id"] == 42 assert linked_info["remaining_weight"] == 800 assert linked_info["filament_weight"] == 1000 @pytest.mark.asyncio @pytest.mark.integration async def test_get_linked_spools_excludes_unlinked( self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client ): """Verify unlinked spools (without tag) are excluded.""" # Mock spool with tag (linked) mock_spool_linked = { "id": 1, "extra": {"tag": '"ABC12345678901234567890123456789A"'}, "filament": {"id": 1, "name": "PLA Red", "material": "PLA"}, } # Mock spool without tag (unlinked) mock_spool_unlinked = { "id": 2, "extra": {}, "filament": {"id": 2, "name": "PLA Blue", "material": "PLA"}, } mock_spoolman_client.get_spools = AsyncMock(return_value=[mock_spool_linked, mock_spool_unlinked]) response = await async_client.get("/api/v1/spoolman/spools/linked") assert response.status_code == 200 data = response.json() assert len(data["linked"]) == 1 # Only linked spool @pytest.mark.asyncio @pytest.mark.integration async def test_get_linked_spools_empty_tag_excluded( self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client ): """Verify spools with empty tag (JSON-encoded empty string) are excluded.""" # Mock spool with empty JSON-encoded tag mock_spool = { "id": 1, "extra": {"tag": '""'}, # JSON-encoded empty string "filament": {"id": 1, "name": "PLA Red", "material": "PLA"}, } mock_spoolman_client.get_spools = AsyncMock(return_value=[mock_spool]) response = await async_client.get("/api/v1/spoolman/spools/linked") assert response.status_code == 200 data = response.json() assert len(data["linked"]) == 0 # Empty tag should be excluded @pytest.mark.asyncio @pytest.mark.integration async def test_get_linked_spools_includes_weight_data( self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client ): """Verify linked spools response includes remaining_weight and filament_weight.""" mock_spool = { "id": 10, "remaining_weight": 500.5, "extra": {"tag": '"AABB11223344556677889900AABBCCDD"'}, "filament": {"id": 1, "name": "PETG Blue", "material": "PETG", "weight": 750}, } mock_spoolman_client.get_spools = AsyncMock(return_value=[mock_spool]) response = await async_client.get("/api/v1/spoolman/spools/linked") assert response.status_code == 200 data = response.json() info = data["linked"]["AABB11223344556677889900AABBCCDD"] assert info["id"] == 10 assert info["remaining_weight"] == 500.5 assert info["filament_weight"] == 750 @pytest.mark.asyncio @pytest.mark.integration async def test_get_linked_spools_missing_weight_fields( self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client ): """Verify linked spools handles missing weight data gracefully.""" mock_spool = { "id": 5, "extra": {"tag": '"CCDD11223344556677889900AABBCCDD"'}, "filament": {"id": 1, "name": "PLA Red", "material": "PLA"}, } mock_spoolman_client.get_spools = AsyncMock(return_value=[mock_spool]) response = await async_client.get("/api/v1/spoolman/spools/linked") assert response.status_code == 200 data = response.json() info = data["linked"]["CCDD11223344556677889900AABBCCDD"] assert info["id"] == 5 assert info["remaining_weight"] is None assert info["filament_weight"] is None @pytest.mark.asyncio @pytest.mark.integration async def test_get_linked_spools_null_filament( self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client ): """Verify linked spools handles null filament object.""" mock_spool = { "id": 7, "remaining_weight": 300, "extra": {"tag": '"EEFF11223344556677889900AABBCCDD"'}, "filament": None, } mock_spoolman_client.get_spools = AsyncMock(return_value=[mock_spool]) response = await async_client.get("/api/v1/spoolman/spools/linked") assert response.status_code == 200 data = response.json() info = data["linked"]["EEFF11223344556677889900AABBCCDD"] assert info["id"] == 7 assert info["remaining_weight"] == 300 assert info["filament_weight"] is None # ========================================================================= # Link Spool Tests # ========================================================================= @pytest.mark.asyncio @pytest.mark.integration async def test_link_spool_not_enabled(self, async_client: AsyncClient): """Verify link spool fails when not enabled.""" response = await async_client.post( "/api/v1/spoolman/spools/1/link", json={"tray_uuid": "A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4"}, ) assert response.status_code == 400 @pytest.mark.asyncio @pytest.mark.integration async def test_link_spool_invalid_uuid_length( self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client ): """Verify link spool fails with invalid UUID length.""" response = await async_client.post( "/api/v1/spoolman/spools/1/link", json={"tray_uuid": "ABC123"}, # Too short ) assert response.status_code == 400 assert "16 or 32 hex characters" in response.json()["detail"] @pytest.mark.asyncio @pytest.mark.integration async def test_link_spool_invalid_uuid_format( self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client ): """Verify link spool fails with non-hex UUID.""" response = await async_client.post( "/api/v1/spoolman/spools/1/link", json={"tray_uuid": "ZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ"}, # Not hex ) assert response.status_code == 400 assert "hex" in response.json()["detail"].lower() @pytest.mark.asyncio @pytest.mark.integration async def test_link_spool_success(self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client): """Verify successfully linking a spool — uses merge_spool_extra to preserve custom fields.""" mock_spoolman_client.merge_spool_extra = AsyncMock( return_value={"id": 1, "extra": {"tag": '"A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4"'}} ) response = await async_client.post( "/api/v1/spoolman/spools/1/link", json={"tray_uuid": "A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4"}, ) assert response.status_code == 200 data = response.json() assert data["success"] is True assert "linked" in data["message"].lower() mock_spoolman_client.merge_spool_extra.assert_called_once_with(1, {"tag": '"A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4"'}) @pytest.mark.asyncio @pytest.mark.integration async def test_link_spool_with_printer_context_creates_slot_assignment( self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client, printer_factory ): """link with printer_id+ams_id+tray_id upserts into local slot-assignment table.""" mock_spoolman_client.merge_spool_extra = AsyncMock( return_value={"id": 5, "extra": {"tag": '"A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4"'}} ) printer = await printer_factory() response = await async_client.post( "/api/v1/spoolman/spools/5/link", json={ "tray_uuid": "A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4", "printer_id": printer.id, "ams_id": 0, "tray_id": 1, }, ) assert response.status_code == 200 # Verify the slot assignment row was written via the /all endpoint all_resp = await async_client.get( "/api/v1/spoolman/inventory/slot-assignments/all", params={"printer_id": printer.id}, ) assert all_resp.status_code == 200 rows = all_resp.json() assert len(rows) == 1 assert rows[0]["spoolman_spool_id"] == 5 assert rows[0]["ams_id"] == 0 assert rows[0]["tray_id"] == 1 @pytest.mark.asyncio @pytest.mark.integration async def test_link_spool_without_printer_context_no_slot_assignment( self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client ): """link without printer context calls merge_spool_extra and no slot assignment is created.""" mock_spoolman_client.merge_spool_extra = AsyncMock( return_value={"id": 5, "extra": {"tag": '"A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4"'}} ) response = await async_client.post( "/api/v1/spoolman/spools/5/link", json={"tray_uuid": "A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4"}, ) assert response.status_code == 200 mock_spoolman_client.merge_spool_extra.assert_called_once_with(5, {"tag": '"A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4"'}) @pytest.mark.asyncio @pytest.mark.integration async def test_unlink_spool_success(self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client): """Unlink clears extra.tag via merge_spool_extra with json-encoded empty string. Spoolman PATCHes the extra dict by MERGING with the existing keys — popping a key from a Python dict copy and PATCHing the rest doesn't clear it. To actually clear we send the JSON-encoded empty string ('""'); read-side filters strip the wrapping quotes via .strip('"') so the spool drops out of get_linked_spools. """ import json as _json mock_spoolman_client.merge_spool_extra = AsyncMock( return_value={"id": 1, "extra": {"tag": '""', "custom": "keep"}} ) response = await async_client.post("/api/v1/spoolman/spools/1/unlink") assert response.status_code == 200 data = response.json() assert data["success"] is True assert "unlinked" in data["message"].lower() mock_spoolman_client.merge_spool_extra.assert_called_once_with(1, {"tag": _json.dumps("")}) @pytest.mark.asyncio @pytest.mark.integration async def test_unlink_spool_no_deadlock(self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client): """Regression: unlink must NOT acquire extra_lock around merge_spool_extra. merge_spool_extra acquires extra_lock(spool_id) internally, so wrapping the call in another `async with client.extra_lock(spool_id)` deadlocks — asyncio.Lock is non-reentrant. Pre-fix: every unlink request hung indefinitely; the SpoolBuddy kiosk's "Unassign" button looked unresponsive because the mutation isPending stayed true forever. This test verifies the request completes promptly by mocking merge_spool_extra to fail if the lock is already held by the caller — if the caller still wraps merge_spool_extra in `client.extra_lock(...)`, merge_spool_extra would block forever waiting for the lock. """ # Real extra_lock dictionary so we can detect contention import asyncio as _asyncio real_lock = _asyncio.Lock() mock_spoolman_client.extra_lock = MagicMock(return_value=real_lock) async def fake_merge(spool_id, fields): # If the route still wraps this call in `async with extra_lock(...)`, # the lock will be held when this fires and we'll deadlock without # the timeout. The wait_for asserts we get the lock fast. await _asyncio.wait_for(real_lock.acquire(), timeout=2.0) try: return {"id": spool_id, "extra": {"tag": '""', **fields}} finally: real_lock.release() mock_spoolman_client.merge_spool_extra = AsyncMock(side_effect=fake_merge) # Cap the request at 5s to fail fast on a deadlock. response = await _asyncio.wait_for( async_client.post("/api/v1/spoolman/spools/1/unlink"), timeout=5.0, ) assert response.status_code == 200 mock_spoolman_client.merge_spool_extra.assert_called_once() @pytest.mark.asyncio @pytest.mark.integration async def test_unlink_spool_deletes_slot_assignment( self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client, printer_factory ): """unlink removes the local slot assignment for the spool.""" # link_spool calls merge_spool_extra; unlink_spool uses get_spool + update_spool_full. mock_spoolman_client.merge_spool_extra = AsyncMock( return_value={"id": 7, "extra": {"tag": '"A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4"'}} ) printer = await printer_factory() # First link to create the slot assignment await async_client.post( "/api/v1/spoolman/spools/7/link", json={ "tray_uuid": "A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4", "printer_id": printer.id, "ams_id": 0, "tray_id": 0, }, ) # unlink_spool uses merge_spool_extra to clear the tag (Spoolman # PATCH merges, so the tag must be sent as json.dumps("") not popped). mock_spoolman_client.merge_spool_extra.reset_mock() mock_spoolman_client.merge_spool_extra = AsyncMock(return_value={"id": 7, "extra": {"tag": '""'}}) response = await async_client.post("/api/v1/spoolman/spools/7/unlink") assert response.status_code == 200 # Slot assignment must be gone all_resp = await async_client.get( "/api/v1/spoolman/inventory/slot-assignments/all", params={"printer_id": printer.id}, ) assert all_resp.status_code == 200 assert all_resp.json() == [] @pytest.mark.asyncio @pytest.mark.integration async def test_link_spool_spoolman_not_found( self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client ): """link returns 404 when Spoolman reports the spool does not exist.""" from backend.app.services.spoolman import SpoolmanNotFoundError mock_spoolman_client.merge_spool_extra = AsyncMock(side_effect=SpoolmanNotFoundError("not found")) response = await async_client.post( "/api/v1/spoolman/spools/99/link", json={"tray_uuid": "A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4"}, ) assert response.status_code == 404 @pytest.mark.asyncio @pytest.mark.integration async def test_link_spool_spoolman_unavailable( self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client ): """link returns 503 when Spoolman is unreachable.""" from backend.app.services.spoolman import SpoolmanUnavailableError mock_spoolman_client.merge_spool_extra = AsyncMock(side_effect=SpoolmanUnavailableError("down")) response = await async_client.post( "/api/v1/spoolman/spools/1/link", json={"tray_uuid": "A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4"}, ) assert response.status_code == 503 @pytest.mark.asyncio @pytest.mark.integration async def test_unlink_spool_spoolman_not_found( self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client ): """unlink returns 404 when Spoolman reports the spool does not exist. The endpoint calls merge_spool_extra directly (no longer get_spool + update_spool_full), so the not-found surface lives there. """ from backend.app.services.spoolman import SpoolmanNotFoundError mock_spoolman_client.merge_spool_extra = AsyncMock(side_effect=SpoolmanNotFoundError("not found")) response = await async_client.post("/api/v1/spoolman/spools/99/unlink") assert response.status_code == 404 # ========================================================================= # Sync Tests # ========================================================================= @pytest.mark.asyncio @pytest.mark.integration async def test_sync_printer_not_enabled(self, async_client: AsyncClient, printer_factory): """Verify sync fails when Spoolman not enabled.""" printer = await printer_factory() response = await async_client.post(f"/api/v1/spoolman/sync/{printer.id}") assert response.status_code == 400 assert "not enabled" in response.json()["detail"].lower() @pytest.mark.asyncio @pytest.mark.integration async def test_sync_printer_not_found(self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client): """Verify sync fails for non-existent printer.""" response = await async_client.post("/api/v1/spoolman/sync/9999") assert response.status_code == 404 @pytest.mark.asyncio @pytest.mark.integration async def test_sync_returns_result_structure( self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client, printer_factory, ): """Verify sync returns proper result structure.""" printer = await printer_factory() # Mock printer manager to return AMS data with patch("backend.app.api.routes.spoolman.printer_manager") as pm_mock: mock_state = MagicMock() mock_state.raw_data = {"ams": [{"id": 0, "tray": []}]} pm_mock.get_status = MagicMock(return_value=mock_state) response = await async_client.post(f"/api/v1/spoolman/sync/{printer.id}") assert response.status_code == 200 data = response.json() # Verify SyncResult structure assert "success" in data assert "synced_count" in data assert "skipped_count" in data assert "skipped" in data assert "errors" in data assert isinstance(data["skipped"], list) assert isinstance(data["errors"], list) @pytest.mark.asyncio @pytest.mark.integration async def test_sync_printer_not_connected( self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client, printer_factory, ): """Verify sync fails when printer is not connected (no status).""" printer = await printer_factory() with patch("backend.app.api.routes.spoolman.printer_manager") as pm_mock: pm_mock.get_status = MagicMock(return_value=None) response = await async_client.post(f"/api/v1/spoolman/sync/{printer.id}") assert response.status_code == 404 assert "not connected" in response.json()["detail"].lower() @pytest.mark.asyncio @pytest.mark.integration async def test_sync_writes_slot_assignment_to_db( self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client, printer_factory, db_session, ): """sync persists a slot assignment row for each successfully synced spool.""" from sqlalchemy import select from backend.app.models.spoolman_slot_assignment import SpoolmanSlotAssignment from backend.app.services.spoolman import AMSTray printer = await printer_factory() synced_spool = {"id": 42, "filament": {"material": "PLA"}, "remaining_weight": 500} fake_tray = AMSTray( ams_id=0, tray_id=2, tray_type="PLA", tray_sub_brands="PLA Basic", tray_color="FF0000FF", remain=80, tag_uid="", tray_uuid="A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4", tray_info_idx="", tray_weight=1000, ) mock_spoolman_client.parse_ams_tray = MagicMock(return_value=fake_tray) mock_spoolman_client.sync_ams_tray = AsyncMock(return_value=synced_spool) with patch("backend.app.api.routes.spoolman.printer_manager") as pm_mock: mock_state = MagicMock() mock_state.raw_data = {"ams": [{"id": 0, "tray": [{"id": 2}]}]} pm_mock.get_status = MagicMock(return_value=mock_state) response = await async_client.post(f"/api/v1/spoolman/sync/{printer.id}") assert response.status_code == 200 data = response.json() assert data["synced_count"] == 1 # Verify slot assignment was written to the DB result = await db_session.execute( select(SpoolmanSlotAssignment).where(SpoolmanSlotAssignment.printer_id == printer.id) ) rows = result.scalars().all() assert len(rows) == 1 assert rows[0].ams_id == 0 assert rows[0].tray_id == 2 assert rows[0].spoolman_spool_id == 42 @pytest.mark.asyncio @pytest.mark.integration async def test_sync_passes_slot_hint_when_no_rfid( self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client, printer_factory, db_session, ): """sync passes the spoolman_spool_id_hint from the local slot-assignment table when no RFID tag is present.""" from sqlalchemy import text from backend.app.services.spoolman import AMSTray printer = await printer_factory() # Pre-seed a slot assignment to serve as the hint await db_session.execute( text( "INSERT INTO spoolman_slot_assignments (printer_id, ams_id, tray_id, spoolman_spool_id)" " VALUES (:p, :a, :t, :s)" ), {"p": printer.id, "a": 0, "t": 1, "s": 55}, ) await db_session.commit() captured_hints: list = [] async def capturing_sync(tray, printer_name, **kwargs): captured_hints.append(kwargs.get("spoolman_spool_id_hint")) return None fake_tray_no_rfid = AMSTray( ams_id=0, tray_id=1, tray_type="PLA", tray_sub_brands="Generic PLA", tray_color="FFFFFFFF", remain=-1, tag_uid="", tray_uuid="", tray_info_idx="", tray_weight=1000, ) mock_spoolman_client.parse_ams_tray = MagicMock(return_value=fake_tray_no_rfid) mock_spoolman_client.sync_ams_tray = capturing_sync with patch("backend.app.api.routes.spoolman.printer_manager") as pm_mock: mock_state = MagicMock() mock_state.raw_data = {"ams": [{"id": 0, "tray": [{"id": 1}]}]} pm_mock.get_status = MagicMock(return_value=mock_state) response = await async_client.post(f"/api/v1/spoolman/sync/{printer.id}") assert response.status_code == 200 assert len(captured_hints) == 1 assert captured_hints[0] == 55 @pytest.mark.asyncio @pytest.mark.integration async def test_sync_no_rfid_no_hint_produces_skipped_entry( self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client, printer_factory, ): """sync reports a SkippedSpool for a tray with no RFID tag and no prior slot assignment.""" from backend.app.services.spoolman import AMSTray printer = await printer_factory() fake_tray = AMSTray( ams_id=0, tray_id=3, tray_type="ABS", tray_sub_brands="Generic ABS", tray_color="333333FF", remain=60, tag_uid="", tray_uuid="", tray_info_idx="", tray_weight=1000, ) mock_spoolman_client.parse_ams_tray = MagicMock(return_value=fake_tray) mock_spoolman_client.sync_ams_tray = AsyncMock(return_value=None) with patch("backend.app.api.routes.spoolman.printer_manager") as pm_mock: mock_state = MagicMock() mock_state.raw_data = {"ams": [{"id": 0, "tray": [{"id": 3}]}]} pm_mock.get_status = MagicMock(return_value=mock_state) response = await async_client.post(f"/api/v1/spoolman/sync/{printer.id}") assert response.status_code == 200 data = response.json() assert data["synced_count"] == 0 assert data["skipped_count"] == 1 assert len(data["skipped"]) == 1 skipped = data["skipped"][0] assert "No RFID" in skipped["reason"] assert skipped["filament_type"] == "ABS" # ========================================================================= # Filaments Endpoint Tests # ========================================================================= @pytest.mark.asyncio @pytest.mark.integration async def test_get_filaments_not_enabled(self, async_client: AsyncClient): """Verify get filaments fails when not enabled.""" response = await async_client.get("/api/v1/spoolman/filaments") assert response.status_code == 400 assert "not enabled" in response.json()["detail"].lower() @pytest.mark.asyncio @pytest.mark.integration async def test_get_filaments_success(self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client): """Verify get filaments returns data in expected format.""" mock_filament = { "id": 1, "name": "PLA Basic", "material": "PLA", "color_hex": "FF0000", "vendor_id": 1, "weight": 1000, } mock_spoolman_client.get_filaments = AsyncMock(return_value=[mock_filament]) response = await async_client.get("/api/v1/spoolman/filaments") assert response.status_code == 200 data = response.json() assert "filaments" in data assert isinstance(data["filaments"], list) assert len(data["filaments"]) == 1 assert data["filaments"][0]["name"] == "PLA Basic" # ========================================================================= # Disable Weight Sync Tests # ========================================================================= @pytest.fixture async def spoolman_settings_weight_sync_disabled(self, db_session): """Create Spoolman settings with weight sync disabled.""" from backend.app.models.settings import Settings enabled_setting = Settings(key="spoolman_enabled", value="true") url_setting = Settings(key="spoolman_url", value="http://localhost:7912") disable_weight_setting = Settings(key="spoolman_disable_weight_sync", value="true") partial_usage_setting = Settings(key="spoolman_report_partial_usage", value="true") db_session.add(enabled_setting) db_session.add(url_setting) db_session.add(disable_weight_setting) db_session.add(partial_usage_setting) await db_session.commit() return { "enabled": enabled_setting, "url": url_setting, "disable_weight": disable_weight_setting, "partial_usage": partial_usage_setting, } @pytest.mark.asyncio @pytest.mark.integration async def test_settings_returns_disable_weight_sync( self, async_client: AsyncClient, spoolman_settings_weight_sync_disabled ): """Verify settings endpoint returns the disable_weight_sync setting.""" response = await async_client.get("/api/v1/settings/spoolman") assert response.status_code == 200 data = response.json() assert "spoolman_disable_weight_sync" in data assert data["spoolman_disable_weight_sync"] == "true" @pytest.mark.asyncio @pytest.mark.integration async def test_settings_update_disable_weight_sync(self, async_client: AsyncClient, spoolman_settings): """Verify settings endpoint can update the disable_weight_sync setting.""" # First verify it's false by default response = await async_client.get("/api/v1/settings/spoolman") assert response.status_code == 200 data = response.json() assert data.get("spoolman_disable_weight_sync", "false") == "false" # Update the setting response = await async_client.put( "/api/v1/settings/spoolman", json={"spoolman_disable_weight_sync": "true"}, ) assert response.status_code == 200 data = response.json() assert data["spoolman_disable_weight_sync"] == "true" # Verify it persisted response = await async_client.get("/api/v1/settings/spoolman") assert response.status_code == 200 data = response.json() assert data["spoolman_disable_weight_sync"] == "true" @pytest.mark.asyncio @pytest.mark.integration async def test_sync_with_weight_sync_disabled_passes_flag( self, async_client: AsyncClient, spoolman_settings_weight_sync_disabled, mock_spoolman_client, printer_factory, ): """Verify sync passes disable_weight_sync=True to sync_ams_tray when the setting is on.""" printer = await printer_factory() # Mock existing spool mock_existing_spool = { "id": 42, "remaining_weight": 800, "extra": {"tag": '"A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4"'}, "filament": {"id": 1, "name": "PLA Red", "material": "PLA"}, } mock_spoolman_client.find_spool_by_tag = AsyncMock(return_value=mock_existing_spool) mock_spoolman_client.parse_ams_tray = MagicMock() # Create mock AMSTray from backend.app.services.spoolman import AMSTray mock_tray = AMSTray( ams_id=0, tray_id=0, tray_type="PLA", tray_sub_brands="PLA Basic", tray_color="FF0000FF", remain=50, tag_uid="", tray_uuid="A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4", tray_info_idx="GFA00", tray_weight=1000, ) mock_spoolman_client.parse_ams_tray.return_value = mock_tray mock_spoolman_client.convert_ams_slot_to_location = MagicMock(return_value="AMS A1") mock_spoolman_client.sync_ams_tray = AsyncMock(return_value={"id": 42}) mock_spoolman_client.clear_location_for_removed_spools = AsyncMock(return_value=0) with patch("backend.app.api.routes.spoolman.printer_manager") as pm_mock: mock_state = MagicMock() mock_state.raw_data = { "ams": [ { "id": 0, "tray": [ { "id": 0, "tray_type": "PLA", "tray_sub_brands": "PLA Basic", "tray_color": "FF0000FF", "remain": 50, "tag_uid": "", "tray_uuid": "A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4", "tray_info_idx": "GFA00", "tray_weight": 1000, } ], } ] } pm_mock.get_status = MagicMock(return_value=mock_state) response = await async_client.post(f"/api/v1/spoolman/sync/{printer.id}") assert response.status_code == 200 # Verify sync_ams_tray was called with disable_weight_sync=True mock_spoolman_client.sync_ams_tray.assert_called() call_kwargs = mock_spoolman_client.sync_ams_tray.call_args.kwargs assert call_kwargs.get("disable_weight_sync") is True # ========================================================================= # Report Partial Usage Tests # ========================================================================= @pytest.fixture async def spoolman_settings_partial_usage_disabled(self, db_session): """Create Spoolman settings with partial usage reporting disabled.""" from backend.app.models.settings import Settings enabled_setting = Settings(key="spoolman_enabled", value="true") url_setting = Settings(key="spoolman_url", value="http://localhost:7912") partial_usage_setting = Settings(key="spoolman_report_partial_usage", value="false") db_session.add(enabled_setting) db_session.add(url_setting) db_session.add(partial_usage_setting) await db_session.commit() return { "enabled": enabled_setting, "url": url_setting, "partial_usage": partial_usage_setting, } @pytest.mark.asyncio @pytest.mark.integration async def test_settings_returns_report_partial_usage( self, async_client: AsyncClient, spoolman_settings_partial_usage_disabled ): """Verify settings endpoint returns the report_partial_usage setting.""" response = await async_client.get("/api/v1/settings/spoolman") assert response.status_code == 200 data = response.json() assert "spoolman_report_partial_usage" in data assert data["spoolman_report_partial_usage"] == "false" @pytest.mark.asyncio @pytest.mark.integration async def test_settings_update_report_partial_usage(self, async_client: AsyncClient, spoolman_settings): """Verify settings endpoint can update the report_partial_usage setting.""" # First verify it's true by default response = await async_client.get("/api/v1/settings/spoolman") assert response.status_code == 200 data = response.json() assert data.get("spoolman_report_partial_usage", "true") == "true" # Update the setting to false response = await async_client.put( "/api/v1/settings/spoolman", json={"spoolman_report_partial_usage": "false"}, ) assert response.status_code == 200 data = response.json() assert data["spoolman_report_partial_usage"] == "false" # Verify it persisted response = await async_client.get("/api/v1/settings/spoolman") assert response.status_code == 200 data = response.json() assert data["spoolman_report_partial_usage"] == "false" @pytest.mark.asyncio @pytest.mark.integration async def test_settings_report_partial_usage_defaults_to_true(self, async_client: AsyncClient, spoolman_settings): """Verify report_partial_usage defaults to true (unlike disable_weight_sync which defaults to false).""" response = await async_client.get("/api/v1/settings/spoolman") assert response.status_code == 200 data = response.json() # Should default to "true" assert data["spoolman_report_partial_usage"] == "true" class TestLinkSpoolMqttConfigure: """P9-TEST-BE (Bug #8): link_spool sends MQTT configure when printer context is provided.""" @pytest.fixture async def spoolman_settings(self, db_session): from backend.app.models.settings import Settings db_session.add(Settings(key="spoolman_enabled", value="true")) db_session.add(Settings(key="spoolman_url", value="http://localhost:7912")) await db_session.commit() @pytest.fixture def mock_spoolman_client(self): mock_client = MagicMock() mock_client.is_connected = True mock_client.base_url = "http://localhost:7912" mock_client.health_check = AsyncMock(return_value=True) mock_client.ensure_tag_extra_field = AsyncMock(return_value=True) mock_client.merge_spool_extra = AsyncMock( return_value={"id": 5, "extra": {"tag": '"A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4"'}} ) # #1457 stale-tag cleanup enumerates spools; default to empty so it's a no-op. mock_client.get_spools = AsyncMock(return_value=[]) mock_client.get_spool = AsyncMock( return_value={ "id": 5, "remaining_weight": 800.0, "used_weight": 200.0, "spool_weight": None, "filament": { "id": 1, "name": "PLA Basic", "material": "PLA", "color_hex": "FF0000", "color_name": "Red", "vendor": {"id": 1, "name": "Bambu Lab"}, "weight": 1000, "spool_weight": 250, }, "extra": {}, "location": None, "comment": None, "archived": False, } ) mock_client.close = AsyncMock() mock_client.extra_lock = MagicMock() mock_client.extra_lock.return_value.__aenter__ = AsyncMock(return_value=None) mock_client.extra_lock.return_value.__aexit__ = AsyncMock(return_value=False) with ( patch( "backend.app.api.routes.spoolman.get_spoolman_client", AsyncMock(return_value=mock_client), ), patch( "backend.app.api.routes.spoolman.init_spoolman_client", AsyncMock(return_value=mock_client), ), patch( "backend.app.api.routes.spoolman.close_spoolman_client", AsyncMock(), ), ): yield mock_client @pytest.fixture def printer_factory(self, db_session): _counter = [0] async def _create(**kwargs): from backend.app.models.printer import Printer _counter[0] += 1 defaults = { "name": f"Test Printer {_counter[0]}", "serial_number": f"MQTTTEST{_counter[0]:06d}", "ip_address": f"192.168.100.{_counter[0]}", "access_code": "12345678", "is_active": True, "auto_archive": True, "model": "X1C", } defaults.update(kwargs) p = Printer(**defaults) db_session.add(p) await db_session.commit() await db_session.refresh(p) return p return _create @pytest.mark.asyncio @pytest.mark.integration async def test_link_spool_sends_ams_set_filament_with_printer_context( self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client, printer_factory ): """link_spool with printer context calls ams_set_filament_setting via MQTT.""" printer = await printer_factory() mqtt_mock = MagicMock() mqtt_mock.printer_state = MagicMock( nozzles=[MagicMock(nozzle_diameter="0.4")], ams_extruder_map={"0": 0}, raw_data={"ams": [{"id": 0, "tray": [{"id": 1, "cali_idx": None}]}]}, ) with patch("backend.app.api.routes.spoolman.printer_manager") as mock_pm: mock_pm.get_client.return_value = mqtt_mock response = await async_client.post( "/api/v1/spoolman/spools/5/link", json={ "tray_uuid": "A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4", "printer_id": printer.id, "ams_id": 0, "tray_id": 1, }, ) assert response.status_code == 200 assert response.json()["success"] is True mqtt_mock.ams_set_filament_setting.assert_called_once() call_kwargs = mqtt_mock.ams_set_filament_setting.call_args.kwargs assert call_kwargs["ams_id"] == 0 assert call_kwargs["tray_id"] == 1 @pytest.mark.asyncio @pytest.mark.integration async def test_link_spool_no_printer_context_no_mqtt( self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client ): """link_spool without printer context does not attempt MQTT configure.""" mqtt_mock = MagicMock() with patch("backend.app.api.routes.spoolman.printer_manager") as mock_pm: mock_pm.get_client.return_value = mqtt_mock response = await async_client.post( "/api/v1/spoolman/spools/5/link", json={"tray_uuid": "A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4"}, ) assert response.status_code == 200 mock_pm.get_client.assert_not_called() mqtt_mock.ams_set_filament_setting.assert_not_called() @pytest.mark.asyncio @pytest.mark.integration async def test_link_spool_mqtt_failure_does_not_prevent_link( self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client, printer_factory ): """MQTT failure is best-effort: link still succeeds if ams_set_filament_setting throws.""" printer = await printer_factory() mqtt_mock = MagicMock() mqtt_mock.printer_state = MagicMock( nozzles=[MagicMock(nozzle_diameter="0.4")], ams_extruder_map={"0": 0}, raw_data={"ams": []}, ) mqtt_mock.ams_set_filament_setting.side_effect = RuntimeError("MQTT connection lost") with patch("backend.app.api.routes.spoolman.printer_manager") as mock_pm: mock_pm.get_client.return_value = mqtt_mock response = await async_client.post( "/api/v1/spoolman/spools/5/link", json={ "tray_uuid": "A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4", "printer_id": printer.id, "ams_id": 0, "tray_id": 0, }, ) assert response.status_code == 200 data = response.json() assert data["success"] is True assert "linked" in data["message"].lower() @pytest.mark.asyncio @pytest.mark.integration async def test_link_spool_realigns_filament_context_to_printer_kp( self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client, printer_factory, db_session, ): """link_spool MQTT auto-configure realigns tray_info_idx + setting_id to the printer-side kp's filament context — same fix as assign path. Pre-fix link_spool used `mqtt_client.printer_state` (a non-existent attribute that always evaluated to None), so state.kprofiles / nozzles / ams_extruder_map were all skipped — every link sent generic-PLA tray_info_idx with empty setting_id, and the cali_idx in the printer's table couldn't be linked. (#1114) """ from backend.app.models.spoolman_k_profile import SpoolmanKProfile printer = await printer_factory() kp = SpoolmanKProfile( spoolman_spool_id=5, printer_id=printer.id, extruder=0, nozzle_diameter="0.4", k_value=0.025, cali_idx=8948, setting_id="PFUSedbf16b803ff3e", ) db_session.add(kp) await db_session.commit() printer_kp = MagicMock( slot_id=8948, nozzle_diameter="0.4", filament_id="P4d64437", setting_id="PFUSedbf16b803ff3e", ) printer_state = MagicMock( nozzles=[MagicMock(nozzle_diameter="0.4")], ams_extruder_map={"0": 0}, raw_data={"ams": [{"id": 0, "tray": [{"id": 1, "cali_idx": None}]}]}, kprofiles=[printer_kp], ) mqtt_mock = MagicMock() mqtt_mock.ams_set_filament_setting = MagicMock() mqtt_mock.extrusion_cali_sel = MagicMock() # Production never had this attribute; pre-fix code read it and got # None, defeating the cascade. The new code uses get_status instead. mqtt_mock.printer_state = None with patch("backend.app.api.routes.spoolman.printer_manager") as mock_pm: mock_pm.get_client.return_value = mqtt_mock mock_pm.get_status.return_value = printer_state response = await async_client.post( "/api/v1/spoolman/spools/5/link", json={ "tray_uuid": "A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4", "printer_id": printer.id, "ams_id": 0, "tray_id": 1, }, ) assert response.status_code == 200 amf_kwargs = mqtt_mock.ams_set_filament_setting.call_args.kwargs assert amf_kwargs["tray_info_idx"] == "P4d64437" assert amf_kwargs["setting_id"] == "PFUSedbf16b803ff3e" cs_kwargs = mqtt_mock.extrusion_cali_sel.call_args.kwargs assert cs_kwargs["cali_idx"] == 8948 assert cs_kwargs["filament_id"] == "P4d64437" @pytest.mark.asyncio @pytest.mark.integration async def test_link_spool_uses_printer_manager_not_mqtt_client_state( self, async_client: AsyncClient, spoolman_settings, mock_spoolman_client, printer_factory, db_session, ): """Regression: state comes from printer_manager.get_status, not mqtt_client.printer_state (which didn't exist on the real client).""" from backend.app.models.spoolman_k_profile import SpoolmanKProfile printer = await printer_factory() kp = SpoolmanKProfile( spoolman_spool_id=5, printer_id=printer.id, extruder=0, nozzle_diameter="0.4", k_value=0.025, cali_idx=42, setting_id="GFSL05", ) db_session.add(kp) await db_session.commit() printer_state = MagicMock( nozzles=[MagicMock(nozzle_diameter="0.4")], ams_extruder_map={"0": 0}, raw_data=None, kprofiles=[], ) mqtt_mock = MagicMock() mqtt_mock.ams_set_filament_setting = MagicMock() mqtt_mock.extrusion_cali_sel = MagicMock() # Production didn't have mqtt_client.printer_state — drop the spec # so an accidental read raises AttributeError instead of silently # returning a MagicMock. del mqtt_mock.printer_state with patch("backend.app.api.routes.spoolman.printer_manager") as mock_pm: mock_pm.get_client.return_value = mqtt_mock mock_pm.get_status.return_value = printer_state response = await async_client.post( "/api/v1/spoolman/spools/5/link", json={ "tray_uuid": "A1B2C3D4E5F6A1B2C3D4E5F6A1B2C3D4", "printer_id": printer.id, "ams_id": 0, "tray_id": 2, }, ) assert response.status_code == 200 # cali_sel must fire with cali_idx=42 — proves get_status was used mqtt_mock.extrusion_cali_sel.assert_called_once() assert mqtt_mock.extrusion_cali_sel.call_args.kwargs["cali_idx"] == 42