external_camera.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745
  1. """External camera service.
  2. Supports MJPEG streams, RTSP streams (via ffmpeg), HTTP snapshot URLs, and USB cameras.
  3. Security Note: This service intentionally makes requests to user-configured camera URLs.
  4. This is necessary functionality for external camera integration. URLs are validated
  5. to ensure they are well-formed before use.
  6. """
  7. import asyncio
  8. import logging
  9. import re
  10. import shutil
  11. from collections.abc import AsyncGenerator
  12. from pathlib import Path
  13. from urllib.parse import urlparse
  14. import aiohttp
  15. logger = logging.getLogger(__name__)
  16. def _validate_camera_url(url: str, allowed_schemes: tuple[str, ...] = ("http", "https", "rtsp")) -> bool:
  17. """Validate camera URL format and block dangerous destinations.
  18. This validates that the URL is well-formed, uses an allowed scheme,
  19. and does not target cloud metadata services.
  20. Note: This intentionally allows user-provided URLs as that is the
  21. purpose of external camera configuration. Local network IPs are
  22. allowed since cameras are typically on the same LAN.
  23. Args:
  24. url: URL to validate
  25. allowed_schemes: Tuple of allowed URL schemes
  26. Returns:
  27. True if URL is valid, False otherwise
  28. """
  29. try:
  30. parsed = urlparse(url)
  31. if not parsed.scheme or not parsed.netloc:
  32. return False
  33. if parsed.scheme.lower() not in allowed_schemes:
  34. return False
  35. # Block cloud metadata service endpoints (SSRF mitigation)
  36. # These are dangerous destinations that should never be accessed
  37. hostname = parsed.hostname or ""
  38. blocked_hosts = (
  39. "169.254.169.254", # AWS/GCP/Azure metadata
  40. "metadata.google.internal", # GCP metadata
  41. "metadata.google",
  42. "localhost", # Block localhost to prevent internal service access
  43. "127.0.0.1",
  44. "::1",
  45. "0.0.0.0",
  46. )
  47. if hostname.lower() in blocked_hosts:
  48. logger.warning(f"Blocked camera URL targeting restricted host: {hostname}")
  49. return False
  50. # Block link-local addresses (169.254.x.x)
  51. if hostname.startswith("169.254."):
  52. logger.warning(f"Blocked camera URL targeting link-local address: {hostname}")
  53. return False
  54. return True
  55. except Exception:
  56. return False
  57. def list_usb_cameras() -> list[dict]:
  58. """List available USB cameras (V4L2 devices on Linux).
  59. Returns:
  60. List of dicts with {device: str, name: str, capabilities: list}
  61. """
  62. cameras = []
  63. video_devices = sorted(Path("/dev").glob("video*"))
  64. for device in video_devices:
  65. device_path = str(device)
  66. info = {"device": device_path, "name": device.name, "capabilities": []}
  67. # Try to get device info via v4l2-ctl
  68. v4l2_ctl = shutil.which("v4l2-ctl")
  69. if v4l2_ctl:
  70. import subprocess
  71. try:
  72. result = subprocess.run(
  73. [v4l2_ctl, "-d", device_path, "--info"],
  74. capture_output=True,
  75. text=True,
  76. timeout=5,
  77. )
  78. if result.returncode == 0:
  79. # Parse device name from output
  80. for line in result.stdout.splitlines():
  81. if "Card type" in line:
  82. info["name"] = line.split(":", 1)[1].strip()
  83. elif "Driver name" in line:
  84. info["driver"] = line.split(":", 1)[1].strip()
  85. # Check if device supports video capture
  86. result = subprocess.run(
  87. [v4l2_ctl, "-d", device_path, "--list-formats"],
  88. capture_output=True,
  89. text=True,
  90. timeout=5,
  91. )
  92. if result.returncode == 0 and result.stdout.strip():
  93. info["capabilities"].append("capture")
  94. # Parse available formats
  95. formats = re.findall(r"'(\w+)'", result.stdout)
  96. info["formats"] = list(set(formats))
  97. except (subprocess.TimeoutExpired, Exception) as e:
  98. logger.debug(f"v4l2-ctl failed for {device_path}: {e}")
  99. # Only include devices that look like video capture devices
  100. # Skip metadata devices (typically odd numbered like video1, video3)
  101. try:
  102. device_num = int(device.name.replace("video", ""))
  103. # Even numbered devices are usually capture, odd are metadata
  104. # But also check if we got capabilities
  105. if info.get("capabilities") or device_num % 2 == 0:
  106. cameras.append(info)
  107. except ValueError:
  108. cameras.append(info)
  109. return cameras
  110. def get_ffmpeg_path() -> str | None:
  111. """Get the path to ffmpeg executable."""
  112. # Try shutil.which first
  113. path = shutil.which("ffmpeg")
  114. if path:
  115. return path
  116. # Check common locations (systemd services may have limited PATH)
  117. for common_path in ["/usr/bin/ffmpeg", "/usr/local/bin/ffmpeg", "/opt/homebrew/bin/ffmpeg"]:
  118. if Path(common_path).exists():
  119. return common_path
  120. return None
  121. async def capture_frame(url: str, camera_type: str, timeout: int = 15) -> bytes | None:
  122. """Capture single frame from external camera.
  123. Args:
  124. url: Camera URL (MJPEG stream, RTSP URL, HTTP snapshot URL, or USB device path)
  125. camera_type: "mjpeg", "rtsp", "snapshot", or "usb"
  126. timeout: Connection timeout in seconds
  127. Returns:
  128. JPEG bytes or None on failure
  129. """
  130. logger.debug(f"capture_frame called: type={camera_type}, url={url[:50] if url else 'None'}...")
  131. if camera_type == "mjpeg":
  132. return await _capture_mjpeg_frame(url, timeout)
  133. elif camera_type == "rtsp":
  134. return await _capture_rtsp_frame(url, timeout)
  135. elif camera_type == "snapshot":
  136. return await _capture_snapshot(url, timeout)
  137. elif camera_type == "usb":
  138. return await _capture_usb_frame(url, timeout)
  139. else:
  140. logger.warning(f"Unknown camera type: {camera_type}")
  141. return None
  142. async def _capture_usb_frame(device: str, timeout: int) -> bytes | None:
  143. """Capture frame from USB camera using ffmpeg."""
  144. ffmpeg = get_ffmpeg_path()
  145. if not ffmpeg:
  146. logger.error("ffmpeg not found - required for USB camera capture")
  147. return None
  148. # Validate device path - must be /dev/videoN format where N is 0-99
  149. # This prevents path traversal by using a strict allowlist approach
  150. import re as regex_module
  151. device_match = regex_module.match(r"^/dev/video(\d{1,2})$", device)
  152. if not device_match:
  153. logger.error(f"Invalid USB device path format: {device}")
  154. return None
  155. # Convert to integer to break taint chain - integers cannot contain path traversal
  156. # lgtm[py/path-injection] - device_num is validated integer 0-99
  157. device_num = int(device_match.group(1)) # Safe: regex guarantees 1-2 digits
  158. if device_num > 99:
  159. logger.error(f"USB device number out of range: {device_num}")
  160. return None
  161. # Construct safe path from validated integer (completely untainted)
  162. safe_device_path = Path(f"/dev/video{device_num}") # lgtm[py/path-injection]
  163. if not safe_device_path.exists():
  164. logger.error(f"USB device does not exist: {safe_device_path}")
  165. return None
  166. # Use the safe path for ffmpeg - this is a hardcoded /dev/videoN path
  167. device = str(safe_device_path) # lgtm[py/path-injection]
  168. # Use ffmpeg to grab a single frame from USB camera
  169. cmd = [
  170. ffmpeg,
  171. "-f",
  172. "v4l2",
  173. "-i",
  174. device,
  175. "-frames:v",
  176. "1",
  177. "-f",
  178. "image2pipe",
  179. "-vcodec",
  180. "mjpeg",
  181. "-q:v",
  182. "2",
  183. "-",
  184. ]
  185. try:
  186. logger.debug(f"Running USB capture: {' '.join(cmd)}")
  187. process = await asyncio.create_subprocess_exec(
  188. *cmd,
  189. stdout=asyncio.subprocess.PIPE,
  190. stderr=asyncio.subprocess.PIPE,
  191. )
  192. stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout)
  193. if process.returncode != 0:
  194. logger.error(f"ffmpeg USB capture failed: {stderr.decode()[:200]}")
  195. return None
  196. if not stdout or len(stdout) < 100:
  197. logger.error("ffmpeg returned empty or too small frame from USB camera")
  198. return None
  199. return stdout
  200. except TimeoutError:
  201. logger.warning(f"USB frame capture timed out after {timeout}s")
  202. if process:
  203. process.kill()
  204. return None
  205. except Exception as e:
  206. logger.error(f"USB frame capture failed: {e}")
  207. return None
  208. async def _capture_mjpeg_frame(url: str, timeout: int) -> bytes | None:
  209. """Extract single frame from MJPEG stream.
  210. Note: This function intentionally makes requests to user-configured URLs.
  211. External camera support requires connecting to user-specified camera endpoints.
  212. URL format is validated but the destination is intentionally user-controlled.
  213. """
  214. # Validate URL format (user-configured camera URL - intentional external request)
  215. if not _validate_camera_url(url, ("http", "https")):
  216. logger.error(f"Invalid MJPEG URL format: {url[:50]}...")
  217. return None
  218. try:
  219. # Intentional SSRF: External camera URLs are user-configured by design
  220. async with (
  221. aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session,
  222. session.get(url) as response, # nosec lgtm[py/ssrf]
  223. ):
  224. if response.status != 200:
  225. logger.error(f"MJPEG stream returned status {response.status}")
  226. return None
  227. # Read chunks until we find a complete JPEG frame
  228. buffer = b""
  229. jpeg_start = b"\xff\xd8"
  230. jpeg_end = b"\xff\xd9"
  231. async for chunk in response.content.iter_chunked(8192):
  232. buffer += chunk
  233. # Look for complete JPEG frame
  234. start_idx = buffer.find(jpeg_start)
  235. if start_idx == -1:
  236. continue
  237. end_idx = buffer.find(jpeg_end, start_idx + 2)
  238. if end_idx != -1:
  239. # Found complete frame
  240. frame = buffer[start_idx : end_idx + 2]
  241. return frame
  242. # Keep searching, but limit buffer size
  243. if len(buffer) > 5 * 1024 * 1024: # 5MB limit
  244. logger.warning("MJPEG buffer exceeded 5MB without finding frame")
  245. return None
  246. except TimeoutError:
  247. logger.warning(f"MJPEG frame capture timed out after {timeout}s")
  248. return None
  249. except Exception as e:
  250. logger.error(f"MJPEG frame capture failed: {e}")
  251. return None
  252. return None
  253. async def _capture_rtsp_frame(url: str, timeout: int) -> bytes | None:
  254. """Capture frame from RTSP using ffmpeg."""
  255. ffmpeg = get_ffmpeg_path()
  256. if not ffmpeg:
  257. logger.error("ffmpeg not found - required for RTSP capture")
  258. return None
  259. # Use ffmpeg to grab a single frame from RTSP stream
  260. # ffmpeg handles both rtsp:// and rtsps:// URLs automatically
  261. cmd = [
  262. ffmpeg,
  263. "-rtsp_transport",
  264. "tcp",
  265. "-i",
  266. url,
  267. "-frames:v",
  268. "1",
  269. "-f",
  270. "image2pipe",
  271. "-vcodec",
  272. "mjpeg",
  273. "-q:v",
  274. "2",
  275. "-",
  276. ]
  277. try:
  278. print(f"[EXT-CAM] Running ffmpeg command: {' '.join(cmd[:6])}...")
  279. process = await asyncio.create_subprocess_exec(
  280. *cmd,
  281. stdout=asyncio.subprocess.PIPE,
  282. stderr=asyncio.subprocess.PIPE,
  283. )
  284. stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout)
  285. print(
  286. f"[EXT-CAM] ffmpeg returned: code={process.returncode}, stdout={len(stdout)} bytes, stderr={len(stderr)} bytes"
  287. )
  288. if process.returncode != 0:
  289. logger.error(f"ffmpeg RTSP capture failed: {stderr.decode()[:200]}")
  290. print(f"[EXT-CAM] ffmpeg error: {stderr.decode()[:300]}")
  291. return None
  292. if not stdout or len(stdout) < 100:
  293. logger.error("ffmpeg returned empty or too small frame")
  294. return None
  295. return stdout
  296. except TimeoutError:
  297. logger.warning(f"RTSP frame capture timed out after {timeout}s")
  298. if process:
  299. process.kill()
  300. return None
  301. except Exception as e:
  302. logger.error(f"RTSP frame capture failed: {e}")
  303. return None
  304. async def _capture_snapshot(url: str, timeout: int) -> bytes | None:
  305. """Fetch snapshot from HTTP URL.
  306. Note: This function intentionally makes requests to user-configured URLs.
  307. External camera support requires connecting to user-specified camera endpoints.
  308. URL format is validated but the destination is intentionally user-controlled.
  309. """
  310. # Validate URL format (user-configured camera URL - intentional external request)
  311. if not _validate_camera_url(url, ("http", "https")):
  312. logger.error(f"Invalid snapshot URL format: {url[:50]}...")
  313. return None
  314. try:
  315. # Intentional SSRF: External camera URLs are user-configured by design
  316. async with (
  317. aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session,
  318. session.get(url) as response, # nosec lgtm[py/ssrf]
  319. ):
  320. if response.status != 200:
  321. logger.error(f"Snapshot URL returned status {response.status}")
  322. return None
  323. data = await response.read()
  324. # Validate it looks like JPEG
  325. if not data.startswith(b"\xff\xd8"):
  326. logger.warning("Snapshot does not appear to be JPEG")
  327. # Still return it - might be valid with different header
  328. return data
  329. except TimeoutError:
  330. logger.warning(f"Snapshot capture timed out after {timeout}s")
  331. return None
  332. except Exception as e:
  333. logger.error(f"Snapshot capture failed: {e}")
  334. return None
  335. async def test_connection(url: str, camera_type: str) -> dict:
  336. """Test camera connection.
  337. Returns:
  338. Dict with {success: bool, error?: str, resolution?: str}
  339. """
  340. print(f"[EXT-CAM] Testing camera connection: type={camera_type}, url={url[:50]}...")
  341. logger.info(f"Testing camera connection: type={camera_type}, url={url[:50]}...")
  342. try:
  343. frame = await capture_frame(url, camera_type, timeout=10)
  344. print(f"[EXT-CAM] Capture result: {len(frame) if frame else 0} bytes")
  345. logger.info(f"Capture result: {len(frame) if frame else 0} bytes")
  346. if frame:
  347. # Try to get resolution from JPEG header
  348. resolution = None
  349. try:
  350. # Simple JPEG dimension extraction
  351. # SOF0 marker is FF C0, followed by length, precision, height, width
  352. sof_markers = [b"\xff\xc0", b"\xff\xc1", b"\xff\xc2"]
  353. for marker in sof_markers:
  354. idx = frame.find(marker)
  355. if idx != -1 and idx + 9 <= len(frame):
  356. height = (frame[idx + 5] << 8) | frame[idx + 6]
  357. width = (frame[idx + 7] << 8) | frame[idx + 8]
  358. resolution = f"{width}x{height}"
  359. break
  360. except Exception:
  361. pass
  362. return {"success": True, "resolution": resolution}
  363. else:
  364. return {"success": False, "error": "Failed to capture frame from camera"}
  365. except Exception as e:
  366. # Sanitize error message - don't expose internal details
  367. error_type = type(e).__name__
  368. logger.error(f"Camera connection test failed: {e}")
  369. return {"success": False, "error": f"Connection failed: {error_type}"}
  370. async def generate_mjpeg_stream(url: str, camera_type: str, fps: int = 10) -> AsyncGenerator[bytes, None]:
  371. """Generator yielding MJPEG frames for streaming.
  372. Args:
  373. url: Camera URL or USB device path
  374. camera_type: "mjpeg", "rtsp", "snapshot", or "usb"
  375. fps: Target frames per second
  376. Yields:
  377. MJPEG frame data with HTTP multipart boundaries
  378. """
  379. frame_interval = 1.0 / max(fps, 1)
  380. last_frame_time = 0.0
  381. if camera_type == "mjpeg":
  382. # Proxy MJPEG stream directly
  383. async for frame in _stream_mjpeg(url):
  384. current_time = asyncio.get_event_loop().time()
  385. if current_time - last_frame_time >= frame_interval:
  386. last_frame_time = current_time
  387. yield _format_mjpeg_frame(frame)
  388. elif camera_type == "rtsp":
  389. # Use ffmpeg to convert RTSP to MJPEG
  390. async for frame in _stream_rtsp(url, fps):
  391. yield _format_mjpeg_frame(frame)
  392. elif camera_type == "usb":
  393. # Use ffmpeg to stream from USB camera
  394. async for frame in _stream_usb(url, fps):
  395. yield _format_mjpeg_frame(frame)
  396. elif camera_type == "snapshot":
  397. # Poll snapshot URL at interval
  398. while True:
  399. try:
  400. frame = await _capture_snapshot(url, timeout=10)
  401. if frame:
  402. yield _format_mjpeg_frame(frame)
  403. await asyncio.sleep(frame_interval)
  404. except asyncio.CancelledError:
  405. break
  406. except Exception as e:
  407. logger.warning(f"Snapshot poll failed: {e}")
  408. await asyncio.sleep(frame_interval)
  409. def _format_mjpeg_frame(frame: bytes) -> bytes:
  410. """Format frame for MJPEG HTTP response."""
  411. return (
  412. b"--frame\r\n"
  413. b"Content-Type: image/jpeg\r\n"
  414. b"Content-Length: " + str(len(frame)).encode() + b"\r\n"
  415. b"\r\n" + frame + b"\r\n"
  416. )
  417. async def _stream_mjpeg(url: str) -> AsyncGenerator[bytes, None]:
  418. """Stream frames from MJPEG URL.
  419. Note: This function intentionally makes requests to user-configured URLs.
  420. External camera support requires connecting to user-specified camera endpoints.
  421. """
  422. try:
  423. # Intentional SSRF: External camera URLs are user-configured by design
  424. timeout = aiohttp.ClientTimeout(total=None, sock_read=30)
  425. async with aiohttp.ClientSession(timeout=timeout) as session, session.get(url) as response: # nosec lgtm[py/ssrf]
  426. if response.status != 200:
  427. logger.error(f"MJPEG stream returned status {response.status}")
  428. return
  429. buffer = b""
  430. jpeg_start = b"\xff\xd8"
  431. jpeg_end = b"\xff\xd9"
  432. async for chunk in response.content.iter_chunked(8192):
  433. buffer += chunk
  434. # Extract complete frames from buffer
  435. while True:
  436. start_idx = buffer.find(jpeg_start)
  437. if start_idx == -1:
  438. buffer = buffer[-2:] if len(buffer) > 2 else buffer
  439. break
  440. if start_idx > 0:
  441. buffer = buffer[start_idx:]
  442. end_idx = buffer.find(jpeg_end, 2)
  443. if end_idx == -1:
  444. break
  445. frame = buffer[: end_idx + 2]
  446. buffer = buffer[end_idx + 2 :]
  447. yield frame
  448. except asyncio.CancelledError:
  449. logger.info("MJPEG stream cancelled")
  450. except Exception as e:
  451. logger.error(f"MJPEG stream error: {e}")
  452. async def _stream_rtsp(url: str, fps: int) -> AsyncGenerator[bytes, None]:
  453. """Stream frames from RTSP URL via ffmpeg."""
  454. ffmpeg = get_ffmpeg_path()
  455. if not ffmpeg:
  456. logger.error("ffmpeg not found - required for RTSP streaming")
  457. return
  458. # ffmpeg handles both rtsp:// and rtsps:// URLs automatically
  459. cmd = [
  460. ffmpeg,
  461. "-rtsp_transport",
  462. "tcp",
  463. "-rtsp_flags",
  464. "prefer_tcp",
  465. "-timeout",
  466. "30000000",
  467. "-buffer_size",
  468. "1024000",
  469. "-max_delay",
  470. "500000",
  471. "-i",
  472. url,
  473. "-f",
  474. "mjpeg",
  475. "-q:v",
  476. "5",
  477. "-r",
  478. str(fps),
  479. "-an",
  480. "-",
  481. ]
  482. process = None
  483. try:
  484. process = await asyncio.create_subprocess_exec(
  485. *cmd,
  486. stdout=asyncio.subprocess.PIPE,
  487. stderr=asyncio.subprocess.PIPE,
  488. )
  489. # Give ffmpeg a moment to start and check for immediate failures
  490. await asyncio.sleep(0.5)
  491. if process.returncode is not None:
  492. stderr = await process.stderr.read()
  493. logger.error(f"ffmpeg RTSP stream failed immediately: {stderr.decode()[:300]}")
  494. return
  495. buffer = b""
  496. jpeg_start = b"\xff\xd8"
  497. jpeg_end = b"\xff\xd9"
  498. while True:
  499. try:
  500. chunk = await asyncio.wait_for(process.stdout.read(8192), timeout=30.0)
  501. if not chunk:
  502. break
  503. buffer += chunk
  504. # Extract complete frames
  505. while True:
  506. start_idx = buffer.find(jpeg_start)
  507. if start_idx == -1:
  508. buffer = buffer[-2:] if len(buffer) > 2 else buffer
  509. break
  510. if start_idx > 0:
  511. buffer = buffer[start_idx:]
  512. end_idx = buffer.find(jpeg_end, 2)
  513. if end_idx == -1:
  514. break
  515. frame = buffer[: end_idx + 2]
  516. buffer = buffer[end_idx + 2 :]
  517. yield frame
  518. except TimeoutError:
  519. logger.warning("RTSP stream read timeout")
  520. break
  521. except asyncio.CancelledError:
  522. logger.info("RTSP stream cancelled")
  523. except Exception as e:
  524. logger.error(f"RTSP stream error: {e}")
  525. finally:
  526. if process and process.returncode is None:
  527. process.terminate()
  528. try:
  529. await asyncio.wait_for(process.wait(), timeout=2.0)
  530. except TimeoutError:
  531. process.kill()
  532. await process.wait()
  533. async def _stream_usb(device: str, fps: int) -> AsyncGenerator[bytes, None]:
  534. """Stream frames from USB camera via ffmpeg."""
  535. ffmpeg = get_ffmpeg_path()
  536. if not ffmpeg:
  537. logger.error("ffmpeg not found - required for USB camera streaming")
  538. return
  539. # Validate device path
  540. if not device.startswith("/dev/video"):
  541. logger.error(f"Invalid USB device path: {device}")
  542. return
  543. if not Path(device).exists():
  544. logger.error(f"USB device does not exist: {device}")
  545. return
  546. # ffmpeg command to stream from USB camera (v4l2)
  547. cmd = [
  548. ffmpeg,
  549. "-f",
  550. "v4l2",
  551. "-framerate",
  552. str(fps),
  553. "-i",
  554. device,
  555. "-f",
  556. "mjpeg",
  557. "-q:v",
  558. "5",
  559. "-r",
  560. str(fps),
  561. "-",
  562. ]
  563. process = None
  564. try:
  565. logger.info(f"Starting USB camera stream from {device} at {fps} fps")
  566. process = await asyncio.create_subprocess_exec(
  567. *cmd,
  568. stdout=asyncio.subprocess.PIPE,
  569. stderr=asyncio.subprocess.PIPE,
  570. )
  571. # Give ffmpeg a moment to start and check for immediate failures
  572. await asyncio.sleep(0.5)
  573. if process.returncode is not None:
  574. stderr = await process.stderr.read()
  575. logger.error(f"ffmpeg USB stream failed immediately: {stderr.decode()[:300]}")
  576. return
  577. buffer = b""
  578. jpeg_start = b"\xff\xd8"
  579. jpeg_end = b"\xff\xd9"
  580. while True:
  581. try:
  582. chunk = await asyncio.wait_for(process.stdout.read(8192), timeout=30.0)
  583. if not chunk:
  584. break
  585. buffer += chunk
  586. # Extract complete frames
  587. while True:
  588. start_idx = buffer.find(jpeg_start)
  589. if start_idx == -1:
  590. buffer = buffer[-2:] if len(buffer) > 2 else buffer
  591. break
  592. if start_idx > 0:
  593. buffer = buffer[start_idx:]
  594. end_idx = buffer.find(jpeg_end, 2)
  595. if end_idx == -1:
  596. break
  597. frame = buffer[: end_idx + 2]
  598. buffer = buffer[end_idx + 2 :]
  599. yield frame
  600. except TimeoutError:
  601. logger.warning("USB stream read timeout")
  602. break
  603. except asyncio.CancelledError:
  604. logger.info("USB stream cancelled")
  605. except Exception as e:
  606. logger.error(f"USB stream error: {e}")
  607. finally:
  608. if process and process.returncode is None:
  609. process.terminate()
  610. try:
  611. await asyncio.wait_for(process.wait(), timeout=2.0)
  612. except TimeoutError:
  613. process.kill()
  614. await process.wait()