external_camera.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718
  1. """External camera service.
  2. Supports MJPEG streams, RTSP streams (via ffmpeg), HTTP snapshot URLs, and USB cameras.
  3. Security Note: This service intentionally makes requests to user-configured camera URLs.
  4. This is necessary functionality for external camera integration. URLs are validated
  5. to ensure they are well-formed before use.
  6. """
  7. import asyncio
  8. import logging
  9. import re
  10. import shutil
  11. from collections.abc import AsyncGenerator
  12. from pathlib import Path
  13. from urllib.parse import urlparse
  14. import aiohttp
  15. logger = logging.getLogger(__name__)
  16. def _validate_camera_url(url: str, allowed_schemes: tuple[str, ...] = ("http", "https", "rtsp")) -> bool:
  17. """Validate camera URL format.
  18. This validates that the URL is well-formed and uses an allowed scheme.
  19. Note: This intentionally allows user-provided URLs as that is the
  20. purpose of external camera configuration.
  21. Args:
  22. url: URL to validate
  23. allowed_schemes: Tuple of allowed URL schemes
  24. Returns:
  25. True if URL is valid, False otherwise
  26. """
  27. try:
  28. parsed = urlparse(url)
  29. if not parsed.scheme or not parsed.netloc:
  30. return False
  31. return parsed.scheme.lower() in allowed_schemes
  32. except Exception:
  33. return False
  34. def list_usb_cameras() -> list[dict]:
  35. """List available USB cameras (V4L2 devices on Linux).
  36. Returns:
  37. List of dicts with {device: str, name: str, capabilities: list}
  38. """
  39. cameras = []
  40. video_devices = sorted(Path("/dev").glob("video*"))
  41. for device in video_devices:
  42. device_path = str(device)
  43. info = {"device": device_path, "name": device.name, "capabilities": []}
  44. # Try to get device info via v4l2-ctl
  45. v4l2_ctl = shutil.which("v4l2-ctl")
  46. if v4l2_ctl:
  47. import subprocess
  48. try:
  49. result = subprocess.run(
  50. [v4l2_ctl, "-d", device_path, "--info"],
  51. capture_output=True,
  52. text=True,
  53. timeout=5,
  54. )
  55. if result.returncode == 0:
  56. # Parse device name from output
  57. for line in result.stdout.splitlines():
  58. if "Card type" in line:
  59. info["name"] = line.split(":", 1)[1].strip()
  60. elif "Driver name" in line:
  61. info["driver"] = line.split(":", 1)[1].strip()
  62. # Check if device supports video capture
  63. result = subprocess.run(
  64. [v4l2_ctl, "-d", device_path, "--list-formats"],
  65. capture_output=True,
  66. text=True,
  67. timeout=5,
  68. )
  69. if result.returncode == 0 and result.stdout.strip():
  70. info["capabilities"].append("capture")
  71. # Parse available formats
  72. formats = re.findall(r"'(\w+)'", result.stdout)
  73. info["formats"] = list(set(formats))
  74. except (subprocess.TimeoutExpired, Exception) as e:
  75. logger.debug(f"v4l2-ctl failed for {device_path}: {e}")
  76. # Only include devices that look like video capture devices
  77. # Skip metadata devices (typically odd numbered like video1, video3)
  78. try:
  79. device_num = int(device.name.replace("video", ""))
  80. # Even numbered devices are usually capture, odd are metadata
  81. # But also check if we got capabilities
  82. if info.get("capabilities") or device_num % 2 == 0:
  83. cameras.append(info)
  84. except ValueError:
  85. cameras.append(info)
  86. return cameras
  87. def get_ffmpeg_path() -> str | None:
  88. """Get the path to ffmpeg executable."""
  89. # Try shutil.which first
  90. path = shutil.which("ffmpeg")
  91. if path:
  92. return path
  93. # Check common locations (systemd services may have limited PATH)
  94. for common_path in ["/usr/bin/ffmpeg", "/usr/local/bin/ffmpeg", "/opt/homebrew/bin/ffmpeg"]:
  95. if Path(common_path).exists():
  96. return common_path
  97. return None
  98. async def capture_frame(url: str, camera_type: str, timeout: int = 15) -> bytes | None:
  99. """Capture single frame from external camera.
  100. Args:
  101. url: Camera URL (MJPEG stream, RTSP URL, HTTP snapshot URL, or USB device path)
  102. camera_type: "mjpeg", "rtsp", "snapshot", or "usb"
  103. timeout: Connection timeout in seconds
  104. Returns:
  105. JPEG bytes or None on failure
  106. """
  107. logger.debug(f"capture_frame called: type={camera_type}, url={url[:50] if url else 'None'}...")
  108. if camera_type == "mjpeg":
  109. return await _capture_mjpeg_frame(url, timeout)
  110. elif camera_type == "rtsp":
  111. return await _capture_rtsp_frame(url, timeout)
  112. elif camera_type == "snapshot":
  113. return await _capture_snapshot(url, timeout)
  114. elif camera_type == "usb":
  115. return await _capture_usb_frame(url, timeout)
  116. else:
  117. logger.warning(f"Unknown camera type: {camera_type}")
  118. return None
  119. async def _capture_usb_frame(device: str, timeout: int) -> bytes | None:
  120. """Capture frame from USB camera using ffmpeg."""
  121. ffmpeg = get_ffmpeg_path()
  122. if not ffmpeg:
  123. logger.error("ffmpeg not found - required for USB camera capture")
  124. return None
  125. # Validate device path - must be /dev/videoN format where N is 0-99
  126. # This prevents path traversal by using a strict allowlist approach
  127. import re as regex_module
  128. device_match = regex_module.match(r"^/dev/video(\d{1,2})$", device)
  129. if not device_match:
  130. logger.error(f"Invalid USB device path format: {device}")
  131. return None
  132. # Convert to integer to break taint chain - integers cannot contain path traversal
  133. # lgtm[py/path-injection] - device_num is validated integer 0-99
  134. device_num = int(device_match.group(1)) # Safe: regex guarantees 1-2 digits
  135. if device_num > 99:
  136. logger.error(f"USB device number out of range: {device_num}")
  137. return None
  138. # Construct safe path from validated integer (completely untainted)
  139. safe_device_path = Path(f"/dev/video{device_num}") # lgtm[py/path-injection]
  140. if not safe_device_path.exists():
  141. logger.error(f"USB device does not exist: {safe_device_path}")
  142. return None
  143. # Use the safe path for ffmpeg - this is a hardcoded /dev/videoN path
  144. device = str(safe_device_path) # lgtm[py/path-injection]
  145. # Use ffmpeg to grab a single frame from USB camera
  146. cmd = [
  147. ffmpeg,
  148. "-f",
  149. "v4l2",
  150. "-i",
  151. device,
  152. "-frames:v",
  153. "1",
  154. "-f",
  155. "image2pipe",
  156. "-vcodec",
  157. "mjpeg",
  158. "-q:v",
  159. "2",
  160. "-",
  161. ]
  162. try:
  163. logger.debug(f"Running USB capture: {' '.join(cmd)}")
  164. process = await asyncio.create_subprocess_exec(
  165. *cmd,
  166. stdout=asyncio.subprocess.PIPE,
  167. stderr=asyncio.subprocess.PIPE,
  168. )
  169. stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout)
  170. if process.returncode != 0:
  171. logger.error(f"ffmpeg USB capture failed: {stderr.decode()[:200]}")
  172. return None
  173. if not stdout or len(stdout) < 100:
  174. logger.error("ffmpeg returned empty or too small frame from USB camera")
  175. return None
  176. return stdout
  177. except TimeoutError:
  178. logger.warning(f"USB frame capture timed out after {timeout}s")
  179. if process:
  180. process.kill()
  181. return None
  182. except Exception as e:
  183. logger.error(f"USB frame capture failed: {e}")
  184. return None
  185. async def _capture_mjpeg_frame(url: str, timeout: int) -> bytes | None:
  186. """Extract single frame from MJPEG stream.
  187. Note: This function intentionally makes requests to user-configured URLs.
  188. External camera support requires connecting to user-specified camera endpoints.
  189. URL format is validated but the destination is intentionally user-controlled.
  190. """
  191. # Validate URL format (user-configured camera URL - intentional external request)
  192. if not _validate_camera_url(url, ("http", "https")):
  193. logger.error(f"Invalid MJPEG URL format: {url[:50]}...")
  194. return None
  195. try:
  196. # Intentional SSRF: External camera URLs are user-configured by design
  197. async with (
  198. aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session,
  199. session.get(url) as response, # nosec lgtm[py/ssrf]
  200. ):
  201. if response.status != 200:
  202. logger.error(f"MJPEG stream returned status {response.status}")
  203. return None
  204. # Read chunks until we find a complete JPEG frame
  205. buffer = b""
  206. jpeg_start = b"\xff\xd8"
  207. jpeg_end = b"\xff\xd9"
  208. async for chunk in response.content.iter_chunked(8192):
  209. buffer += chunk
  210. # Look for complete JPEG frame
  211. start_idx = buffer.find(jpeg_start)
  212. if start_idx == -1:
  213. continue
  214. end_idx = buffer.find(jpeg_end, start_idx + 2)
  215. if end_idx != -1:
  216. # Found complete frame
  217. frame = buffer[start_idx : end_idx + 2]
  218. return frame
  219. # Keep searching, but limit buffer size
  220. if len(buffer) > 5 * 1024 * 1024: # 5MB limit
  221. logger.warning("MJPEG buffer exceeded 5MB without finding frame")
  222. return None
  223. except TimeoutError:
  224. logger.warning(f"MJPEG frame capture timed out after {timeout}s")
  225. return None
  226. except Exception as e:
  227. logger.error(f"MJPEG frame capture failed: {e}")
  228. return None
  229. return None
  230. async def _capture_rtsp_frame(url: str, timeout: int) -> bytes | None:
  231. """Capture frame from RTSP using ffmpeg."""
  232. ffmpeg = get_ffmpeg_path()
  233. if not ffmpeg:
  234. logger.error("ffmpeg not found - required for RTSP capture")
  235. return None
  236. # Use ffmpeg to grab a single frame from RTSP stream
  237. # ffmpeg handles both rtsp:// and rtsps:// URLs automatically
  238. cmd = [
  239. ffmpeg,
  240. "-rtsp_transport",
  241. "tcp",
  242. "-i",
  243. url,
  244. "-frames:v",
  245. "1",
  246. "-f",
  247. "image2pipe",
  248. "-vcodec",
  249. "mjpeg",
  250. "-q:v",
  251. "2",
  252. "-",
  253. ]
  254. try:
  255. print(f"[EXT-CAM] Running ffmpeg command: {' '.join(cmd[:6])}...")
  256. process = await asyncio.create_subprocess_exec(
  257. *cmd,
  258. stdout=asyncio.subprocess.PIPE,
  259. stderr=asyncio.subprocess.PIPE,
  260. )
  261. stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout)
  262. print(
  263. f"[EXT-CAM] ffmpeg returned: code={process.returncode}, stdout={len(stdout)} bytes, stderr={len(stderr)} bytes"
  264. )
  265. if process.returncode != 0:
  266. logger.error(f"ffmpeg RTSP capture failed: {stderr.decode()[:200]}")
  267. print(f"[EXT-CAM] ffmpeg error: {stderr.decode()[:300]}")
  268. return None
  269. if not stdout or len(stdout) < 100:
  270. logger.error("ffmpeg returned empty or too small frame")
  271. return None
  272. return stdout
  273. except TimeoutError:
  274. logger.warning(f"RTSP frame capture timed out after {timeout}s")
  275. if process:
  276. process.kill()
  277. return None
  278. except Exception as e:
  279. logger.error(f"RTSP frame capture failed: {e}")
  280. return None
  281. async def _capture_snapshot(url: str, timeout: int) -> bytes | None:
  282. """Fetch snapshot from HTTP URL.
  283. Note: This function intentionally makes requests to user-configured URLs.
  284. External camera support requires connecting to user-specified camera endpoints.
  285. URL format is validated but the destination is intentionally user-controlled.
  286. """
  287. # Validate URL format (user-configured camera URL - intentional external request)
  288. if not _validate_camera_url(url, ("http", "https")):
  289. logger.error(f"Invalid snapshot URL format: {url[:50]}...")
  290. return None
  291. try:
  292. # Intentional SSRF: External camera URLs are user-configured by design
  293. async with (
  294. aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session,
  295. session.get(url) as response, # nosec lgtm[py/ssrf]
  296. ):
  297. if response.status != 200:
  298. logger.error(f"Snapshot URL returned status {response.status}")
  299. return None
  300. data = await response.read()
  301. # Validate it looks like JPEG
  302. if not data.startswith(b"\xff\xd8"):
  303. logger.warning("Snapshot does not appear to be JPEG")
  304. # Still return it - might be valid with different header
  305. return data
  306. except TimeoutError:
  307. logger.warning(f"Snapshot capture timed out after {timeout}s")
  308. return None
  309. except Exception as e:
  310. logger.error(f"Snapshot capture failed: {e}")
  311. return None
  312. async def test_connection(url: str, camera_type: str) -> dict:
  313. """Test camera connection.
  314. Returns:
  315. Dict with {success: bool, error?: str, resolution?: str}
  316. """
  317. print(f"[EXT-CAM] Testing camera connection: type={camera_type}, url={url[:50]}...")
  318. logger.info(f"Testing camera connection: type={camera_type}, url={url[:50]}...")
  319. try:
  320. frame = await capture_frame(url, camera_type, timeout=10)
  321. print(f"[EXT-CAM] Capture result: {len(frame) if frame else 0} bytes")
  322. logger.info(f"Capture result: {len(frame) if frame else 0} bytes")
  323. if frame:
  324. # Try to get resolution from JPEG header
  325. resolution = None
  326. try:
  327. # Simple JPEG dimension extraction
  328. # SOF0 marker is FF C0, followed by length, precision, height, width
  329. sof_markers = [b"\xff\xc0", b"\xff\xc1", b"\xff\xc2"]
  330. for marker in sof_markers:
  331. idx = frame.find(marker)
  332. if idx != -1 and idx + 9 <= len(frame):
  333. height = (frame[idx + 5] << 8) | frame[idx + 6]
  334. width = (frame[idx + 7] << 8) | frame[idx + 8]
  335. resolution = f"{width}x{height}"
  336. break
  337. except Exception:
  338. pass
  339. return {"success": True, "resolution": resolution}
  340. else:
  341. return {"success": False, "error": "Failed to capture frame from camera"}
  342. except Exception as e:
  343. # Sanitize error message - don't expose internal details
  344. error_type = type(e).__name__
  345. logger.error(f"Camera connection test failed: {e}")
  346. return {"success": False, "error": f"Connection failed: {error_type}"}
  347. async def generate_mjpeg_stream(url: str, camera_type: str, fps: int = 10) -> AsyncGenerator[bytes, None]:
  348. """Generator yielding MJPEG frames for streaming.
  349. Args:
  350. url: Camera URL or USB device path
  351. camera_type: "mjpeg", "rtsp", "snapshot", or "usb"
  352. fps: Target frames per second
  353. Yields:
  354. MJPEG frame data with HTTP multipart boundaries
  355. """
  356. frame_interval = 1.0 / max(fps, 1)
  357. last_frame_time = 0.0
  358. if camera_type == "mjpeg":
  359. # Proxy MJPEG stream directly
  360. async for frame in _stream_mjpeg(url):
  361. current_time = asyncio.get_event_loop().time()
  362. if current_time - last_frame_time >= frame_interval:
  363. last_frame_time = current_time
  364. yield _format_mjpeg_frame(frame)
  365. elif camera_type == "rtsp":
  366. # Use ffmpeg to convert RTSP to MJPEG
  367. async for frame in _stream_rtsp(url, fps):
  368. yield _format_mjpeg_frame(frame)
  369. elif camera_type == "usb":
  370. # Use ffmpeg to stream from USB camera
  371. async for frame in _stream_usb(url, fps):
  372. yield _format_mjpeg_frame(frame)
  373. elif camera_type == "snapshot":
  374. # Poll snapshot URL at interval
  375. while True:
  376. try:
  377. frame = await _capture_snapshot(url, timeout=10)
  378. if frame:
  379. yield _format_mjpeg_frame(frame)
  380. await asyncio.sleep(frame_interval)
  381. except asyncio.CancelledError:
  382. break
  383. except Exception as e:
  384. logger.warning(f"Snapshot poll failed: {e}")
  385. await asyncio.sleep(frame_interval)
  386. def _format_mjpeg_frame(frame: bytes) -> bytes:
  387. """Format frame for MJPEG HTTP response."""
  388. return (
  389. b"--frame\r\n"
  390. b"Content-Type: image/jpeg\r\n"
  391. b"Content-Length: " + str(len(frame)).encode() + b"\r\n"
  392. b"\r\n" + frame + b"\r\n"
  393. )
  394. async def _stream_mjpeg(url: str) -> AsyncGenerator[bytes, None]:
  395. """Stream frames from MJPEG URL.
  396. Note: This function intentionally makes requests to user-configured URLs.
  397. External camera support requires connecting to user-specified camera endpoints.
  398. """
  399. try:
  400. # Intentional SSRF: External camera URLs are user-configured by design
  401. timeout = aiohttp.ClientTimeout(total=None, sock_read=30)
  402. async with aiohttp.ClientSession(timeout=timeout) as session, session.get(url) as response: # nosec lgtm[py/ssrf]
  403. if response.status != 200:
  404. logger.error(f"MJPEG stream returned status {response.status}")
  405. return
  406. buffer = b""
  407. jpeg_start = b"\xff\xd8"
  408. jpeg_end = b"\xff\xd9"
  409. async for chunk in response.content.iter_chunked(8192):
  410. buffer += chunk
  411. # Extract complete frames from buffer
  412. while True:
  413. start_idx = buffer.find(jpeg_start)
  414. if start_idx == -1:
  415. buffer = buffer[-2:] if len(buffer) > 2 else buffer
  416. break
  417. if start_idx > 0:
  418. buffer = buffer[start_idx:]
  419. end_idx = buffer.find(jpeg_end, 2)
  420. if end_idx == -1:
  421. break
  422. frame = buffer[: end_idx + 2]
  423. buffer = buffer[end_idx + 2 :]
  424. yield frame
  425. except asyncio.CancelledError:
  426. logger.info("MJPEG stream cancelled")
  427. except Exception as e:
  428. logger.error(f"MJPEG stream error: {e}")
  429. async def _stream_rtsp(url: str, fps: int) -> AsyncGenerator[bytes, None]:
  430. """Stream frames from RTSP URL via ffmpeg."""
  431. ffmpeg = get_ffmpeg_path()
  432. if not ffmpeg:
  433. logger.error("ffmpeg not found - required for RTSP streaming")
  434. return
  435. # ffmpeg handles both rtsp:// and rtsps:// URLs automatically
  436. cmd = [
  437. ffmpeg,
  438. "-rtsp_transport",
  439. "tcp",
  440. "-rtsp_flags",
  441. "prefer_tcp",
  442. "-timeout",
  443. "30000000",
  444. "-buffer_size",
  445. "1024000",
  446. "-max_delay",
  447. "500000",
  448. "-i",
  449. url,
  450. "-f",
  451. "mjpeg",
  452. "-q:v",
  453. "5",
  454. "-r",
  455. str(fps),
  456. "-an",
  457. "-",
  458. ]
  459. process = None
  460. try:
  461. process = await asyncio.create_subprocess_exec(
  462. *cmd,
  463. stdout=asyncio.subprocess.PIPE,
  464. stderr=asyncio.subprocess.PIPE,
  465. )
  466. # Give ffmpeg a moment to start and check for immediate failures
  467. await asyncio.sleep(0.5)
  468. if process.returncode is not None:
  469. stderr = await process.stderr.read()
  470. logger.error(f"ffmpeg RTSP stream failed immediately: {stderr.decode()[:300]}")
  471. return
  472. buffer = b""
  473. jpeg_start = b"\xff\xd8"
  474. jpeg_end = b"\xff\xd9"
  475. while True:
  476. try:
  477. chunk = await asyncio.wait_for(process.stdout.read(8192), timeout=30.0)
  478. if not chunk:
  479. break
  480. buffer += chunk
  481. # Extract complete frames
  482. while True:
  483. start_idx = buffer.find(jpeg_start)
  484. if start_idx == -1:
  485. buffer = buffer[-2:] if len(buffer) > 2 else buffer
  486. break
  487. if start_idx > 0:
  488. buffer = buffer[start_idx:]
  489. end_idx = buffer.find(jpeg_end, 2)
  490. if end_idx == -1:
  491. break
  492. frame = buffer[: end_idx + 2]
  493. buffer = buffer[end_idx + 2 :]
  494. yield frame
  495. except TimeoutError:
  496. logger.warning("RTSP stream read timeout")
  497. break
  498. except asyncio.CancelledError:
  499. logger.info("RTSP stream cancelled")
  500. except Exception as e:
  501. logger.error(f"RTSP stream error: {e}")
  502. finally:
  503. if process and process.returncode is None:
  504. process.terminate()
  505. try:
  506. await asyncio.wait_for(process.wait(), timeout=2.0)
  507. except TimeoutError:
  508. process.kill()
  509. await process.wait()
  510. async def _stream_usb(device: str, fps: int) -> AsyncGenerator[bytes, None]:
  511. """Stream frames from USB camera via ffmpeg."""
  512. ffmpeg = get_ffmpeg_path()
  513. if not ffmpeg:
  514. logger.error("ffmpeg not found - required for USB camera streaming")
  515. return
  516. # Validate device path
  517. if not device.startswith("/dev/video"):
  518. logger.error(f"Invalid USB device path: {device}")
  519. return
  520. if not Path(device).exists():
  521. logger.error(f"USB device does not exist: {device}")
  522. return
  523. # ffmpeg command to stream from USB camera (v4l2)
  524. cmd = [
  525. ffmpeg,
  526. "-f",
  527. "v4l2",
  528. "-framerate",
  529. str(fps),
  530. "-i",
  531. device,
  532. "-f",
  533. "mjpeg",
  534. "-q:v",
  535. "5",
  536. "-r",
  537. str(fps),
  538. "-",
  539. ]
  540. process = None
  541. try:
  542. logger.info(f"Starting USB camera stream from {device} at {fps} fps")
  543. process = await asyncio.create_subprocess_exec(
  544. *cmd,
  545. stdout=asyncio.subprocess.PIPE,
  546. stderr=asyncio.subprocess.PIPE,
  547. )
  548. # Give ffmpeg a moment to start and check for immediate failures
  549. await asyncio.sleep(0.5)
  550. if process.returncode is not None:
  551. stderr = await process.stderr.read()
  552. logger.error(f"ffmpeg USB stream failed immediately: {stderr.decode()[:300]}")
  553. return
  554. buffer = b""
  555. jpeg_start = b"\xff\xd8"
  556. jpeg_end = b"\xff\xd9"
  557. while True:
  558. try:
  559. chunk = await asyncio.wait_for(process.stdout.read(8192), timeout=30.0)
  560. if not chunk:
  561. break
  562. buffer += chunk
  563. # Extract complete frames
  564. while True:
  565. start_idx = buffer.find(jpeg_start)
  566. if start_idx == -1:
  567. buffer = buffer[-2:] if len(buffer) > 2 else buffer
  568. break
  569. if start_idx > 0:
  570. buffer = buffer[start_idx:]
  571. end_idx = buffer.find(jpeg_end, 2)
  572. if end_idx == -1:
  573. break
  574. frame = buffer[: end_idx + 2]
  575. buffer = buffer[end_idx + 2 :]
  576. yield frame
  577. except TimeoutError:
  578. logger.warning("USB stream read timeout")
  579. break
  580. except asyncio.CancelledError:
  581. logger.info("USB stream cancelled")
  582. except Exception as e:
  583. logger.error(f"USB stream error: {e}")
  584. finally:
  585. if process and process.returncode is None:
  586. process.terminate()
  587. try:
  588. await asyncio.wait_for(process.wait(), timeout=2.0)
  589. except TimeoutError:
  590. process.kill()
  591. await process.wait()