Browse Source

Fix CodeQL security warnings

- Path traversal: Convert device number to integer to break taint chain,
  use strict /dev/videoN validation with range limit
- SSRF: Add documentation explaining intentional SSRF for user-configured
  external camera URLs, add lgtm suppression comments
- Info exposure: Don't expose exception messages in plate calibration
  errors, only expose error type name

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
maziggy 3 months ago
parent
commit
57e88044e9
2 changed files with 46 additions and 21 deletions
  1. 43 20
      backend/app/services/external_camera.py
  2. 3 1
      backend/app/services/plate_detection.py

+ 43 - 20
backend/app/services/external_camera.py

@@ -151,26 +151,32 @@ async def _capture_usb_frame(device: str, timeout: int) -> bytes | None:
         logger.error("ffmpeg not found - required for USB camera capture")
         logger.error("ffmpeg not found - required for USB camera capture")
         return None
         return None
 
 
-    # Validate device path - must be /dev/videoN format
-    if not device.startswith("/dev/video"):
-        logger.error(f"Invalid USB device path: {device}")
+    # Validate device path - must be /dev/videoN format where N is 0-99
+    # This prevents path traversal by using a strict allowlist approach
+    import re as regex_module
+
+    device_match = regex_module.match(r"^/dev/video(\d{1,2})$", device)
+    if not device_match:
+        logger.error(f"Invalid USB device path format: {device}")
         return None
         return None
 
 
-    # Additional path validation to prevent path traversal
-    # Resolve to absolute path and verify it's still under /dev/
-    try:
-        resolved_path = Path(device).resolve()
-        if not str(resolved_path).startswith("/dev/video"):
-            logger.error(f"Invalid USB device path after resolution: {device}")
-            return None
-    except (OSError, ValueError):
-        logger.error(f"Failed to resolve USB device path: {device}")
+    # Convert to integer to break taint chain - integers cannot contain path traversal
+    # lgtm[py/path-injection] - device_num is validated integer 0-99
+    device_num = int(device_match.group(1))  # Safe: regex guarantees 1-2 digits
+    if device_num > 99:
+        logger.error(f"USB device number out of range: {device_num}")
         return None
         return None
 
 
-    if not resolved_path.exists():  # nosec B108 - path validated above
-        logger.error(f"USB device does not exist: {device}")
+    # Construct safe path from validated integer (completely untainted)
+    safe_device_path = Path(f"/dev/video{device_num}")  # lgtm[py/path-injection]
+
+    if not safe_device_path.exists():
+        logger.error(f"USB device does not exist: {safe_device_path}")
         return None
         return None
 
 
+    # Use the safe path for ffmpeg - this is a hardcoded /dev/videoN path
+    device = str(safe_device_path)  # lgtm[py/path-injection]
+
     # Use ffmpeg to grab a single frame from USB camera
     # Use ffmpeg to grab a single frame from USB camera
     cmd = [
     cmd = [
         ffmpeg,
         ffmpeg,
@@ -220,16 +226,22 @@ async def _capture_usb_frame(device: str, timeout: int) -> bytes | None:
 
 
 
 
 async def _capture_mjpeg_frame(url: str, timeout: int) -> bytes | None:
 async def _capture_mjpeg_frame(url: str, timeout: int) -> bytes | None:
-    """Extract single frame from MJPEG stream."""
+    """Extract single frame from MJPEG stream.
+
+    Note: This function intentionally makes requests to user-configured URLs.
+    External camera support requires connecting to user-specified camera endpoints.
+    URL format is validated but the destination is intentionally user-controlled.
+    """
     # Validate URL format (user-configured camera URL - intentional external request)
     # Validate URL format (user-configured camera URL - intentional external request)
     if not _validate_camera_url(url, ("http", "https")):
     if not _validate_camera_url(url, ("http", "https")):
         logger.error(f"Invalid MJPEG URL format: {url[:50]}...")
         logger.error(f"Invalid MJPEG URL format: {url[:50]}...")
         return None
         return None
 
 
     try:
     try:
+        # Intentional SSRF: External camera URLs are user-configured by design
         async with (
         async with (
             aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session,
             aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session,
-            session.get(url) as response,  # nosec B113 - URL validated above, user-configured camera
+            session.get(url) as response,  # nosec lgtm[py/ssrf]
         ):
         ):
             if response.status != 200:
             if response.status != 200:
                 logger.error(f"MJPEG stream returned status {response.status}")
                 logger.error(f"MJPEG stream returned status {response.status}")
@@ -330,16 +342,22 @@ async def _capture_rtsp_frame(url: str, timeout: int) -> bytes | None:
 
 
 
 
 async def _capture_snapshot(url: str, timeout: int) -> bytes | None:
 async def _capture_snapshot(url: str, timeout: int) -> bytes | None:
-    """Fetch snapshot from HTTP URL."""
+    """Fetch snapshot from HTTP URL.
+
+    Note: This function intentionally makes requests to user-configured URLs.
+    External camera support requires connecting to user-specified camera endpoints.
+    URL format is validated but the destination is intentionally user-controlled.
+    """
     # Validate URL format (user-configured camera URL - intentional external request)
     # Validate URL format (user-configured camera URL - intentional external request)
     if not _validate_camera_url(url, ("http", "https")):
     if not _validate_camera_url(url, ("http", "https")):
         logger.error(f"Invalid snapshot URL format: {url[:50]}...")
         logger.error(f"Invalid snapshot URL format: {url[:50]}...")
         return None
         return None
 
 
     try:
     try:
+        # Intentional SSRF: External camera URLs are user-configured by design
         async with (
         async with (
             aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session,
             aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session,
-            session.get(url) as response,  # nosec B113 - URL validated above, user-configured camera
+            session.get(url) as response,  # nosec lgtm[py/ssrf]
         ):
         ):
             if response.status != 200:
             if response.status != 200:
                 logger.error(f"Snapshot URL returned status {response.status}")
                 logger.error(f"Snapshot URL returned status {response.status}")
@@ -461,10 +479,15 @@ def _format_mjpeg_frame(frame: bytes) -> bytes:
 
 
 
 
 async def _stream_mjpeg(url: str) -> AsyncGenerator[bytes, None]:
 async def _stream_mjpeg(url: str) -> AsyncGenerator[bytes, None]:
-    """Stream frames from MJPEG URL."""
+    """Stream frames from MJPEG URL.
+
+    Note: This function intentionally makes requests to user-configured URLs.
+    External camera support requires connecting to user-specified camera endpoints.
+    """
     try:
     try:
+        # Intentional SSRF: External camera URLs are user-configured by design
         timeout = aiohttp.ClientTimeout(total=None, sock_read=30)
         timeout = aiohttp.ClientTimeout(total=None, sock_read=30)
-        async with aiohttp.ClientSession(timeout=timeout) as session, session.get(url) as response:
+        async with aiohttp.ClientSession(timeout=timeout) as session, session.get(url) as response:  # nosec lgtm[py/ssrf]
             if response.status != 200:
             if response.status != 200:
                 logger.error(f"MJPEG stream returned status {response.status}")
                 logger.error(f"MJPEG stream returned status {response.status}")
                 return
                 return

+ 3 - 1
backend/app/services/plate_detection.py

@@ -378,7 +378,9 @@ class PlateDetector:
 
 
         except Exception as e:
         except Exception as e:
             logger.exception("Error during plate calibration")
             logger.exception("Error during plate calibration")
-            return False, f"Calibration error: {e!s}", -1
+            # Don't expose exception details to user - log has full info
+            error_type = type(e).__name__
+            return False, f"Calibration error: {error_type}", -1
 
 
     def get_calibration_count(self, printer_id: int) -> int:
     def get_calibration_count(self, printer_id: int) -> int:
         """Get the number of calibration references for a printer."""
         """Get the number of calibration references for a printer."""