| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911 |
- """External camera service.
- Supports MJPEG streams, RTSP streams (via ffmpeg), HTTP snapshot URLs, and USB cameras.
- Security Note: This service intentionally makes requests to user-configured camera URLs.
- This is necessary functionality for external camera integration. URLs are validated
- to ensure they are well-formed before use.
- """
- import asyncio
- import logging
- import re
- import shutil
- from collections.abc import AsyncGenerator
- from pathlib import Path
- from urllib.parse import urlparse
- import aiohttp
- logger = logging.getLogger(__name__)
- def _sanitize_camera_url(url: str, allowed_schemes: tuple[str, ...] = ("http", "https", "rtsp")) -> str | None:
- """Validate and sanitize camera URL, returning a safe reconstructed URL.
- This validates that the URL is well-formed, uses an allowed scheme,
- does not target cloud metadata services, and returns a reconstructed
- URL from validated components.
- Note: This intentionally allows user-provided URLs as that is the
- purpose of external camera configuration. Local network IPs are
- allowed since cameras are typically on the same LAN.
- Args:
- url: URL to validate and sanitize
- allowed_schemes: Tuple of allowed URL schemes
- Returns:
- Sanitized URL string if valid, None otherwise
- """
- try:
- parsed = urlparse(url)
- if not parsed.scheme or not parsed.netloc:
- return None
- # Validate scheme against allowlist
- scheme = parsed.scheme.lower()
- if scheme not in allowed_schemes:
- return None
- # Block cloud metadata service endpoints (SSRF mitigation)
- # These are dangerous destinations that should never be accessed
- hostname = parsed.hostname or ""
- hostname_lower = hostname.lower()
- blocked_hosts = (
- "169.254.169.254", # AWS/GCP/Azure metadata
- "metadata.google.internal", # GCP metadata
- "metadata.google",
- "localhost", # Block localhost to prevent internal service access
- "127.0.0.1",
- "::1",
- "0.0.0.0", # nosec B104
- )
- if hostname_lower in blocked_hosts:
- logger.warning("Blocked camera URL targeting restricted host: %s", hostname)
- return None
- # Block link-local addresses (169.254.x.x)
- if hostname.startswith("169.254."):
- logger.warning("Blocked camera URL targeting link-local address: %s", hostname)
- return None
- # Reconstruct URL from validated components to break taint chain
- # This creates a new string from validated parts
- port_str = f":{parsed.port}" if parsed.port else ""
- path = parsed.path or ""
- query = f"?{parsed.query}" if parsed.query else ""
- fragment = f"#{parsed.fragment}" if parsed.fragment else ""
- # Build sanitized URL from validated components
- sanitized = f"{scheme}://{hostname}{port_str}{path}{query}{fragment}"
- return sanitized
- except ValueError:
- return None
- def _validate_camera_url(url: str, allowed_schemes: tuple[str, ...] = ("http", "https", "rtsp")) -> bool:
- """Validate camera URL format (legacy wrapper).
- Args:
- url: URL to validate
- allowed_schemes: Tuple of allowed URL schemes
- Returns:
- True if URL is valid, False otherwise
- """
- return _sanitize_camera_url(url, allowed_schemes) is not None
- def list_usb_cameras() -> list[dict]:
- """List available USB cameras (V4L2 devices on Linux).
- Returns:
- List of dicts with {device: str, name: str, capabilities: list}
- """
- cameras = []
- video_devices = sorted(Path("/dev").glob("video*"))
- for device in video_devices:
- device_path = str(device)
- info = {"device": device_path, "name": device.name, "capabilities": []}
- # Try to get device info via v4l2-ctl
- v4l2_ctl = shutil.which("v4l2-ctl")
- if v4l2_ctl:
- import subprocess
- try:
- result = subprocess.run(
- [v4l2_ctl, "-d", device_path, "--info"],
- capture_output=True,
- text=True,
- timeout=5,
- )
- if result.returncode == 0:
- # Parse device name from output
- for line in result.stdout.splitlines():
- if "Card type" in line:
- info["name"] = line.split(":", 1)[1].strip()
- elif "Driver name" in line:
- info["driver"] = line.split(":", 1)[1].strip()
- # Check if device supports video capture
- result = subprocess.run(
- [v4l2_ctl, "-d", device_path, "--list-formats"],
- capture_output=True,
- text=True,
- timeout=5,
- )
- if result.returncode == 0 and result.stdout.strip():
- info["capabilities"].append("capture")
- # Parse available formats
- formats = re.findall(r"'(\w+)'", result.stdout)
- info["formats"] = list(set(formats))
- except (subprocess.TimeoutExpired, Exception) as e:
- logger.debug("v4l2-ctl failed for %s: %s", device_path, e)
- # Only include devices that look like video capture devices
- # Skip metadata devices (typically odd numbered like video1, video3)
- try:
- device_num = int(device.name.replace("video", ""))
- # Even numbered devices are usually capture, odd are metadata
- # But also check if we got capabilities
- if info.get("capabilities") or device_num % 2 == 0:
- cameras.append(info)
- except ValueError:
- cameras.append(info)
- return cameras
- def get_ffmpeg_path() -> str | None:
- """Get the path to ffmpeg executable."""
- # Try shutil.which first
- path = shutil.which("ffmpeg")
- if path:
- return path
- # Check common locations (systemd services may have limited PATH)
- for common_path in ["/usr/bin/ffmpeg", "/usr/local/bin/ffmpeg", "/opt/homebrew/bin/ffmpeg"]:
- if Path(common_path).exists():
- return common_path
- return None
- async def capture_frame(
- url: str,
- camera_type: str,
- timeout: int = 15,
- snapshot_url: str | None = None,
- ) -> bytes | None:
- """Capture single frame from external camera.
- Args:
- url: Live-stream URL (MJPEG stream, RTSP URL, HTTP snapshot URL, or USB device path).
- camera_type: "mjpeg", "rtsp", "snapshot", or "usb".
- timeout: Connection timeout in seconds.
- snapshot_url: Optional override for single-frame capture. When set, fetched
- via plain HTTP GET regardless of `camera_type`. Bypasses MJPEG warm-up
- handling on sources that expose a dedicated frame endpoint (e.g. go2rtc's
- `/api/frame.jpeg` reliably returns a clean image while the MJPEG stream's
- first frame is often the encoder's stale keyframe). #1177.
- Returns:
- JPEG bytes or None on failure
- """
- if snapshot_url:
- logger.debug("capture_frame using snapshot override url=%s...", snapshot_url[:50])
- return await _capture_snapshot(snapshot_url, timeout)
- logger.debug("capture_frame called: type=%s, url=%s...", camera_type, url[:50] if url else "None")
- if camera_type == "mjpeg":
- return await _capture_mjpeg_frame(url, timeout)
- elif camera_type == "rtsp":
- return await _capture_rtsp_frame(url, timeout)
- elif camera_type == "snapshot":
- return await _capture_snapshot(url, timeout)
- elif camera_type == "usb":
- return await _capture_usb_frame(url, timeout)
- else:
- logger.warning("Unknown camera type: %s", camera_type)
- return None
- async def _capture_usb_frame(device: str, timeout: int) -> bytes | None:
- """Capture frame from USB camera using ffmpeg."""
- ffmpeg = get_ffmpeg_path()
- if not ffmpeg:
- logger.error("ffmpeg not found - required for USB camera capture")
- return None
- # Validate device path - must be /dev/videoN format where N is 0-99
- # This prevents path traversal by using a strict allowlist approach
- import re as regex_module
- device_match = regex_module.match(r"^/dev/video(\d{1,2})$", device)
- if not device_match:
- logger.error("Invalid USB device path format: %s", device)
- return None
- # Convert to integer to break taint chain - integers cannot contain path traversal
- # lgtm[py/path-injection] - device_num is validated integer 0-99
- device_num = int(device_match.group(1)) # Safe: regex guarantees 1-2 digits
- if device_num > 99:
- logger.error("USB device number out of range: %s", device_num)
- return None
- # Construct safe path from validated integer (completely untainted)
- safe_device_path = Path(f"/dev/video{device_num}") # lgtm[py/path-injection]
- if not safe_device_path.exists():
- logger.error("USB device does not exist: %s", safe_device_path)
- return None
- # Use the safe path for ffmpeg - this is a hardcoded /dev/videoN path
- device = str(safe_device_path) # lgtm[py/path-injection]
- # Use ffmpeg to grab a single frame from USB camera
- cmd = [
- ffmpeg,
- "-f",
- "v4l2",
- "-i",
- device,
- "-frames:v",
- "1",
- "-f",
- "image2pipe",
- "-vcodec",
- "mjpeg",
- "-q:v",
- "2",
- "-",
- ]
- try:
- logger.debug("Running USB capture: %s", " ".join(cmd))
- process = await asyncio.create_subprocess_exec(
- *cmd,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- )
- stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout)
- if process.returncode != 0:
- logger.error("ffmpeg USB capture failed: %s", stderr.decode()[:200])
- return None
- if not stdout or len(stdout) < 100:
- logger.error("ffmpeg returned empty or too small frame from USB camera")
- return None
- return stdout
- except TimeoutError:
- logger.warning("USB frame capture timed out after %ss", timeout)
- if process:
- process.kill()
- return None
- except OSError as e:
- logger.error("USB frame capture failed: %s", e)
- return None
- async def _capture_mjpeg_frame(url: str, timeout: int) -> bytes | None:
- """Extract a single representative frame from an MJPEG stream.
- Many MJPEG sources — go2rtc most notably (#1177), and several IP cameras —
- emit a "warm-up" frame on the byte that follows connection accept: usually
- the last keyframe held in the encoder, which is often black or stale until
- the encoder catches up to live content. To return a frame that's actually
- representative of the scene we read past the first frame and return the
- second; if the connection closes / times out / hits the buffer cap before
- a second frame ever arrives we fall back to the first so callers still
- get *something* (better than degrading slow / single-frame streams to None,
- which would regress every code path that consumed pre-fix behaviour).
- Note: this function intentionally makes requests to user-configured URLs.
- External camera support requires connecting to user-specified camera
- endpoints. URL is sanitized and dangerous destinations are blocked.
- """
- safe_url = _sanitize_camera_url(url, ("http", "https"))
- if not safe_url:
- logger.error("Invalid MJPEG URL format: %s...", url[:50])
- return None
- jpeg_start = b"\xff\xd8"
- jpeg_end = b"\xff\xd9"
- first_frame: bytes | None = None # warm-up frame; fallback if no second arrives
- buffer = b""
- try:
- async with (
- aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session,
- session.get(safe_url) as response,
- ):
- if response.status != 200:
- logger.error("MJPEG stream returned status %s", response.status)
- return None
- async for chunk in response.content.iter_chunked(8192):
- buffer += chunk
- # A single chunk can carry multiple frames (e.g. high-FPS sources)
- # or a partial frame. Drain every complete frame we already have
- # before pulling the next chunk.
- while True:
- start_idx = buffer.find(jpeg_start)
- if start_idx == -1:
- # No frame start yet — drop trailing garbage, keep waiting.
- break
- end_idx = buffer.find(jpeg_end, start_idx + 2)
- if end_idx == -1:
- # Partial frame; trim already-discarded prefix so the
- # buffer stays bounded across long-running streams.
- if start_idx > 0:
- buffer = buffer[start_idx:]
- break
- frame = buffer[start_idx : end_idx + 2]
- buffer = buffer[end_idx + 2 :]
- if first_frame is None:
- first_frame = frame # warm-up; keep but don't return yet
- continue
- return frame # representative second frame
- if len(buffer) > 5 * 1024 * 1024: # 5MB limit
- logger.warning("MJPEG buffer exceeded 5MB without finding frame")
- break # exit chunk loop, fall through to first_frame fallback
- except TimeoutError:
- logger.warning("MJPEG frame capture timed out after %ss", timeout)
- except (aiohttp.ClientError, OSError) as e:
- logger.error("MJPEG frame capture failed: %s", e)
- # Stream ended / timed out / buffer cap before a second frame arrived.
- # Return whatever warm-up frame we managed to read; better an iffy frame
- # than None for callers that need *some* image (snapshot UX, plate-detect
- # CV, finish photo). None only if no frame ever arrived at all.
- return first_frame
- async def _capture_rtsp_frame(url: str, timeout: int) -> bytes | None:
- """Capture frame from RTSP using ffmpeg.
- For rtsps:// URLs, a local TLS proxy is used to avoid GnuTLS issues.
- """
- ffmpeg = get_ffmpeg_path()
- if not ffmpeg:
- logger.error("ffmpeg not found - required for RTSP capture")
- return None
- # If rtsps://, use TLS proxy
- proxy_server = None
- effective_url = url
- if url.lower().startswith("rtsps://"):
- try:
- from urllib.parse import urlparse
- from backend.app.services.camera import create_tls_proxy
- parsed = urlparse(url)
- target_port = parsed.port or 322
- proxy_port, proxy_server = await create_tls_proxy(parsed.hostname, target_port)
- userinfo = ""
- if parsed.username:
- userinfo = parsed.username
- if parsed.password:
- userinfo += f":{parsed.password}"
- userinfo += "@"
- effective_url = f"rtsp://{userinfo}127.0.0.1:{proxy_port}{parsed.path}"
- if parsed.query:
- effective_url += f"?{parsed.query}"
- except Exception as e:
- logger.warning("Failed to create TLS proxy for RTSP capture, falling back: %s", e)
- effective_url = url
- cmd = [
- ffmpeg,
- "-rtsp_transport",
- "tcp",
- "-i",
- effective_url,
- "-frames:v",
- "1",
- "-f",
- "image2pipe",
- "-vcodec",
- "mjpeg",
- "-q:v",
- "2",
- "-",
- ]
- try:
- logger.debug("Running ffmpeg RTSP capture...")
- process = await asyncio.create_subprocess_exec(
- *cmd,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- )
- stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout)
- logger.debug(
- "ffmpeg returned: code=%s, stdout=%s bytes, stderr=%s bytes",
- process.returncode,
- len(stdout),
- len(stderr),
- )
- if process.returncode != 0:
- logger.error("ffmpeg RTSP capture failed: %s", stderr.decode()[:200])
- return None
- if not stdout or len(stdout) < 100:
- logger.error("ffmpeg returned empty or too small frame")
- return None
- return stdout
- except TimeoutError:
- logger.warning("RTSP frame capture timed out after %ss", timeout)
- if process:
- process.kill()
- return None
- except OSError as e:
- logger.error("RTSP frame capture failed: %s", e)
- return None
- finally:
- if proxy_server:
- proxy_server.close()
- await proxy_server.wait_closed()
- async def _capture_snapshot(url: str, timeout: int) -> bytes | None:
- """Fetch snapshot from HTTP URL.
- Note: This function intentionally makes requests to user-configured URLs.
- External camera support requires connecting to user-specified camera endpoints.
- URL is sanitized and dangerous destinations are blocked.
- """
- # Sanitize URL - returns reconstructed URL from validated components
- safe_url = _sanitize_camera_url(url, ("http", "https"))
- if not safe_url:
- logger.error("Invalid snapshot URL format: %s...", url[:50])
- return None
- try:
- async with (
- aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session,
- session.get(safe_url) as response,
- ):
- if response.status != 200:
- logger.error("Snapshot URL returned status %s", response.status)
- return None
- data = await response.read()
- # Validate it looks like JPEG
- if not data.startswith(b"\xff\xd8"):
- logger.warning("Snapshot does not appear to be JPEG")
- # Still return it - might be valid with different header
- return data
- except TimeoutError:
- logger.warning("Snapshot capture timed out after %ss", timeout)
- return None
- except (aiohttp.ClientError, OSError) as e:
- logger.error("Snapshot capture failed: %s", e)
- return None
- async def test_connection(url: str, camera_type: str) -> dict:
- """Test camera connection.
- Returns:
- Dict with {success: bool, error?: str, resolution?: str}
- """
- logger.info("Testing camera connection: type=%s, url=%s...", camera_type, url[:50])
- try:
- frame = await capture_frame(url, camera_type, timeout=10)
- logger.info("Capture result: %s bytes", len(frame) if frame else 0)
- if frame:
- # Try to get resolution from JPEG header
- resolution = None
- try:
- # Simple JPEG dimension extraction
- # SOF0 marker is FF C0, followed by length, precision, height, width
- sof_markers = [b"\xff\xc0", b"\xff\xc1", b"\xff\xc2"]
- for marker in sof_markers:
- idx = frame.find(marker)
- if idx != -1 and idx + 9 <= len(frame):
- height = (frame[idx + 5] << 8) | frame[idx + 6]
- width = (frame[idx + 7] << 8) | frame[idx + 8]
- resolution = f"{width}x{height}"
- break
- except (IndexError, ValueError):
- pass # Resolution detection is optional; fall back to default
- return {"success": True, "resolution": resolution}
- else:
- return {"success": False, "error": "Failed to capture frame from camera"}
- except Exception as e:
- # Sanitize error message - don't expose internal details
- error_type = type(e).__name__
- logger.error("Camera connection test failed: %s", e)
- return {"success": False, "error": f"Connection failed: {error_type}"}
- async def generate_mjpeg_stream(url: str, camera_type: str, fps: int = 10) -> AsyncGenerator[bytes, None]:
- """Generator yielding MJPEG frames for streaming.
- Args:
- url: Camera URL or USB device path
- camera_type: "mjpeg", "rtsp", "snapshot", or "usb"
- fps: Target frames per second
- Yields:
- MJPEG frame data with HTTP multipart boundaries
- """
- frame_interval = 1.0 / max(fps, 1)
- last_frame_time = 0.0
- if camera_type == "mjpeg":
- # Proxy MJPEG stream directly, with reconnect on timeout
- max_retries = 3
- for attempt in range(max_retries + 1):
- frame_yielded = False
- async for frame in _stream_mjpeg(url):
- frame_yielded = True
- current_time = asyncio.get_event_loop().time()
- if current_time - last_frame_time >= frame_interval:
- last_frame_time = current_time
- yield _format_mjpeg_frame(frame)
- if not frame_yielded or attempt == max_retries:
- break
- logger.warning(
- "External MJPEG stream ended, reconnecting (attempt %d/%d)...",
- attempt + 1,
- max_retries,
- )
- await asyncio.sleep(2)
- elif camera_type == "rtsp":
- # Use ffmpeg to convert RTSP to MJPEG, with reconnect on timeout
- max_retries = 3
- for attempt in range(max_retries + 1):
- frame_yielded = False
- async for frame in _stream_rtsp(url, fps):
- frame_yielded = True
- yield _format_mjpeg_frame(frame)
- if not frame_yielded or attempt == max_retries:
- break
- logger.warning(
- "External RTSP stream ended, reconnecting (attempt %d/%d)...",
- attempt + 1,
- max_retries,
- )
- await asyncio.sleep(2)
- elif camera_type == "usb":
- # Use ffmpeg to stream from USB camera
- async for frame in _stream_usb(url, fps):
- yield _format_mjpeg_frame(frame)
- elif camera_type == "snapshot":
- # Poll snapshot URL at interval
- while True:
- try:
- frame = await _capture_snapshot(url, timeout=10)
- if frame:
- yield _format_mjpeg_frame(frame)
- await asyncio.sleep(frame_interval)
- except asyncio.CancelledError:
- break
- except (aiohttp.ClientError, OSError) as e:
- logger.warning("Snapshot poll failed: %s", e)
- await asyncio.sleep(frame_interval)
- def _format_mjpeg_frame(frame: bytes) -> bytes:
- """Format frame for MJPEG HTTP response."""
- return (
- b"--frame\r\n"
- b"Content-Type: image/jpeg\r\n"
- b"Content-Length: " + str(len(frame)).encode() + b"\r\n"
- b"\r\n" + frame + b"\r\n"
- )
- async def _stream_mjpeg(url: str) -> AsyncGenerator[bytes, None]:
- """Stream frames from MJPEG URL.
- Note: This function intentionally makes requests to user-configured URLs.
- External camera support requires connecting to user-specified camera endpoints.
- URL is sanitized and dangerous destinations are blocked.
- """
- # Sanitize URL - returns reconstructed URL from validated components
- safe_url = _sanitize_camera_url(url, ("http", "https"))
- if not safe_url:
- logger.error("Invalid MJPEG stream URL: %s...", url[:50])
- return
- try:
- timeout = aiohttp.ClientTimeout(total=None, sock_read=30)
- async with aiohttp.ClientSession(timeout=timeout) as session, session.get(safe_url) as response:
- if response.status != 200:
- logger.error("MJPEG stream returned status %s", response.status)
- return
- buffer = b""
- jpeg_start = b"\xff\xd8"
- jpeg_end = b"\xff\xd9"
- async for chunk in response.content.iter_chunked(8192):
- buffer += chunk
- # Extract complete frames from buffer
- while True:
- start_idx = buffer.find(jpeg_start)
- if start_idx == -1:
- buffer = buffer[-2:] if len(buffer) > 2 else buffer
- break
- if start_idx > 0:
- buffer = buffer[start_idx:]
- end_idx = buffer.find(jpeg_end, 2)
- if end_idx == -1:
- break
- frame = buffer[: end_idx + 2]
- buffer = buffer[end_idx + 2 :]
- yield frame
- except asyncio.CancelledError:
- logger.info("MJPEG stream cancelled")
- except (aiohttp.ClientError, OSError) as e:
- logger.error("MJPEG stream error: %s", e)
- async def _stream_rtsp(url: str, fps: int) -> AsyncGenerator[bytes, None]:
- """Stream frames from RTSP URL via ffmpeg.
- For rtsps:// URLs, a local TLS proxy (Python OpenSSL) is used instead
- of relying on ffmpeg's GnuTLS backend, which has compatibility issues
- with some printer firmwares.
- """
- ffmpeg = get_ffmpeg_path()
- if not ffmpeg:
- logger.error("ffmpeg not found - required for RTSP streaming")
- return
- from backend.app.services.camera import rtsp_socket_timeout_flag
- # If the URL uses rtsps://, set up a TLS proxy so ffmpeg uses plain rtsp://
- proxy_server = None
- effective_url = url
- if url.lower().startswith("rtsps://"):
- try:
- from urllib.parse import urlparse
- from backend.app.services.camera import create_tls_proxy
- parsed = urlparse(url)
- target_port = parsed.port or 322
- proxy_port, proxy_server = await create_tls_proxy(parsed.hostname, target_port)
- # Rewrite URL: rtsps://user:pass@host:port/path → rtsp://user:pass@127.0.0.1:proxy/path
- userinfo = ""
- if parsed.username:
- userinfo = parsed.username
- if parsed.password:
- userinfo += f":{parsed.password}"
- userinfo += "@"
- effective_url = f"rtsp://{userinfo}127.0.0.1:{proxy_port}{parsed.path}"
- if parsed.query:
- effective_url += f"?{parsed.query}"
- except Exception as e:
- logger.warning("Failed to create TLS proxy for RTSP, falling back to direct: %s", e)
- effective_url = url
- cmd = [
- ffmpeg,
- "-rtsp_transport",
- "tcp",
- "-rtsp_flags",
- "prefer_tcp",
- # Socket I/O timeout name varies by ffmpeg version (#1504); see
- # `rtsp_socket_timeout_flag()` in services.camera.
- f"-{rtsp_socket_timeout_flag()}",
- "30000000",
- "-buffer_size",
- "1024000",
- "-max_delay",
- "500000",
- "-probesize",
- "32",
- "-analyzeduration",
- "0",
- "-fflags",
- "nobuffer",
- "-flags",
- "low_delay",
- "-i",
- effective_url,
- "-f",
- "mjpeg",
- "-q:v",
- "5",
- "-r",
- str(fps),
- "-an",
- "-",
- ]
- process = None
- try:
- process = await asyncio.create_subprocess_exec(
- *cmd,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- )
- # Brief check for immediate startup failures
- await asyncio.sleep(0.1)
- if process.returncode is not None:
- stderr = await process.stderr.read()
- logger.error("ffmpeg RTSP stream failed immediately: %s", stderr.decode()[:300])
- return
- buffer = b""
- jpeg_start = b"\xff\xd8"
- jpeg_end = b"\xff\xd9"
- while True:
- try:
- chunk = await asyncio.wait_for(process.stdout.read(8192), timeout=30.0)
- if not chunk:
- break
- buffer += chunk
- # Extract complete frames
- while True:
- start_idx = buffer.find(jpeg_start)
- if start_idx == -1:
- buffer = buffer[-2:] if len(buffer) > 2 else buffer
- break
- if start_idx > 0:
- buffer = buffer[start_idx:]
- end_idx = buffer.find(jpeg_end, 2)
- if end_idx == -1:
- break
- frame = buffer[: end_idx + 2]
- buffer = buffer[end_idx + 2 :]
- yield frame
- except TimeoutError:
- logger.warning("RTSP stream read timeout")
- break
- except asyncio.CancelledError:
- logger.info("RTSP stream cancelled")
- except OSError as e:
- logger.error("RTSP stream error: %s", e)
- finally:
- if process and process.returncode is None:
- process.terminate()
- try:
- await asyncio.wait_for(process.wait(), timeout=2.0)
- except TimeoutError:
- process.kill()
- await process.wait()
- if proxy_server:
- proxy_server.close()
- await proxy_server.wait_closed()
- async def _stream_usb(device: str, fps: int) -> AsyncGenerator[bytes, None]:
- """Stream frames from USB camera via ffmpeg."""
- ffmpeg = get_ffmpeg_path()
- if not ffmpeg:
- logger.error("ffmpeg not found - required for USB camera streaming")
- return
- # Validate device path
- if not device.startswith("/dev/video"):
- logger.error("Invalid USB device path: %s", device)
- return
- if not Path(device).exists():
- logger.error("USB device does not exist: %s", device)
- return
- # ffmpeg command to stream from USB camera (v4l2)
- cmd = [
- ffmpeg,
- "-f",
- "v4l2",
- "-framerate",
- str(fps),
- "-i",
- device,
- "-f",
- "mjpeg",
- "-q:v",
- "5",
- "-r",
- str(fps),
- "-",
- ]
- process = None
- try:
- logger.info("Starting USB camera stream from %s at %s fps", device, fps)
- process = await asyncio.create_subprocess_exec(
- *cmd,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- )
- # Give ffmpeg a moment to start and check for immediate failures
- await asyncio.sleep(0.5)
- if process.returncode is not None:
- stderr = await process.stderr.read()
- logger.error("ffmpeg USB stream failed immediately: %s", stderr.decode()[:300])
- return
- buffer = b""
- jpeg_start = b"\xff\xd8"
- jpeg_end = b"\xff\xd9"
- while True:
- try:
- chunk = await asyncio.wait_for(process.stdout.read(8192), timeout=30.0)
- if not chunk:
- break
- buffer += chunk
- # Extract complete frames
- while True:
- start_idx = buffer.find(jpeg_start)
- if start_idx == -1:
- buffer = buffer[-2:] if len(buffer) > 2 else buffer
- break
- if start_idx > 0:
- buffer = buffer[start_idx:]
- end_idx = buffer.find(jpeg_end, 2)
- if end_idx == -1:
- break
- frame = buffer[: end_idx + 2]
- buffer = buffer[end_idx + 2 :]
- yield frame
- except TimeoutError:
- logger.warning("USB stream read timeout")
- break
- except asyncio.CancelledError:
- logger.info("USB stream cancelled")
- except OSError as e:
- logger.error("USB stream error: %s", e)
- finally:
- if process and process.returncode is None:
- process.terminate()
- try:
- await asyncio.wait_for(process.wait(), timeout=2.0)
- except TimeoutError:
- process.kill()
- await process.wait()
|