external_camera.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775
  1. """External camera service.
  2. Supports MJPEG streams, RTSP streams (via ffmpeg), HTTP snapshot URLs, and USB cameras.
  3. Security Note: This service intentionally makes requests to user-configured camera URLs.
  4. This is necessary functionality for external camera integration. URLs are validated
  5. to ensure they are well-formed before use.
  6. """
  7. import asyncio
  8. import logging
  9. import re
  10. import shutil
  11. from collections.abc import AsyncGenerator
  12. from pathlib import Path
  13. from urllib.parse import urlparse
  14. import aiohttp
  15. logger = logging.getLogger(__name__)
  16. def _sanitize_camera_url(url: str, allowed_schemes: tuple[str, ...] = ("http", "https", "rtsp")) -> str | None:
  17. """Validate and sanitize camera URL, returning a safe reconstructed URL.
  18. This validates that the URL is well-formed, uses an allowed scheme,
  19. does not target cloud metadata services, and returns a reconstructed
  20. URL from validated components.
  21. Note: This intentionally allows user-provided URLs as that is the
  22. purpose of external camera configuration. Local network IPs are
  23. allowed since cameras are typically on the same LAN.
  24. Args:
  25. url: URL to validate and sanitize
  26. allowed_schemes: Tuple of allowed URL schemes
  27. Returns:
  28. Sanitized URL string if valid, None otherwise
  29. """
  30. try:
  31. parsed = urlparse(url)
  32. if not parsed.scheme or not parsed.netloc:
  33. return None
  34. # Validate scheme against allowlist
  35. scheme = parsed.scheme.lower()
  36. if scheme not in allowed_schemes:
  37. return None
  38. # Block cloud metadata service endpoints (SSRF mitigation)
  39. # These are dangerous destinations that should never be accessed
  40. hostname = parsed.hostname or ""
  41. hostname_lower = hostname.lower()
  42. blocked_hosts = (
  43. "169.254.169.254", # AWS/GCP/Azure metadata
  44. "metadata.google.internal", # GCP metadata
  45. "metadata.google",
  46. "localhost", # Block localhost to prevent internal service access
  47. "127.0.0.1",
  48. "::1",
  49. "0.0.0.0", # nosec B104
  50. )
  51. if hostname_lower in blocked_hosts:
  52. logger.warning("Blocked camera URL targeting restricted host: %s", hostname)
  53. return None
  54. # Block link-local addresses (169.254.x.x)
  55. if hostname.startswith("169.254."):
  56. logger.warning("Blocked camera URL targeting link-local address: %s", hostname)
  57. return None
  58. # Reconstruct URL from validated components to break taint chain
  59. # This creates a new string from validated parts
  60. port_str = f":{parsed.port}" if parsed.port else ""
  61. path = parsed.path or ""
  62. query = f"?{parsed.query}" if parsed.query else ""
  63. fragment = f"#{parsed.fragment}" if parsed.fragment else ""
  64. # Build sanitized URL from validated components
  65. sanitized = f"{scheme}://{hostname}{port_str}{path}{query}{fragment}"
  66. return sanitized
  67. except ValueError:
  68. return None
  69. def _validate_camera_url(url: str, allowed_schemes: tuple[str, ...] = ("http", "https", "rtsp")) -> bool:
  70. """Validate camera URL format (legacy wrapper).
  71. Args:
  72. url: URL to validate
  73. allowed_schemes: Tuple of allowed URL schemes
  74. Returns:
  75. True if URL is valid, False otherwise
  76. """
  77. return _sanitize_camera_url(url, allowed_schemes) is not None
  78. def list_usb_cameras() -> list[dict]:
  79. """List available USB cameras (V4L2 devices on Linux).
  80. Returns:
  81. List of dicts with {device: str, name: str, capabilities: list}
  82. """
  83. cameras = []
  84. video_devices = sorted(Path("/dev").glob("video*"))
  85. for device in video_devices:
  86. device_path = str(device)
  87. info = {"device": device_path, "name": device.name, "capabilities": []}
  88. # Try to get device info via v4l2-ctl
  89. v4l2_ctl = shutil.which("v4l2-ctl")
  90. if v4l2_ctl:
  91. import subprocess
  92. try:
  93. result = subprocess.run(
  94. [v4l2_ctl, "-d", device_path, "--info"],
  95. capture_output=True,
  96. text=True,
  97. timeout=5,
  98. )
  99. if result.returncode == 0:
  100. # Parse device name from output
  101. for line in result.stdout.splitlines():
  102. if "Card type" in line:
  103. info["name"] = line.split(":", 1)[1].strip()
  104. elif "Driver name" in line:
  105. info["driver"] = line.split(":", 1)[1].strip()
  106. # Check if device supports video capture
  107. result = subprocess.run(
  108. [v4l2_ctl, "-d", device_path, "--list-formats"],
  109. capture_output=True,
  110. text=True,
  111. timeout=5,
  112. )
  113. if result.returncode == 0 and result.stdout.strip():
  114. info["capabilities"].append("capture")
  115. # Parse available formats
  116. formats = re.findall(r"'(\w+)'", result.stdout)
  117. info["formats"] = list(set(formats))
  118. except (subprocess.TimeoutExpired, Exception) as e:
  119. logger.debug("v4l2-ctl failed for %s: %s", device_path, e)
  120. # Only include devices that look like video capture devices
  121. # Skip metadata devices (typically odd numbered like video1, video3)
  122. try:
  123. device_num = int(device.name.replace("video", ""))
  124. # Even numbered devices are usually capture, odd are metadata
  125. # But also check if we got capabilities
  126. if info.get("capabilities") or device_num % 2 == 0:
  127. cameras.append(info)
  128. except ValueError:
  129. cameras.append(info)
  130. return cameras
  131. def get_ffmpeg_path() -> str | None:
  132. """Get the path to ffmpeg executable."""
  133. # Try shutil.which first
  134. path = shutil.which("ffmpeg")
  135. if path:
  136. return path
  137. # Check common locations (systemd services may have limited PATH)
  138. for common_path in ["/usr/bin/ffmpeg", "/usr/local/bin/ffmpeg", "/opt/homebrew/bin/ffmpeg"]:
  139. if Path(common_path).exists():
  140. return common_path
  141. return None
  142. async def capture_frame(url: str, camera_type: str, timeout: int = 15) -> bytes | None:
  143. """Capture single frame from external camera.
  144. Args:
  145. url: Camera URL (MJPEG stream, RTSP URL, HTTP snapshot URL, or USB device path)
  146. camera_type: "mjpeg", "rtsp", "snapshot", or "usb"
  147. timeout: Connection timeout in seconds
  148. Returns:
  149. JPEG bytes or None on failure
  150. """
  151. logger.debug("capture_frame called: type=%s, url=%s...", camera_type, url[:50] if url else "None")
  152. if camera_type == "mjpeg":
  153. return await _capture_mjpeg_frame(url, timeout)
  154. elif camera_type == "rtsp":
  155. return await _capture_rtsp_frame(url, timeout)
  156. elif camera_type == "snapshot":
  157. return await _capture_snapshot(url, timeout)
  158. elif camera_type == "usb":
  159. return await _capture_usb_frame(url, timeout)
  160. else:
  161. logger.warning("Unknown camera type: %s", camera_type)
  162. return None
  163. async def _capture_usb_frame(device: str, timeout: int) -> bytes | None:
  164. """Capture frame from USB camera using ffmpeg."""
  165. ffmpeg = get_ffmpeg_path()
  166. if not ffmpeg:
  167. logger.error("ffmpeg not found - required for USB camera capture")
  168. return None
  169. # Validate device path - must be /dev/videoN format where N is 0-99
  170. # This prevents path traversal by using a strict allowlist approach
  171. import re as regex_module
  172. device_match = regex_module.match(r"^/dev/video(\d{1,2})$", device)
  173. if not device_match:
  174. logger.error("Invalid USB device path format: %s", device)
  175. return None
  176. # Convert to integer to break taint chain - integers cannot contain path traversal
  177. # lgtm[py/path-injection] - device_num is validated integer 0-99
  178. device_num = int(device_match.group(1)) # Safe: regex guarantees 1-2 digits
  179. if device_num > 99:
  180. logger.error("USB device number out of range: %s", device_num)
  181. return None
  182. # Construct safe path from validated integer (completely untainted)
  183. safe_device_path = Path(f"/dev/video{device_num}") # lgtm[py/path-injection]
  184. if not safe_device_path.exists():
  185. logger.error("USB device does not exist: %s", safe_device_path)
  186. return None
  187. # Use the safe path for ffmpeg - this is a hardcoded /dev/videoN path
  188. device = str(safe_device_path) # lgtm[py/path-injection]
  189. # Use ffmpeg to grab a single frame from USB camera
  190. cmd = [
  191. ffmpeg,
  192. "-f",
  193. "v4l2",
  194. "-i",
  195. device,
  196. "-frames:v",
  197. "1",
  198. "-f",
  199. "image2pipe",
  200. "-vcodec",
  201. "mjpeg",
  202. "-q:v",
  203. "2",
  204. "-",
  205. ]
  206. try:
  207. logger.debug("Running USB capture: %s", " ".join(cmd))
  208. process = await asyncio.create_subprocess_exec(
  209. *cmd,
  210. stdout=asyncio.subprocess.PIPE,
  211. stderr=asyncio.subprocess.PIPE,
  212. )
  213. stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout)
  214. if process.returncode != 0:
  215. logger.error("ffmpeg USB capture failed: %s", stderr.decode()[:200])
  216. return None
  217. if not stdout or len(stdout) < 100:
  218. logger.error("ffmpeg returned empty or too small frame from USB camera")
  219. return None
  220. return stdout
  221. except TimeoutError:
  222. logger.warning("USB frame capture timed out after %ss", timeout)
  223. if process:
  224. process.kill()
  225. return None
  226. except OSError as e:
  227. logger.error("USB frame capture failed: %s", e)
  228. return None
  229. async def _capture_mjpeg_frame(url: str, timeout: int) -> bytes | None:
  230. """Extract single frame from MJPEG stream.
  231. Note: This function intentionally makes requests to user-configured URLs.
  232. External camera support requires connecting to user-specified camera endpoints.
  233. URL is sanitized and dangerous destinations are blocked.
  234. """
  235. # Sanitize URL - returns reconstructed URL from validated components
  236. safe_url = _sanitize_camera_url(url, ("http", "https"))
  237. if not safe_url:
  238. logger.error("Invalid MJPEG URL format: %s...", url[:50])
  239. return None
  240. try:
  241. async with (
  242. aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session,
  243. session.get(safe_url) as response,
  244. ):
  245. if response.status != 200:
  246. logger.error("MJPEG stream returned status %s", response.status)
  247. return None
  248. # Read chunks until we find a complete JPEG frame
  249. buffer = b""
  250. jpeg_start = b"\xff\xd8"
  251. jpeg_end = b"\xff\xd9"
  252. async for chunk in response.content.iter_chunked(8192):
  253. buffer += chunk
  254. # Look for complete JPEG frame
  255. start_idx = buffer.find(jpeg_start)
  256. if start_idx == -1:
  257. continue
  258. end_idx = buffer.find(jpeg_end, start_idx + 2)
  259. if end_idx != -1:
  260. # Found complete frame
  261. frame = buffer[start_idx : end_idx + 2]
  262. return frame
  263. # Keep searching, but limit buffer size
  264. if len(buffer) > 5 * 1024 * 1024: # 5MB limit
  265. logger.warning("MJPEG buffer exceeded 5MB without finding frame")
  266. return None
  267. except TimeoutError:
  268. logger.warning("MJPEG frame capture timed out after %ss", timeout)
  269. return None
  270. except (aiohttp.ClientError, OSError) as e:
  271. logger.error("MJPEG frame capture failed: %s", e)
  272. return None
  273. return None
  274. async def _capture_rtsp_frame(url: str, timeout: int) -> bytes | None:
  275. """Capture frame from RTSP using ffmpeg."""
  276. ffmpeg = get_ffmpeg_path()
  277. if not ffmpeg:
  278. logger.error("ffmpeg not found - required for RTSP capture")
  279. return None
  280. # Use ffmpeg to grab a single frame from RTSP stream
  281. # ffmpeg handles both rtsp:// and rtsps:// URLs automatically
  282. cmd = [
  283. ffmpeg,
  284. "-rtsp_transport",
  285. "tcp",
  286. "-i",
  287. url,
  288. "-frames:v",
  289. "1",
  290. "-f",
  291. "image2pipe",
  292. "-vcodec",
  293. "mjpeg",
  294. "-q:v",
  295. "2",
  296. "-",
  297. ]
  298. try:
  299. logger.debug(f"Running ffmpeg command: {' '.join(cmd[:6])}...")
  300. process = await asyncio.create_subprocess_exec(
  301. *cmd,
  302. stdout=asyncio.subprocess.PIPE,
  303. stderr=asyncio.subprocess.PIPE,
  304. )
  305. stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout)
  306. logger.debug(
  307. f"ffmpeg returned: code={process.returncode}, stdout={len(stdout)} bytes, stderr={len(stderr)} bytes"
  308. )
  309. if process.returncode != 0:
  310. logger.error("ffmpeg RTSP capture failed: %s", stderr.decode()[:200])
  311. return None
  312. if not stdout or len(stdout) < 100:
  313. logger.error("ffmpeg returned empty or too small frame")
  314. return None
  315. return stdout
  316. except TimeoutError:
  317. logger.warning("RTSP frame capture timed out after %ss", timeout)
  318. if process:
  319. process.kill()
  320. return None
  321. except OSError as e:
  322. logger.error("RTSP frame capture failed: %s", e)
  323. return None
  324. async def _capture_snapshot(url: str, timeout: int) -> bytes | None:
  325. """Fetch snapshot from HTTP URL.
  326. Note: This function intentionally makes requests to user-configured URLs.
  327. External camera support requires connecting to user-specified camera endpoints.
  328. URL is sanitized and dangerous destinations are blocked.
  329. """
  330. # Sanitize URL - returns reconstructed URL from validated components
  331. safe_url = _sanitize_camera_url(url, ("http", "https"))
  332. if not safe_url:
  333. logger.error("Invalid snapshot URL format: %s...", url[:50])
  334. return None
  335. try:
  336. async with (
  337. aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session,
  338. session.get(safe_url) as response,
  339. ):
  340. if response.status != 200:
  341. logger.error("Snapshot URL returned status %s", response.status)
  342. return None
  343. data = await response.read()
  344. # Validate it looks like JPEG
  345. if not data.startswith(b"\xff\xd8"):
  346. logger.warning("Snapshot does not appear to be JPEG")
  347. # Still return it - might be valid with different header
  348. return data
  349. except TimeoutError:
  350. logger.warning("Snapshot capture timed out after %ss", timeout)
  351. return None
  352. except (aiohttp.ClientError, OSError) as e:
  353. logger.error("Snapshot capture failed: %s", e)
  354. return None
  355. async def test_connection(url: str, camera_type: str) -> dict:
  356. """Test camera connection.
  357. Returns:
  358. Dict with {success: bool, error?: str, resolution?: str}
  359. """
  360. logger.info("Testing camera connection: type=%s, url=%s...", camera_type, url[:50])
  361. try:
  362. frame = await capture_frame(url, camera_type, timeout=10)
  363. logger.info("Capture result: %s bytes", len(frame) if frame else 0)
  364. if frame:
  365. # Try to get resolution from JPEG header
  366. resolution = None
  367. try:
  368. # Simple JPEG dimension extraction
  369. # SOF0 marker is FF C0, followed by length, precision, height, width
  370. sof_markers = [b"\xff\xc0", b"\xff\xc1", b"\xff\xc2"]
  371. for marker in sof_markers:
  372. idx = frame.find(marker)
  373. if idx != -1 and idx + 9 <= len(frame):
  374. height = (frame[idx + 5] << 8) | frame[idx + 6]
  375. width = (frame[idx + 7] << 8) | frame[idx + 8]
  376. resolution = f"{width}x{height}"
  377. break
  378. except (IndexError, ValueError):
  379. pass # Resolution detection is optional; fall back to default
  380. return {"success": True, "resolution": resolution}
  381. else:
  382. return {"success": False, "error": "Failed to capture frame from camera"}
  383. except Exception as e:
  384. # Sanitize error message - don't expose internal details
  385. error_type = type(e).__name__
  386. logger.error("Camera connection test failed: %s", e)
  387. return {"success": False, "error": f"Connection failed: {error_type}"}
  388. async def generate_mjpeg_stream(url: str, camera_type: str, fps: int = 10) -> AsyncGenerator[bytes, None]:
  389. """Generator yielding MJPEG frames for streaming.
  390. Args:
  391. url: Camera URL or USB device path
  392. camera_type: "mjpeg", "rtsp", "snapshot", or "usb"
  393. fps: Target frames per second
  394. Yields:
  395. MJPEG frame data with HTTP multipart boundaries
  396. """
  397. frame_interval = 1.0 / max(fps, 1)
  398. last_frame_time = 0.0
  399. if camera_type == "mjpeg":
  400. # Proxy MJPEG stream directly
  401. async for frame in _stream_mjpeg(url):
  402. current_time = asyncio.get_event_loop().time()
  403. if current_time - last_frame_time >= frame_interval:
  404. last_frame_time = current_time
  405. yield _format_mjpeg_frame(frame)
  406. elif camera_type == "rtsp":
  407. # Use ffmpeg to convert RTSP to MJPEG
  408. async for frame in _stream_rtsp(url, fps):
  409. yield _format_mjpeg_frame(frame)
  410. elif camera_type == "usb":
  411. # Use ffmpeg to stream from USB camera
  412. async for frame in _stream_usb(url, fps):
  413. yield _format_mjpeg_frame(frame)
  414. elif camera_type == "snapshot":
  415. # Poll snapshot URL at interval
  416. while True:
  417. try:
  418. frame = await _capture_snapshot(url, timeout=10)
  419. if frame:
  420. yield _format_mjpeg_frame(frame)
  421. await asyncio.sleep(frame_interval)
  422. except asyncio.CancelledError:
  423. break
  424. except (aiohttp.ClientError, OSError) as e:
  425. logger.warning("Snapshot poll failed: %s", e)
  426. await asyncio.sleep(frame_interval)
  427. def _format_mjpeg_frame(frame: bytes) -> bytes:
  428. """Format frame for MJPEG HTTP response."""
  429. return (
  430. b"--frame\r\n"
  431. b"Content-Type: image/jpeg\r\n"
  432. b"Content-Length: " + str(len(frame)).encode() + b"\r\n"
  433. b"\r\n" + frame + b"\r\n"
  434. )
  435. async def _stream_mjpeg(url: str) -> AsyncGenerator[bytes, None]:
  436. """Stream frames from MJPEG URL.
  437. Note: This function intentionally makes requests to user-configured URLs.
  438. External camera support requires connecting to user-specified camera endpoints.
  439. URL is sanitized and dangerous destinations are blocked.
  440. """
  441. # Sanitize URL - returns reconstructed URL from validated components
  442. safe_url = _sanitize_camera_url(url, ("http", "https"))
  443. if not safe_url:
  444. logger.error("Invalid MJPEG stream URL: %s...", url[:50])
  445. return
  446. try:
  447. timeout = aiohttp.ClientTimeout(total=None, sock_read=30)
  448. async with aiohttp.ClientSession(timeout=timeout) as session, session.get(safe_url) as response:
  449. if response.status != 200:
  450. logger.error("MJPEG stream returned status %s", response.status)
  451. return
  452. buffer = b""
  453. jpeg_start = b"\xff\xd8"
  454. jpeg_end = b"\xff\xd9"
  455. async for chunk in response.content.iter_chunked(8192):
  456. buffer += chunk
  457. # Extract complete frames from buffer
  458. while True:
  459. start_idx = buffer.find(jpeg_start)
  460. if start_idx == -1:
  461. buffer = buffer[-2:] if len(buffer) > 2 else buffer
  462. break
  463. if start_idx > 0:
  464. buffer = buffer[start_idx:]
  465. end_idx = buffer.find(jpeg_end, 2)
  466. if end_idx == -1:
  467. break
  468. frame = buffer[: end_idx + 2]
  469. buffer = buffer[end_idx + 2 :]
  470. yield frame
  471. except asyncio.CancelledError:
  472. logger.info("MJPEG stream cancelled")
  473. except (aiohttp.ClientError, OSError) as e:
  474. logger.error("MJPEG stream error: %s", e)
  475. async def _stream_rtsp(url: str, fps: int) -> AsyncGenerator[bytes, None]:
  476. """Stream frames from RTSP URL via ffmpeg."""
  477. ffmpeg = get_ffmpeg_path()
  478. if not ffmpeg:
  479. logger.error("ffmpeg not found - required for RTSP streaming")
  480. return
  481. # ffmpeg handles both rtsp:// and rtsps:// URLs automatically
  482. cmd = [
  483. ffmpeg,
  484. "-rtsp_transport",
  485. "tcp",
  486. "-rtsp_flags",
  487. "prefer_tcp",
  488. "-timeout",
  489. "30000000",
  490. "-buffer_size",
  491. "1024000",
  492. "-max_delay",
  493. "500000",
  494. "-i",
  495. url,
  496. "-f",
  497. "mjpeg",
  498. "-q:v",
  499. "5",
  500. "-r",
  501. str(fps),
  502. "-an",
  503. "-",
  504. ]
  505. process = None
  506. try:
  507. process = await asyncio.create_subprocess_exec(
  508. *cmd,
  509. stdout=asyncio.subprocess.PIPE,
  510. stderr=asyncio.subprocess.PIPE,
  511. )
  512. # Give ffmpeg a moment to start and check for immediate failures
  513. await asyncio.sleep(0.5)
  514. if process.returncode is not None:
  515. stderr = await process.stderr.read()
  516. logger.error("ffmpeg RTSP stream failed immediately: %s", stderr.decode()[:300])
  517. return
  518. buffer = b""
  519. jpeg_start = b"\xff\xd8"
  520. jpeg_end = b"\xff\xd9"
  521. while True:
  522. try:
  523. chunk = await asyncio.wait_for(process.stdout.read(8192), timeout=30.0)
  524. if not chunk:
  525. break
  526. buffer += chunk
  527. # Extract complete frames
  528. while True:
  529. start_idx = buffer.find(jpeg_start)
  530. if start_idx == -1:
  531. buffer = buffer[-2:] if len(buffer) > 2 else buffer
  532. break
  533. if start_idx > 0:
  534. buffer = buffer[start_idx:]
  535. end_idx = buffer.find(jpeg_end, 2)
  536. if end_idx == -1:
  537. break
  538. frame = buffer[: end_idx + 2]
  539. buffer = buffer[end_idx + 2 :]
  540. yield frame
  541. except TimeoutError:
  542. logger.warning("RTSP stream read timeout")
  543. break
  544. except asyncio.CancelledError:
  545. logger.info("RTSP stream cancelled")
  546. except OSError as e:
  547. logger.error("RTSP stream error: %s", e)
  548. finally:
  549. if process and process.returncode is None:
  550. process.terminate()
  551. try:
  552. await asyncio.wait_for(process.wait(), timeout=2.0)
  553. except TimeoutError:
  554. process.kill()
  555. await process.wait()
  556. async def _stream_usb(device: str, fps: int) -> AsyncGenerator[bytes, None]:
  557. """Stream frames from USB camera via ffmpeg."""
  558. ffmpeg = get_ffmpeg_path()
  559. if not ffmpeg:
  560. logger.error("ffmpeg not found - required for USB camera streaming")
  561. return
  562. # Validate device path
  563. if not device.startswith("/dev/video"):
  564. logger.error("Invalid USB device path: %s", device)
  565. return
  566. if not Path(device).exists():
  567. logger.error("USB device does not exist: %s", device)
  568. return
  569. # ffmpeg command to stream from USB camera (v4l2)
  570. cmd = [
  571. ffmpeg,
  572. "-f",
  573. "v4l2",
  574. "-framerate",
  575. str(fps),
  576. "-i",
  577. device,
  578. "-f",
  579. "mjpeg",
  580. "-q:v",
  581. "5",
  582. "-r",
  583. str(fps),
  584. "-",
  585. ]
  586. process = None
  587. try:
  588. logger.info("Starting USB camera stream from %s at %s fps", device, fps)
  589. process = await asyncio.create_subprocess_exec(
  590. *cmd,
  591. stdout=asyncio.subprocess.PIPE,
  592. stderr=asyncio.subprocess.PIPE,
  593. )
  594. # Give ffmpeg a moment to start and check for immediate failures
  595. await asyncio.sleep(0.5)
  596. if process.returncode is not None:
  597. stderr = await process.stderr.read()
  598. logger.error("ffmpeg USB stream failed immediately: %s", stderr.decode()[:300])
  599. return
  600. buffer = b""
  601. jpeg_start = b"\xff\xd8"
  602. jpeg_end = b"\xff\xd9"
  603. while True:
  604. try:
  605. chunk = await asyncio.wait_for(process.stdout.read(8192), timeout=30.0)
  606. if not chunk:
  607. break
  608. buffer += chunk
  609. # Extract complete frames
  610. while True:
  611. start_idx = buffer.find(jpeg_start)
  612. if start_idx == -1:
  613. buffer = buffer[-2:] if len(buffer) > 2 else buffer
  614. break
  615. if start_idx > 0:
  616. buffer = buffer[start_idx:]
  617. end_idx = buffer.find(jpeg_end, 2)
  618. if end_idx == -1:
  619. break
  620. frame = buffer[: end_idx + 2]
  621. buffer = buffer[end_idx + 2 :]
  622. yield frame
  623. except TimeoutError:
  624. logger.warning("USB stream read timeout")
  625. break
  626. except asyncio.CancelledError:
  627. logger.info("USB stream cancelled")
  628. except OSError as e:
  629. logger.error("USB stream error: %s", e)
  630. finally:
  631. if process and process.returncode is None:
  632. process.terminate()
  633. try:
  634. await asyncio.wait_for(process.wait(), timeout=2.0)
  635. except TimeoutError:
  636. process.kill()
  637. await process.wait()