Browse Source

Add retry logic and connection resilience to Spoolman sync

Implements retry mechanism to handle intermittent network errors when
fetching spools cache for AMS sync operations.

Changes:
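- Add retry logic to get_spools() with 3 attempts and 500ms delay between
  attempts; it now re-raises the last error instead of returning an empty
  list when every attempt fails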
- Configure httpx client with connection pool limits to prevent stale
  connection reuse (max_keepalive_connections=5, max_connections=10,
  keepalive_expiry=30s)
- Recreate client on connection errors (ReadError, RemoteProtocolError,
  ConnectError)
- Abort sync operations if cache fetch fails after all retries
- Update on_ams_change, sync_single_printer, and sync_all_printers to
  handle cache fetch failures gracefully

This addresses ReadError(ClosedResourceError()) failures that occurred
intermittently when Spoolman closed idle connections or connection
pooling reused stale connections.

Testing:
- Added 4 new unit tests for retry behavior
- All 1018 tests passing
bambuman 3 months ago
parent
commit
c04df8ceb9

+ 18 - 4
backend/app/api/routes/spoolman.py

@@ -220,8 +220,15 @@ async def sync_printer_ams(
     # OPTIMIZATION: Fetch all spools once before processing trays
     # This eliminates redundant API calls (one per tray) when syncing multiple trays
     logger.debug("[Printer %s] Fetching spools cache for sync...", printer.name)
-    cached_spools = await client.get_spools()
-    logger.debug("[Printer %s] Cached %d spools for batch sync", printer.name, len(cached_spools))
+    try:
+        cached_spools = await client.get_spools()
+        logger.debug("[Printer %s] Cached %d spools for batch sync", printer.name, len(cached_spools))
+    except Exception as e:
+        logger.error("[Printer %s] Failed to fetch spools cache after retries: %s", printer.name, e)
+        raise HTTPException(
+            status_code=503,
+            detail=f"Failed to connect to Spoolman after multiple retries: {str(e)}",
+        )
 
     for ams_unit in ams_units:
         if not isinstance(ams_unit, dict):
@@ -342,8 +349,15 @@ async def sync_all_printers(
     # OPTIMIZATION: Fetch all spools once before processing ALL printers/trays
     # This eliminates redundant API calls across all printers
     logger.debug("Fetching spools cache for sync-all operation...")
-    cached_spools = await client.get_spools()
-    logger.debug("Cached %d spools for batch sync across %d printers", len(cached_spools), len(printers))
+    try:
+        cached_spools = await client.get_spools()
+        logger.debug("Cached %d spools for batch sync across %d printers", len(cached_spools), len(printers))
+    except Exception as e:
+        logger.error("Failed to fetch spools cache after retries: %s", e)
+        raise HTTPException(
+            status_code=503,
+            detail=f"Failed to connect to Spoolman after multiple retries: {str(e)}",
+        )
 
     for printer in printers:
         state = printer_manager.get_status(printer.id)

+ 10 - 2
backend/app/main.py

@@ -560,8 +560,16 @@ async def on_ams_change(printer_id: int, ams_data: list):
             # OPTIMIZATION: Fetch all spools once before processing trays
             # This eliminates redundant API calls (one per tray) when syncing multiple trays
             logger.debug("[Printer %s] Fetching spools cache for AMS sync...", printer_id)
-            cached_spools = await client.get_spools()
-            logger.debug("[Printer %s] Cached %d spools for batch sync", printer_id, len(cached_spools))
+            try:
+                cached_spools = await client.get_spools()
+                logger.debug("[Printer %s] Cached %d spools for batch sync", printer_id, len(cached_spools))
+            except Exception as e:
+                logger.error(
+                    "[Printer %s] Failed to fetch spools cache after retries, aborting AMS sync: %s",
+                    printer_id,
+                    e,
+                )
+                return
 
             # Sync each AMS tray
             synced = 0

+ 65 - 11
backend/app/services/spoolman.py

@@ -1,5 +1,6 @@
 """Spoolman integration service for syncing AMS filament data."""
 
+import asyncio
 import logging
 from dataclasses import dataclass
 from datetime import datetime, timezone
@@ -68,9 +69,22 @@ class SpoolmanClient:
         self._connected = False
 
     async def _get_client(self) -> httpx.AsyncClient:
-        """Get or create the HTTP client."""
+        """Get or create the HTTP client with connection pooling limits.
+
+        Configures the client to prevent idle connection issues:
+        - max_keepalive_connections=5: Limit number of persistent connections
+        - keepalive_expiry=30: Close idle connections after 30 seconds
+        - max_connections=10: Limit total connections to prevent resource exhaustion
+        """
         if self._client is None:
-            self._client = httpx.AsyncClient(timeout=10.0)
+            self._client = httpx.AsyncClient(
+                timeout=10.0,
+                limits=httpx.Limits(
+                    max_keepalive_connections=5,
+                    max_connections=10,
+                    keepalive_expiry=30.0,
+                ),
+            )
         return self._client
 
     async def close(self):
@@ -101,19 +115,59 @@ class SpoolmanClient:
         return self._connected
 
     async def get_spools(self) -> list[dict]:
-        """Get all spools from Spoolman.
+        """Get all spools from Spoolman with retry logic.
+
+        Attempts to fetch spools up to 3 times with 500ms delay between attempts.
+        This handles transient network errors like closed connections.
 
         Returns:
             List of spool dictionaries.
+
+        Raises:
+            Exception: If all 3 retry attempts fail.
         """
-        try:
-            client = await self._get_client()
-            response = await client.get(f"{self.api_url}/spool")
-            response.raise_for_status()
-            return response.json()
-        except Exception as e:
-            logger.error("Failed to get spools from Spoolman: %s", e)
-            return []
+        max_attempts = 3
+        retry_delay = 0.5  # 500ms
+
+        for attempt in range(1, max_attempts + 1):
+            try:
+                client = await self._get_client()
+                response = await client.get(f"{self.api_url}/spool")
+                response.raise_for_status()
+                spools = response.json()
+                if attempt > 1:
+                    logger.info("Successfully fetched %d spools on attempt %d", len(spools), attempt)
+                return spools
+            except (httpx.ReadError, httpx.RemoteProtocolError, httpx.ConnectError) as e:
+                # Connection-related errors - close and recreate client for next attempt
+                if attempt < max_attempts:
+                    logger.warning(
+                        "Connection error getting spools (attempt %d/%d): %s. Recreating client and retrying in %dms...",
+                        attempt,
+                        max_attempts,
+                        e,
+                        int(retry_delay * 1000),
+                    )
+                    # Close the stale client and recreate it
+                    await self.close()
+                    await asyncio.sleep(retry_delay)
+                else:
+                    logger.error("Failed to get spools from Spoolman after %d attempts: %s", max_attempts, e)
+                    raise
+            except Exception as e:
+                # Other errors (HTTP errors, JSON decode errors, etc.)
+                if attempt < max_attempts:
+                    logger.warning(
+                        "Failed to get spools from Spoolman (attempt %d/%d): %s. Retrying in %dms...",
+                        attempt,
+                        max_attempts,
+                        e,
+                        int(retry_delay * 1000),
+                    )
+                    await asyncio.sleep(retry_delay)
+                else:
+                    logger.error("Failed to get spools from Spoolman after %d attempts: %s", max_attempts, e)
+                    raise
 
     async def get_filaments(self) -> list[dict]:
         """Get all internal filaments from Spoolman.

+ 125 - 1
backend/tests/unit/services/test_spoolman_service.py

@@ -4,7 +4,7 @@ These tests specifically target the sync_ams_tray method's disable_weight_sync
 functionality that controls whether remaining_weight is updated.
 """
 
-from unittest.mock import AsyncMock, patch
+from unittest.mock import AsyncMock, Mock, patch
 
 import pytest
 
@@ -250,3 +250,127 @@ class TestSpoolmanClient:
             call_kwargs = mock_update.call_args.kwargs
             assert call_kwargs["spool_id"] == 3
             assert call_kwargs.get("clear_location") is True
+
+    # ========================================================================
+    # Tests for retry logic in get_spools
+    # ========================================================================
+
+    @pytest.mark.asyncio
+    async def test_get_spools_succeeds_on_first_attempt(self, client):
+        """Verify get_spools succeeds immediately when no errors occur."""
+        mock_spools = [{"id": 1}, {"id": 2}]
+
+        with patch.object(client, "_get_client") as mock_get_client:
+            mock_http_client = AsyncMock()
+            mock_response = Mock()
+            mock_response.raise_for_status = Mock()
+            mock_response.json = Mock(return_value=mock_spools)
+            mock_http_client.get = AsyncMock(return_value=mock_response)
+            mock_get_client.return_value = mock_http_client
+
+            result = await client.get_spools()
+
+            assert result == mock_spools
+            mock_get_client.assert_called_once()
+            mock_http_client.get.assert_called_once()
+
+    @pytest.mark.asyncio
+    async def test_get_spools_retries_on_connection_error(self, client):
+        """Verify get_spools retries up to 3 times on connection errors."""
+        import httpx
+
+        mock_spools = [{"id": 1}]
+
+        with (
+            patch.object(client, "_get_client") as mock_get_client,
+            patch.object(client, "close", AsyncMock()) as mock_close,
+            patch("asyncio.sleep", AsyncMock()) as mock_sleep,
+        ):
+            mock_http_client = AsyncMock()
+            mock_get_client.return_value = mock_http_client
+
+            # First 2 attempts fail with ReadError, 3rd succeeds
+            mock_response = Mock()
+            mock_response.raise_for_status = Mock()
+            mock_response.json = Mock(return_value=mock_spools)
+
+            mock_http_client.get = AsyncMock(
+                side_effect=[
+                    httpx.ReadError("Connection closed"),
+                    httpx.ReadError("Connection closed"),
+                    mock_response,
+                ]
+            )
+
+            result = await client.get_spools()
+
+            assert result == mock_spools
+            assert mock_get_client.call_count == 3
+            assert mock_http_client.get.call_count == 3
+            # Should close client twice (after each failed attempt)
+            assert mock_close.call_count == 2
+            # Should sleep twice (after first 2 attempts)
+            assert mock_sleep.call_count == 2
+            mock_sleep.assert_called_with(0.5)
+
+    @pytest.mark.asyncio
+    async def test_get_spools_raises_after_3_failed_attempts(self, client):
+        """Verify get_spools raises exception after 3 failed attempts."""
+        import httpx
+
+        with (
+            patch.object(client, "_get_client", AsyncMock()) as mock_get_client,
+            patch.object(client, "close", AsyncMock()) as mock_close,
+            patch("asyncio.sleep", AsyncMock()) as mock_sleep,
+        ):
+            mock_http_client = AsyncMock()
+            mock_get_client.return_value = mock_http_client
+
+            # All 3 attempts fail
+            mock_http_client.get.side_effect = httpx.ReadError("Connection closed")
+
+            with pytest.raises(httpx.ReadError):
+                await client.get_spools()
+
+            assert mock_get_client.call_count == 3
+            assert mock_http_client.get.call_count == 3
+            # Should close client twice (after first 2 failed attempts, not after 3rd)
+            assert mock_close.call_count == 2
+            # Should sleep twice (after first 2 attempts, not after 3rd)
+            assert mock_sleep.call_count == 2
+
+    @pytest.mark.asyncio
+    async def test_get_spools_handles_non_connection_errors(self, client):
+        """Verify get_spools retries on non-connection errors without recreating client."""
+        import httpx
+
+        mock_spools = [{"id": 1}]
+
+        with (
+            patch.object(client, "_get_client") as mock_get_client,
+            patch.object(client, "close", AsyncMock()) as mock_close,
+            patch("asyncio.sleep", AsyncMock()) as mock_sleep,
+        ):
+            mock_http_client = AsyncMock()
+            mock_get_client.return_value = mock_http_client
+
+            # First attempt fails with HTTP error, 2nd succeeds
+            mock_response_error = Mock()
+            mock_response_error.raise_for_status = Mock(
+                side_effect=httpx.HTTPStatusError("500 Server Error", request=Mock(), response=Mock())
+            )
+
+            mock_response_success = Mock()
+            mock_response_success.raise_for_status = Mock()
+            mock_response_success.json = Mock(return_value=mock_spools)
+
+            mock_http_client.get = AsyncMock(side_effect=[mock_response_error, mock_response_success])
+
+            result = await client.get_spools()
+
+            assert result == mock_spools
+            assert mock_get_client.call_count == 2
+            # Should NOT close client for HTTP errors (only connection errors)
+            mock_close.assert_not_called()
+            # Should sleep once (after first failed attempt)
+            assert mock_sleep.call_count == 1