external_camera.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778
  1. """External camera service.
  2. Supports MJPEG streams, RTSP streams (via ffmpeg), HTTP snapshot URLs, and USB cameras.
  3. Security Note: This service intentionally makes requests to user-configured camera URLs.
  4. This is necessary functionality for external camera integration. URLs are validated
  5. to ensure they are well-formed before use.
  6. """
  7. import asyncio
  8. import logging
  9. import re
  10. import shutil
  11. from collections.abc import AsyncGenerator
  12. from pathlib import Path
  13. from urllib.parse import urlparse
  14. import aiohttp
  15. logger = logging.getLogger(__name__)
  16. def _sanitize_camera_url(url: str, allowed_schemes: tuple[str, ...] = ("http", "https", "rtsp")) -> str | None:
  17. """Validate and sanitize camera URL, returning a safe reconstructed URL.
  18. This validates that the URL is well-formed, uses an allowed scheme,
  19. does not target cloud metadata services, and returns a reconstructed
  20. URL from validated components.
  21. Note: This intentionally allows user-provided URLs as that is the
  22. purpose of external camera configuration. Local network IPs are
  23. allowed since cameras are typically on the same LAN.
  24. Args:
  25. url: URL to validate and sanitize
  26. allowed_schemes: Tuple of allowed URL schemes
  27. Returns:
  28. Sanitized URL string if valid, None otherwise
  29. """
  30. try:
  31. parsed = urlparse(url)
  32. if not parsed.scheme or not parsed.netloc:
  33. return None
  34. # Validate scheme against allowlist
  35. scheme = parsed.scheme.lower()
  36. if scheme not in allowed_schemes:
  37. return None
  38. # Block cloud metadata service endpoints (SSRF mitigation)
  39. # These are dangerous destinations that should never be accessed
  40. hostname = parsed.hostname or ""
  41. hostname_lower = hostname.lower()
  42. blocked_hosts = (
  43. "169.254.169.254", # AWS/GCP/Azure metadata
  44. "metadata.google.internal", # GCP metadata
  45. "metadata.google",
  46. "localhost", # Block localhost to prevent internal service access
  47. "127.0.0.1",
  48. "::1",
  49. "0.0.0.0", # nosec B104
  50. )
  51. if hostname_lower in blocked_hosts:
  52. logger.warning("Blocked camera URL targeting restricted host: %s", hostname)
  53. return None
  54. # Block link-local addresses (169.254.x.x)
  55. if hostname.startswith("169.254."):
  56. logger.warning("Blocked camera URL targeting link-local address: %s", hostname)
  57. return None
  58. # Reconstruct URL from validated components to break taint chain
  59. # This creates a new string from validated parts
  60. port_str = f":{parsed.port}" if parsed.port else ""
  61. path = parsed.path or ""
  62. query = f"?{parsed.query}" if parsed.query else ""
  63. fragment = f"#{parsed.fragment}" if parsed.fragment else ""
  64. # Build sanitized URL from validated components
  65. sanitized = f"{scheme}://{hostname}{port_str}{path}{query}{fragment}"
  66. return sanitized
  67. except ValueError:
  68. return None
  69. def _validate_camera_url(url: str, allowed_schemes: tuple[str, ...] = ("http", "https", "rtsp")) -> bool:
  70. """Validate camera URL format (legacy wrapper).
  71. Args:
  72. url: URL to validate
  73. allowed_schemes: Tuple of allowed URL schemes
  74. Returns:
  75. True if URL is valid, False otherwise
  76. """
  77. return _sanitize_camera_url(url, allowed_schemes) is not None
  78. def list_usb_cameras() -> list[dict]:
  79. """List available USB cameras (V4L2 devices on Linux).
  80. Returns:
  81. List of dicts with {device: str, name: str, capabilities: list}
  82. """
  83. cameras = []
  84. video_devices = sorted(Path("/dev").glob("video*"))
  85. for device in video_devices:
  86. device_path = str(device)
  87. info = {"device": device_path, "name": device.name, "capabilities": []}
  88. # Try to get device info via v4l2-ctl
  89. v4l2_ctl = shutil.which("v4l2-ctl")
  90. if v4l2_ctl:
  91. import subprocess
  92. try:
  93. result = subprocess.run(
  94. [v4l2_ctl, "-d", device_path, "--info"],
  95. capture_output=True,
  96. text=True,
  97. timeout=5,
  98. )
  99. if result.returncode == 0:
  100. # Parse device name from output
  101. for line in result.stdout.splitlines():
  102. if "Card type" in line:
  103. info["name"] = line.split(":", 1)[1].strip()
  104. elif "Driver name" in line:
  105. info["driver"] = line.split(":", 1)[1].strip()
  106. # Check if device supports video capture
  107. result = subprocess.run(
  108. [v4l2_ctl, "-d", device_path, "--list-formats"],
  109. capture_output=True,
  110. text=True,
  111. timeout=5,
  112. )
  113. if result.returncode == 0 and result.stdout.strip():
  114. info["capabilities"].append("capture")
  115. # Parse available formats
  116. formats = re.findall(r"'(\w+)'", result.stdout)
  117. info["formats"] = list(set(formats))
  118. except (subprocess.TimeoutExpired, Exception) as e:
  119. logger.debug("v4l2-ctl failed for %s: %s", device_path, e)
  120. # Only include devices that look like video capture devices
  121. # Skip metadata devices (typically odd numbered like video1, video3)
  122. try:
  123. device_num = int(device.name.replace("video", ""))
  124. # Even numbered devices are usually capture, odd are metadata
  125. # But also check if we got capabilities
  126. if info.get("capabilities") or device_num % 2 == 0:
  127. cameras.append(info)
  128. except ValueError:
  129. cameras.append(info)
  130. return cameras
  131. def get_ffmpeg_path() -> str | None:
  132. """Get the path to ffmpeg executable."""
  133. # Try shutil.which first
  134. path = shutil.which("ffmpeg")
  135. if path:
  136. return path
  137. # Check common locations (systemd services may have limited PATH)
  138. for common_path in ["/usr/bin/ffmpeg", "/usr/local/bin/ffmpeg", "/opt/homebrew/bin/ffmpeg"]:
  139. if Path(common_path).exists():
  140. return common_path
  141. return None
  142. async def capture_frame(url: str, camera_type: str, timeout: int = 15) -> bytes | None:
  143. """Capture single frame from external camera.
  144. Args:
  145. url: Camera URL (MJPEG stream, RTSP URL, HTTP snapshot URL, or USB device path)
  146. camera_type: "mjpeg", "rtsp", "snapshot", or "usb"
  147. timeout: Connection timeout in seconds
  148. Returns:
  149. JPEG bytes or None on failure
  150. """
  151. logger.debug("capture_frame called: type=%s, url=%s...", camera_type, url[:50] if url else "None")
  152. if camera_type == "mjpeg":
  153. return await _capture_mjpeg_frame(url, timeout)
  154. elif camera_type == "rtsp":
  155. return await _capture_rtsp_frame(url, timeout)
  156. elif camera_type == "snapshot":
  157. return await _capture_snapshot(url, timeout)
  158. elif camera_type == "usb":
  159. return await _capture_usb_frame(url, timeout)
  160. else:
  161. logger.warning("Unknown camera type: %s", camera_type)
  162. return None
  163. async def _capture_usb_frame(device: str, timeout: int) -> bytes | None:
  164. """Capture frame from USB camera using ffmpeg."""
  165. ffmpeg = get_ffmpeg_path()
  166. if not ffmpeg:
  167. logger.error("ffmpeg not found - required for USB camera capture")
  168. return None
  169. # Validate device path - must be /dev/videoN format where N is 0-99
  170. # This prevents path traversal by using a strict allowlist approach
  171. import re as regex_module
  172. device_match = regex_module.match(r"^/dev/video(\d{1,2})$", device)
  173. if not device_match:
  174. logger.error("Invalid USB device path format: %s", device)
  175. return None
  176. # Convert to integer to break taint chain - integers cannot contain path traversal
  177. # lgtm[py/path-injection] - device_num is validated integer 0-99
  178. device_num = int(device_match.group(1)) # Safe: regex guarantees 1-2 digits
  179. if device_num > 99:
  180. logger.error("USB device number out of range: %s", device_num)
  181. return None
  182. # Construct safe path from validated integer (completely untainted)
  183. safe_device_path = Path(f"/dev/video{device_num}") # lgtm[py/path-injection]
  184. if not safe_device_path.exists():
  185. logger.error("USB device does not exist: %s", safe_device_path)
  186. return None
  187. # Use the safe path for ffmpeg - this is a hardcoded /dev/videoN path
  188. device = str(safe_device_path) # lgtm[py/path-injection]
  189. # Use ffmpeg to grab a single frame from USB camera
  190. cmd = [
  191. ffmpeg,
  192. "-f",
  193. "v4l2",
  194. "-i",
  195. device,
  196. "-frames:v",
  197. "1",
  198. "-f",
  199. "image2pipe",
  200. "-vcodec",
  201. "mjpeg",
  202. "-q:v",
  203. "2",
  204. "-",
  205. ]
  206. try:
  207. logger.debug("Running USB capture: %s", " ".join(cmd))
  208. process = await asyncio.create_subprocess_exec(
  209. *cmd,
  210. stdout=asyncio.subprocess.PIPE,
  211. stderr=asyncio.subprocess.PIPE,
  212. )
  213. stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout)
  214. if process.returncode != 0:
  215. logger.error("ffmpeg USB capture failed: %s", stderr.decode()[:200])
  216. return None
  217. if not stdout or len(stdout) < 100:
  218. logger.error("ffmpeg returned empty or too small frame from USB camera")
  219. return None
  220. return stdout
  221. except TimeoutError:
  222. logger.warning("USB frame capture timed out after %ss", timeout)
  223. if process:
  224. process.kill()
  225. return None
  226. except OSError as e:
  227. logger.error("USB frame capture failed: %s", e)
  228. return None
  229. async def _capture_mjpeg_frame(url: str, timeout: int) -> bytes | None:
  230. """Extract single frame from MJPEG stream.
  231. Note: This function intentionally makes requests to user-configured URLs.
  232. External camera support requires connecting to user-specified camera endpoints.
  233. URL is sanitized and dangerous destinations are blocked.
  234. """
  235. # Sanitize URL - returns reconstructed URL from validated components
  236. safe_url = _sanitize_camera_url(url, ("http", "https"))
  237. if not safe_url:
  238. logger.error("Invalid MJPEG URL format: %s...", url[:50])
  239. return None
  240. try:
  241. async with (
  242. aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session,
  243. session.get(safe_url) as response,
  244. ):
  245. if response.status != 200:
  246. logger.error("MJPEG stream returned status %s", response.status)
  247. return None
  248. # Read chunks until we find a complete JPEG frame
  249. buffer = b""
  250. jpeg_start = b"\xff\xd8"
  251. jpeg_end = b"\xff\xd9"
  252. async for chunk in response.content.iter_chunked(8192):
  253. buffer += chunk
  254. # Look for complete JPEG frame
  255. start_idx = buffer.find(jpeg_start)
  256. if start_idx == -1:
  257. continue
  258. end_idx = buffer.find(jpeg_end, start_idx + 2)
  259. if end_idx != -1:
  260. # Found complete frame
  261. frame = buffer[start_idx : end_idx + 2]
  262. return frame
  263. # Keep searching, but limit buffer size
  264. if len(buffer) > 5 * 1024 * 1024: # 5MB limit
  265. logger.warning("MJPEG buffer exceeded 5MB without finding frame")
  266. return None
  267. except TimeoutError:
  268. logger.warning("MJPEG frame capture timed out after %ss", timeout)
  269. return None
  270. except (aiohttp.ClientError, OSError) as e:
  271. logger.error("MJPEG frame capture failed: %s", e)
  272. return None
  273. return None
  274. async def _capture_rtsp_frame(url: str, timeout: int) -> bytes | None:
  275. """Capture frame from RTSP using ffmpeg."""
  276. ffmpeg = get_ffmpeg_path()
  277. if not ffmpeg:
  278. logger.error("ffmpeg not found - required for RTSP capture")
  279. return None
  280. # Use ffmpeg to grab a single frame from RTSP stream
  281. # ffmpeg handles both rtsp:// and rtsps:// URLs automatically
  282. cmd = [
  283. ffmpeg,
  284. "-rtsp_transport",
  285. "tcp",
  286. "-i",
  287. url,
  288. "-frames:v",
  289. "1",
  290. "-f",
  291. "image2pipe",
  292. "-vcodec",
  293. "mjpeg",
  294. "-q:v",
  295. "2",
  296. "-",
  297. ]
  298. try:
  299. print(f"[EXT-CAM] Running ffmpeg command: {' '.join(cmd[:6])}...")
  300. process = await asyncio.create_subprocess_exec(
  301. *cmd,
  302. stdout=asyncio.subprocess.PIPE,
  303. stderr=asyncio.subprocess.PIPE,
  304. )
  305. stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout)
  306. print(
  307. f"[EXT-CAM] ffmpeg returned: code={process.returncode}, stdout={len(stdout)} bytes, stderr={len(stderr)} bytes"
  308. )
  309. if process.returncode != 0:
  310. logger.error("ffmpeg RTSP capture failed: %s", stderr.decode()[:200])
  311. print(f"[EXT-CAM] ffmpeg error: {stderr.decode()[:300]}")
  312. return None
  313. if not stdout or len(stdout) < 100:
  314. logger.error("ffmpeg returned empty or too small frame")
  315. return None
  316. return stdout
  317. except TimeoutError:
  318. logger.warning("RTSP frame capture timed out after %ss", timeout)
  319. if process:
  320. process.kill()
  321. return None
  322. except OSError as e:
  323. logger.error("RTSP frame capture failed: %s", e)
  324. return None
  325. async def _capture_snapshot(url: str, timeout: int) -> bytes | None:
  326. """Fetch snapshot from HTTP URL.
  327. Note: This function intentionally makes requests to user-configured URLs.
  328. External camera support requires connecting to user-specified camera endpoints.
  329. URL is sanitized and dangerous destinations are blocked.
  330. """
  331. # Sanitize URL - returns reconstructed URL from validated components
  332. safe_url = _sanitize_camera_url(url, ("http", "https"))
  333. if not safe_url:
  334. logger.error("Invalid snapshot URL format: %s...", url[:50])
  335. return None
  336. try:
  337. async with (
  338. aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session,
  339. session.get(safe_url) as response,
  340. ):
  341. if response.status != 200:
  342. logger.error("Snapshot URL returned status %s", response.status)
  343. return None
  344. data = await response.read()
  345. # Validate it looks like JPEG
  346. if not data.startswith(b"\xff\xd8"):
  347. logger.warning("Snapshot does not appear to be JPEG")
  348. # Still return it - might be valid with different header
  349. return data
  350. except TimeoutError:
  351. logger.warning("Snapshot capture timed out after %ss", timeout)
  352. return None
  353. except (aiohttp.ClientError, OSError) as e:
  354. logger.error("Snapshot capture failed: %s", e)
  355. return None
  356. async def test_connection(url: str, camera_type: str) -> dict:
  357. """Test camera connection.
  358. Returns:
  359. Dict with {success: bool, error?: str, resolution?: str}
  360. """
  361. print(f"[EXT-CAM] Testing camera connection: type={camera_type}, url={url[:50]}...")
  362. logger.info("Testing camera connection: type=%s, url=%s...", camera_type, url[:50])
  363. try:
  364. frame = await capture_frame(url, camera_type, timeout=10)
  365. print(f"[EXT-CAM] Capture result: {len(frame) if frame else 0} bytes")
  366. logger.info("Capture result: %s bytes", len(frame) if frame else 0)
  367. if frame:
  368. # Try to get resolution from JPEG header
  369. resolution = None
  370. try:
  371. # Simple JPEG dimension extraction
  372. # SOF0 marker is FF C0, followed by length, precision, height, width
  373. sof_markers = [b"\xff\xc0", b"\xff\xc1", b"\xff\xc2"]
  374. for marker in sof_markers:
  375. idx = frame.find(marker)
  376. if idx != -1 and idx + 9 <= len(frame):
  377. height = (frame[idx + 5] << 8) | frame[idx + 6]
  378. width = (frame[idx + 7] << 8) | frame[idx + 8]
  379. resolution = f"{width}x{height}"
  380. break
  381. except (IndexError, ValueError):
  382. pass # Resolution detection is optional; fall back to default
  383. return {"success": True, "resolution": resolution}
  384. else:
  385. return {"success": False, "error": "Failed to capture frame from camera"}
  386. except Exception as e:
  387. # Sanitize error message - don't expose internal details
  388. error_type = type(e).__name__
  389. logger.error("Camera connection test failed: %s", e)
  390. return {"success": False, "error": f"Connection failed: {error_type}"}
  391. async def generate_mjpeg_stream(url: str, camera_type: str, fps: int = 10) -> AsyncGenerator[bytes, None]:
  392. """Generator yielding MJPEG frames for streaming.
  393. Args:
  394. url: Camera URL or USB device path
  395. camera_type: "mjpeg", "rtsp", "snapshot", or "usb"
  396. fps: Target frames per second
  397. Yields:
  398. MJPEG frame data with HTTP multipart boundaries
  399. """
  400. frame_interval = 1.0 / max(fps, 1)
  401. last_frame_time = 0.0
  402. if camera_type == "mjpeg":
  403. # Proxy MJPEG stream directly
  404. async for frame in _stream_mjpeg(url):
  405. current_time = asyncio.get_event_loop().time()
  406. if current_time - last_frame_time >= frame_interval:
  407. last_frame_time = current_time
  408. yield _format_mjpeg_frame(frame)
  409. elif camera_type == "rtsp":
  410. # Use ffmpeg to convert RTSP to MJPEG
  411. async for frame in _stream_rtsp(url, fps):
  412. yield _format_mjpeg_frame(frame)
  413. elif camera_type == "usb":
  414. # Use ffmpeg to stream from USB camera
  415. async for frame in _stream_usb(url, fps):
  416. yield _format_mjpeg_frame(frame)
  417. elif camera_type == "snapshot":
  418. # Poll snapshot URL at interval
  419. while True:
  420. try:
  421. frame = await _capture_snapshot(url, timeout=10)
  422. if frame:
  423. yield _format_mjpeg_frame(frame)
  424. await asyncio.sleep(frame_interval)
  425. except asyncio.CancelledError:
  426. break
  427. except (aiohttp.ClientError, OSError) as e:
  428. logger.warning("Snapshot poll failed: %s", e)
  429. await asyncio.sleep(frame_interval)
  430. def _format_mjpeg_frame(frame: bytes) -> bytes:
  431. """Format frame for MJPEG HTTP response."""
  432. return (
  433. b"--frame\r\n"
  434. b"Content-Type: image/jpeg\r\n"
  435. b"Content-Length: " + str(len(frame)).encode() + b"\r\n"
  436. b"\r\n" + frame + b"\r\n"
  437. )
  438. async def _stream_mjpeg(url: str) -> AsyncGenerator[bytes, None]:
  439. """Stream frames from MJPEG URL.
  440. Note: This function intentionally makes requests to user-configured URLs.
  441. External camera support requires connecting to user-specified camera endpoints.
  442. URL is sanitized and dangerous destinations are blocked.
  443. """
  444. # Sanitize URL - returns reconstructed URL from validated components
  445. safe_url = _sanitize_camera_url(url, ("http", "https"))
  446. if not safe_url:
  447. logger.error("Invalid MJPEG stream URL: %s...", url[:50])
  448. return
  449. try:
  450. timeout = aiohttp.ClientTimeout(total=None, sock_read=30)
  451. async with aiohttp.ClientSession(timeout=timeout) as session, session.get(safe_url) as response:
  452. if response.status != 200:
  453. logger.error("MJPEG stream returned status %s", response.status)
  454. return
  455. buffer = b""
  456. jpeg_start = b"\xff\xd8"
  457. jpeg_end = b"\xff\xd9"
  458. async for chunk in response.content.iter_chunked(8192):
  459. buffer += chunk
  460. # Extract complete frames from buffer
  461. while True:
  462. start_idx = buffer.find(jpeg_start)
  463. if start_idx == -1:
  464. buffer = buffer[-2:] if len(buffer) > 2 else buffer
  465. break
  466. if start_idx > 0:
  467. buffer = buffer[start_idx:]
  468. end_idx = buffer.find(jpeg_end, 2)
  469. if end_idx == -1:
  470. break
  471. frame = buffer[: end_idx + 2]
  472. buffer = buffer[end_idx + 2 :]
  473. yield frame
  474. except asyncio.CancelledError:
  475. logger.info("MJPEG stream cancelled")
  476. except (aiohttp.ClientError, OSError) as e:
  477. logger.error("MJPEG stream error: %s", e)
  478. async def _stream_rtsp(url: str, fps: int) -> AsyncGenerator[bytes, None]:
  479. """Stream frames from RTSP URL via ffmpeg."""
  480. ffmpeg = get_ffmpeg_path()
  481. if not ffmpeg:
  482. logger.error("ffmpeg not found - required for RTSP streaming")
  483. return
  484. # ffmpeg handles both rtsp:// and rtsps:// URLs automatically
  485. cmd = [
  486. ffmpeg,
  487. "-rtsp_transport",
  488. "tcp",
  489. "-rtsp_flags",
  490. "prefer_tcp",
  491. "-timeout",
  492. "30000000",
  493. "-buffer_size",
  494. "1024000",
  495. "-max_delay",
  496. "500000",
  497. "-i",
  498. url,
  499. "-f",
  500. "mjpeg",
  501. "-q:v",
  502. "5",
  503. "-r",
  504. str(fps),
  505. "-an",
  506. "-",
  507. ]
  508. process = None
  509. try:
  510. process = await asyncio.create_subprocess_exec(
  511. *cmd,
  512. stdout=asyncio.subprocess.PIPE,
  513. stderr=asyncio.subprocess.PIPE,
  514. )
  515. # Give ffmpeg a moment to start and check for immediate failures
  516. await asyncio.sleep(0.5)
  517. if process.returncode is not None:
  518. stderr = await process.stderr.read()
  519. logger.error("ffmpeg RTSP stream failed immediately: %s", stderr.decode()[:300])
  520. return
  521. buffer = b""
  522. jpeg_start = b"\xff\xd8"
  523. jpeg_end = b"\xff\xd9"
  524. while True:
  525. try:
  526. chunk = await asyncio.wait_for(process.stdout.read(8192), timeout=30.0)
  527. if not chunk:
  528. break
  529. buffer += chunk
  530. # Extract complete frames
  531. while True:
  532. start_idx = buffer.find(jpeg_start)
  533. if start_idx == -1:
  534. buffer = buffer[-2:] if len(buffer) > 2 else buffer
  535. break
  536. if start_idx > 0:
  537. buffer = buffer[start_idx:]
  538. end_idx = buffer.find(jpeg_end, 2)
  539. if end_idx == -1:
  540. break
  541. frame = buffer[: end_idx + 2]
  542. buffer = buffer[end_idx + 2 :]
  543. yield frame
  544. except TimeoutError:
  545. logger.warning("RTSP stream read timeout")
  546. break
  547. except asyncio.CancelledError:
  548. logger.info("RTSP stream cancelled")
  549. except OSError as e:
  550. logger.error("RTSP stream error: %s", e)
  551. finally:
  552. if process and process.returncode is None:
  553. process.terminate()
  554. try:
  555. await asyncio.wait_for(process.wait(), timeout=2.0)
  556. except TimeoutError:
  557. process.kill()
  558. await process.wait()
  559. async def _stream_usb(device: str, fps: int) -> AsyncGenerator[bytes, None]:
  560. """Stream frames from USB camera via ffmpeg."""
  561. ffmpeg = get_ffmpeg_path()
  562. if not ffmpeg:
  563. logger.error("ffmpeg not found - required for USB camera streaming")
  564. return
  565. # Validate device path
  566. if not device.startswith("/dev/video"):
  567. logger.error("Invalid USB device path: %s", device)
  568. return
  569. if not Path(device).exists():
  570. logger.error("USB device does not exist: %s", device)
  571. return
  572. # ffmpeg command to stream from USB camera (v4l2)
  573. cmd = [
  574. ffmpeg,
  575. "-f",
  576. "v4l2",
  577. "-framerate",
  578. str(fps),
  579. "-i",
  580. device,
  581. "-f",
  582. "mjpeg",
  583. "-q:v",
  584. "5",
  585. "-r",
  586. str(fps),
  587. "-",
  588. ]
  589. process = None
  590. try:
  591. logger.info("Starting USB camera stream from %s at %s fps", device, fps)
  592. process = await asyncio.create_subprocess_exec(
  593. *cmd,
  594. stdout=asyncio.subprocess.PIPE,
  595. stderr=asyncio.subprocess.PIPE,
  596. )
  597. # Give ffmpeg a moment to start and check for immediate failures
  598. await asyncio.sleep(0.5)
  599. if process.returncode is not None:
  600. stderr = await process.stderr.read()
  601. logger.error("ffmpeg USB stream failed immediately: %s", stderr.decode()[:300])
  602. return
  603. buffer = b""
  604. jpeg_start = b"\xff\xd8"
  605. jpeg_end = b"\xff\xd9"
  606. while True:
  607. try:
  608. chunk = await asyncio.wait_for(process.stdout.read(8192), timeout=30.0)
  609. if not chunk:
  610. break
  611. buffer += chunk
  612. # Extract complete frames
  613. while True:
  614. start_idx = buffer.find(jpeg_start)
  615. if start_idx == -1:
  616. buffer = buffer[-2:] if len(buffer) > 2 else buffer
  617. break
  618. if start_idx > 0:
  619. buffer = buffer[start_idx:]
  620. end_idx = buffer.find(jpeg_end, 2)
  621. if end_idx == -1:
  622. break
  623. frame = buffer[: end_idx + 2]
  624. buffer = buffer[end_idx + 2 :]
  625. yield frame
  626. except TimeoutError:
  627. logger.warning("USB stream read timeout")
  628. break
  629. except asyncio.CancelledError:
  630. logger.info("USB stream cancelled")
  631. except OSError as e:
  632. logger.error("USB stream error: %s", e)
  633. finally:
  634. if process and process.returncode is None:
  635. process.terminate()
  636. try:
  637. await asyncio.wait_for(process.wait(), timeout=2.0)
  638. except TimeoutError:
  639. process.kill()
  640. await process.wait()