|
|
@@ -0,0 +1,409 @@
|
|
|
+"""External network camera service.
|
|
|
+
|
|
|
+Supports MJPEG streams, RTSP streams (via ffmpeg), and HTTP snapshot URLs.
|
|
|
+"""
|
|
|
+
|
|
|
+import asyncio
|
|
|
+import logging
|
|
|
+import shutil
|
|
|
+from collections.abc import AsyncGenerator
|
|
|
+from pathlib import Path
|
|
|
+
|
|
|
+import aiohttp
|
|
|
+
|
|
|
+logger = logging.getLogger(__name__)
|
|
|
+
|
|
|
+
|
|
|
+def get_ffmpeg_path() -> str | None:
|
|
|
+ """Get the path to ffmpeg executable."""
|
|
|
+ # Try shutil.which first
|
|
|
+ path = shutil.which("ffmpeg")
|
|
|
+ if path:
|
|
|
+ return path
|
|
|
+ # Check common locations (systemd services may have limited PATH)
|
|
|
+ for common_path in ["/usr/bin/ffmpeg", "/usr/local/bin/ffmpeg", "/opt/homebrew/bin/ffmpeg"]:
|
|
|
+ if Path(common_path).exists():
|
|
|
+ return common_path
|
|
|
+ return None
|
|
|
+
|
|
|
+
|
|
|
+async def capture_frame(url: str, camera_type: str, timeout: int = 15) -> bytes | None:
|
|
|
+ """Capture single frame from external camera.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ url: Camera URL (MJPEG stream, RTSP URL, or HTTP snapshot URL)
|
|
|
+ camera_type: "mjpeg", "rtsp", or "snapshot"
|
|
|
+ timeout: Connection timeout in seconds
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ JPEG bytes or None on failure
|
|
|
+ """
|
|
|
+ logger.debug(f"capture_frame called: type={camera_type}, url={url[:50] if url else 'None'}...")
|
|
|
+ if camera_type == "mjpeg":
|
|
|
+ return await _capture_mjpeg_frame(url, timeout)
|
|
|
+ elif camera_type == "rtsp":
|
|
|
+ return await _capture_rtsp_frame(url, timeout)
|
|
|
+ elif camera_type == "snapshot":
|
|
|
+ return await _capture_snapshot(url, timeout)
|
|
|
+ else:
|
|
|
+ logger.warning(f"Unknown camera type: {camera_type}")
|
|
|
+ return None
|
|
|
+
|
|
|
+
|
|
|
+async def _capture_mjpeg_frame(url: str, timeout: int) -> bytes | None:
|
|
|
+ """Extract single frame from MJPEG stream."""
|
|
|
+ try:
|
|
|
+ async with (
|
|
|
+ aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session,
|
|
|
+ session.get(url) as response,
|
|
|
+ ):
|
|
|
+ if response.status != 200:
|
|
|
+ logger.error(f"MJPEG stream returned status {response.status}")
|
|
|
+ return None
|
|
|
+
|
|
|
+ # Read chunks until we find a complete JPEG frame
|
|
|
+ buffer = b""
|
|
|
+ jpeg_start = b"\xff\xd8"
|
|
|
+ jpeg_end = b"\xff\xd9"
|
|
|
+
|
|
|
+ async for chunk in response.content.iter_chunked(8192):
|
|
|
+ buffer += chunk
|
|
|
+
|
|
|
+ # Look for complete JPEG frame
|
|
|
+ start_idx = buffer.find(jpeg_start)
|
|
|
+ if start_idx == -1:
|
|
|
+ continue
|
|
|
+
|
|
|
+ end_idx = buffer.find(jpeg_end, start_idx + 2)
|
|
|
+ if end_idx != -1:
|
|
|
+ # Found complete frame
|
|
|
+ frame = buffer[start_idx : end_idx + 2]
|
|
|
+ return frame
|
|
|
+
|
|
|
+ # Keep searching, but limit buffer size
|
|
|
+ if len(buffer) > 5 * 1024 * 1024: # 5MB limit
|
|
|
+ logger.warning("MJPEG buffer exceeded 5MB without finding frame")
|
|
|
+ return None
|
|
|
+
|
|
|
+ except TimeoutError:
|
|
|
+ logger.warning(f"MJPEG frame capture timed out after {timeout}s")
|
|
|
+ return None
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"MJPEG frame capture failed: {e}")
|
|
|
+ return None
|
|
|
+
|
|
|
+ return None
|
|
|
+
|
|
|
+
|
|
|
+async def _capture_rtsp_frame(url: str, timeout: int) -> bytes | None:
|
|
|
+ """Capture frame from RTSP using ffmpeg."""
|
|
|
+ ffmpeg = get_ffmpeg_path()
|
|
|
+ if not ffmpeg:
|
|
|
+ logger.error("ffmpeg not found - required for RTSP capture")
|
|
|
+ return None
|
|
|
+
|
|
|
+ # Use ffmpeg to grab a single frame from RTSP stream
|
|
|
+ # ffmpeg handles both rtsp:// and rtsps:// URLs automatically
|
|
|
+ cmd = [
|
|
|
+ ffmpeg,
|
|
|
+ "-rtsp_transport",
|
|
|
+ "tcp",
|
|
|
+ "-i",
|
|
|
+ url,
|
|
|
+ "-frames:v",
|
|
|
+ "1",
|
|
|
+ "-f",
|
|
|
+ "image2pipe",
|
|
|
+ "-vcodec",
|
|
|
+ "mjpeg",
|
|
|
+ "-q:v",
|
|
|
+ "2",
|
|
|
+ "-",
|
|
|
+ ]
|
|
|
+
|
|
|
+ try:
|
|
|
+ print(f"[EXT-CAM] Running ffmpeg command: {' '.join(cmd[:6])}...")
|
|
|
+ process = await asyncio.create_subprocess_exec(
|
|
|
+ *cmd,
|
|
|
+ stdout=asyncio.subprocess.PIPE,
|
|
|
+ stderr=asyncio.subprocess.PIPE,
|
|
|
+ )
|
|
|
+
|
|
|
+ stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout)
|
|
|
+ print(
|
|
|
+ f"[EXT-CAM] ffmpeg returned: code={process.returncode}, stdout={len(stdout)} bytes, stderr={len(stderr)} bytes"
|
|
|
+ )
|
|
|
+
|
|
|
+ if process.returncode != 0:
|
|
|
+ logger.error(f"ffmpeg RTSP capture failed: {stderr.decode()[:200]}")
|
|
|
+ print(f"[EXT-CAM] ffmpeg error: {stderr.decode()[:300]}")
|
|
|
+ return None
|
|
|
+
|
|
|
+ if not stdout or len(stdout) < 100:
|
|
|
+ logger.error("ffmpeg returned empty or too small frame")
|
|
|
+ return None
|
|
|
+
|
|
|
+ return stdout
|
|
|
+
|
|
|
+ except TimeoutError:
|
|
|
+ logger.warning(f"RTSP frame capture timed out after {timeout}s")
|
|
|
+ if process:
|
|
|
+ process.kill()
|
|
|
+ return None
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"RTSP frame capture failed: {e}")
|
|
|
+ return None
|
|
|
+
|
|
|
+
|
|
|
+async def _capture_snapshot(url: str, timeout: int) -> bytes | None:
|
|
|
+ """Fetch snapshot from HTTP URL."""
|
|
|
+ try:
|
|
|
+ async with (
|
|
|
+ aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session,
|
|
|
+ session.get(url) as response,
|
|
|
+ ):
|
|
|
+ if response.status != 200:
|
|
|
+ logger.error(f"Snapshot URL returned status {response.status}")
|
|
|
+ return None
|
|
|
+
|
|
|
+ data = await response.read()
|
|
|
+
|
|
|
+ # Validate it looks like JPEG
|
|
|
+ if not data.startswith(b"\xff\xd8"):
|
|
|
+ logger.warning("Snapshot does not appear to be JPEG")
|
|
|
+ # Still return it - might be valid with different header
|
|
|
+
|
|
|
+ return data
|
|
|
+
|
|
|
+ except TimeoutError:
|
|
|
+ logger.warning(f"Snapshot capture timed out after {timeout}s")
|
|
|
+ return None
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"Snapshot capture failed: {e}")
|
|
|
+ return None
|
|
|
+
|
|
|
+
|
|
|
+async def test_connection(url: str, camera_type: str) -> dict:
|
|
|
+ """Test camera connection.
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ Dict with {success: bool, error?: str, resolution?: str}
|
|
|
+ """
|
|
|
+ print(f"[EXT-CAM] Testing camera connection: type={camera_type}, url={url[:50]}...")
|
|
|
+ logger.info(f"Testing camera connection: type={camera_type}, url={url[:50]}...")
|
|
|
+ try:
|
|
|
+ frame = await capture_frame(url, camera_type, timeout=10)
|
|
|
+ print(f"[EXT-CAM] Capture result: {len(frame) if frame else 0} bytes")
|
|
|
+ logger.info(f"Capture result: {len(frame) if frame else 0} bytes")
|
|
|
+
|
|
|
+ if frame:
|
|
|
+ # Try to get resolution from JPEG header
|
|
|
+ resolution = None
|
|
|
+ try:
|
|
|
+ # Simple JPEG dimension extraction
|
|
|
+ # SOF0 marker is FF C0, followed by length, precision, height, width
|
|
|
+ sof_markers = [b"\xff\xc0", b"\xff\xc1", b"\xff\xc2"]
|
|
|
+ for marker in sof_markers:
|
|
|
+ idx = frame.find(marker)
|
|
|
+ if idx != -1 and idx + 9 <= len(frame):
|
|
|
+ height = (frame[idx + 5] << 8) | frame[idx + 6]
|
|
|
+ width = (frame[idx + 7] << 8) | frame[idx + 8]
|
|
|
+ resolution = f"{width}x{height}"
|
|
|
+ break
|
|
|
+ except Exception:
|
|
|
+ pass
|
|
|
+
|
|
|
+ return {"success": True, "resolution": resolution}
|
|
|
+ else:
|
|
|
+ return {"success": False, "error": "Failed to capture frame from camera"}
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ return {"success": False, "error": str(e)}
|
|
|
+
|
|
|
+
|
|
|
+async def generate_mjpeg_stream(url: str, camera_type: str, fps: int = 10) -> AsyncGenerator[bytes, None]:
|
|
|
+ """Generator yielding MJPEG frames for streaming.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ url: Camera URL
|
|
|
+ camera_type: "mjpeg", "rtsp", or "snapshot"
|
|
|
+ fps: Target frames per second
|
|
|
+
|
|
|
+ Yields:
|
|
|
+ MJPEG frame data with HTTP multipart boundaries
|
|
|
+ """
|
|
|
+ frame_interval = 1.0 / max(fps, 1)
|
|
|
+ last_frame_time = 0.0
|
|
|
+
|
|
|
+ if camera_type == "mjpeg":
|
|
|
+ # Proxy MJPEG stream directly
|
|
|
+ async for frame in _stream_mjpeg(url):
|
|
|
+ current_time = asyncio.get_event_loop().time()
|
|
|
+ if current_time - last_frame_time >= frame_interval:
|
|
|
+ last_frame_time = current_time
|
|
|
+ yield _format_mjpeg_frame(frame)
|
|
|
+
|
|
|
+ elif camera_type == "rtsp":
|
|
|
+ # Use ffmpeg to convert RTSP to MJPEG
|
|
|
+ async for frame in _stream_rtsp(url, fps):
|
|
|
+ yield _format_mjpeg_frame(frame)
|
|
|
+
|
|
|
+ elif camera_type == "snapshot":
|
|
|
+ # Poll snapshot URL at interval
|
|
|
+ while True:
|
|
|
+ try:
|
|
|
+ frame = await _capture_snapshot(url, timeout=10)
|
|
|
+ if frame:
|
|
|
+ yield _format_mjpeg_frame(frame)
|
|
|
+ await asyncio.sleep(frame_interval)
|
|
|
+ except asyncio.CancelledError:
|
|
|
+ break
|
|
|
+ except Exception as e:
|
|
|
+ logger.warning(f"Snapshot poll failed: {e}")
|
|
|
+ await asyncio.sleep(frame_interval)
|
|
|
+
|
|
|
+
|
|
|
+def _format_mjpeg_frame(frame: bytes) -> bytes:
|
|
|
+ """Format frame for MJPEG HTTP response."""
|
|
|
+ return (
|
|
|
+ b"--frame\r\n"
|
|
|
+ b"Content-Type: image/jpeg\r\n"
|
|
|
+ b"Content-Length: " + str(len(frame)).encode() + b"\r\n"
|
|
|
+ b"\r\n" + frame + b"\r\n"
|
|
|
+ )
|
|
|
+
|
|
|
+
|
|
|
+async def _stream_mjpeg(url: str) -> AsyncGenerator[bytes, None]:
|
|
|
+ """Stream frames from MJPEG URL."""
|
|
|
+ try:
|
|
|
+ timeout = aiohttp.ClientTimeout(total=None, sock_read=30)
|
|
|
+ async with aiohttp.ClientSession(timeout=timeout) as session, session.get(url) as response:
|
|
|
+ if response.status != 200:
|
|
|
+ logger.error(f"MJPEG stream returned status {response.status}")
|
|
|
+ return
|
|
|
+
|
|
|
+ buffer = b""
|
|
|
+ jpeg_start = b"\xff\xd8"
|
|
|
+ jpeg_end = b"\xff\xd9"
|
|
|
+
|
|
|
+ async for chunk in response.content.iter_chunked(8192):
|
|
|
+ buffer += chunk
|
|
|
+
|
|
|
+ # Extract complete frames from buffer
|
|
|
+ while True:
|
|
|
+ start_idx = buffer.find(jpeg_start)
|
|
|
+ if start_idx == -1:
|
|
|
+ buffer = buffer[-2:] if len(buffer) > 2 else buffer
|
|
|
+ break
|
|
|
+
|
|
|
+ if start_idx > 0:
|
|
|
+ buffer = buffer[start_idx:]
|
|
|
+
|
|
|
+ end_idx = buffer.find(jpeg_end, 2)
|
|
|
+ if end_idx == -1:
|
|
|
+ break
|
|
|
+
|
|
|
+ frame = buffer[: end_idx + 2]
|
|
|
+ buffer = buffer[end_idx + 2 :]
|
|
|
+ yield frame
|
|
|
+
|
|
|
+ except asyncio.CancelledError:
|
|
|
+ logger.info("MJPEG stream cancelled")
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"MJPEG stream error: {e}")
|
|
|
+
|
|
|
+
|
|
|
+async def _stream_rtsp(url: str, fps: int) -> AsyncGenerator[bytes, None]:
|
|
|
+ """Stream frames from RTSP URL via ffmpeg."""
|
|
|
+ ffmpeg = get_ffmpeg_path()
|
|
|
+ if not ffmpeg:
|
|
|
+ logger.error("ffmpeg not found - required for RTSP streaming")
|
|
|
+ return
|
|
|
+
|
|
|
+ # ffmpeg handles both rtsp:// and rtsps:// URLs automatically
|
|
|
+ cmd = [
|
|
|
+ ffmpeg,
|
|
|
+ "-rtsp_transport",
|
|
|
+ "tcp",
|
|
|
+ "-rtsp_flags",
|
|
|
+ "prefer_tcp",
|
|
|
+ "-timeout",
|
|
|
+ "30000000",
|
|
|
+ "-buffer_size",
|
|
|
+ "1024000",
|
|
|
+ "-max_delay",
|
|
|
+ "500000",
|
|
|
+ "-i",
|
|
|
+ url,
|
|
|
+ "-f",
|
|
|
+ "mjpeg",
|
|
|
+ "-q:v",
|
|
|
+ "5",
|
|
|
+ "-r",
|
|
|
+ str(fps),
|
|
|
+ "-an",
|
|
|
+ "-",
|
|
|
+ ]
|
|
|
+
|
|
|
+ process = None
|
|
|
+ try:
|
|
|
+ process = await asyncio.create_subprocess_exec(
|
|
|
+ *cmd,
|
|
|
+ stdout=asyncio.subprocess.PIPE,
|
|
|
+ stderr=asyncio.subprocess.PIPE,
|
|
|
+ )
|
|
|
+
|
|
|
+ # Give ffmpeg a moment to start and check for immediate failures
|
|
|
+ await asyncio.sleep(0.5)
|
|
|
+ if process.returncode is not None:
|
|
|
+ stderr = await process.stderr.read()
|
|
|
+ logger.error(f"ffmpeg RTSP stream failed immediately: {stderr.decode()[:300]}")
|
|
|
+ return
|
|
|
+
|
|
|
+ buffer = b""
|
|
|
+ jpeg_start = b"\xff\xd8"
|
|
|
+ jpeg_end = b"\xff\xd9"
|
|
|
+
|
|
|
+ while True:
|
|
|
+ try:
|
|
|
+ chunk = await asyncio.wait_for(process.stdout.read(8192), timeout=30.0)
|
|
|
+
|
|
|
+ if not chunk:
|
|
|
+ break
|
|
|
+
|
|
|
+ buffer += chunk
|
|
|
+
|
|
|
+ # Extract complete frames
|
|
|
+ while True:
|
|
|
+ start_idx = buffer.find(jpeg_start)
|
|
|
+ if start_idx == -1:
|
|
|
+ buffer = buffer[-2:] if len(buffer) > 2 else buffer
|
|
|
+ break
|
|
|
+
|
|
|
+ if start_idx > 0:
|
|
|
+ buffer = buffer[start_idx:]
|
|
|
+
|
|
|
+ end_idx = buffer.find(jpeg_end, 2)
|
|
|
+ if end_idx == -1:
|
|
|
+ break
|
|
|
+
|
|
|
+ frame = buffer[: end_idx + 2]
|
|
|
+ buffer = buffer[end_idx + 2 :]
|
|
|
+ yield frame
|
|
|
+
|
|
|
+ except TimeoutError:
|
|
|
+ logger.warning("RTSP stream read timeout")
|
|
|
+ break
|
|
|
+
|
|
|
+ except asyncio.CancelledError:
|
|
|
+ logger.info("RTSP stream cancelled")
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"RTSP stream error: {e}")
|
|
|
+ finally:
|
|
|
+ if process and process.returncode is None:
|
|
|
+ process.terminate()
|
|
|
+ try:
|
|
|
+ await asyncio.wait_for(process.wait(), timeout=2.0)
|
|
|
+ except TimeoutError:
|
|
|
+ process.kill()
|
|
|
+ await process.wait()
|