external_camera.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695
  1. """External camera service.
  2. Supports MJPEG streams, RTSP streams (via ffmpeg), HTTP snapshot URLs, and USB cameras.
  3. Security Note: This service intentionally makes requests to user-configured camera URLs.
  4. This is necessary functionality for external camera integration. URLs are validated
  5. to ensure they are well-formed before use.
  6. """
  7. import asyncio
  8. import logging
  9. import re
  10. import shutil
  11. from collections.abc import AsyncGenerator
  12. from pathlib import Path
  13. from urllib.parse import urlparse
  14. import aiohttp
  15. logger = logging.getLogger(__name__)
  16. def _validate_camera_url(url: str, allowed_schemes: tuple[str, ...] = ("http", "https", "rtsp")) -> bool:
  17. """Validate camera URL format.
  18. This validates that the URL is well-formed and uses an allowed scheme.
  19. Note: This intentionally allows user-provided URLs as that is the
  20. purpose of external camera configuration.
  21. Args:
  22. url: URL to validate
  23. allowed_schemes: Tuple of allowed URL schemes
  24. Returns:
  25. True if URL is valid, False otherwise
  26. """
  27. try:
  28. parsed = urlparse(url)
  29. if not parsed.scheme or not parsed.netloc:
  30. return False
  31. return parsed.scheme.lower() in allowed_schemes
  32. except Exception:
  33. return False
  34. def list_usb_cameras() -> list[dict]:
  35. """List available USB cameras (V4L2 devices on Linux).
  36. Returns:
  37. List of dicts with {device: str, name: str, capabilities: list}
  38. """
  39. cameras = []
  40. video_devices = sorted(Path("/dev").glob("video*"))
  41. for device in video_devices:
  42. device_path = str(device)
  43. info = {"device": device_path, "name": device.name, "capabilities": []}
  44. # Try to get device info via v4l2-ctl
  45. v4l2_ctl = shutil.which("v4l2-ctl")
  46. if v4l2_ctl:
  47. import subprocess
  48. try:
  49. result = subprocess.run(
  50. [v4l2_ctl, "-d", device_path, "--info"],
  51. capture_output=True,
  52. text=True,
  53. timeout=5,
  54. )
  55. if result.returncode == 0:
  56. # Parse device name from output
  57. for line in result.stdout.splitlines():
  58. if "Card type" in line:
  59. info["name"] = line.split(":", 1)[1].strip()
  60. elif "Driver name" in line:
  61. info["driver"] = line.split(":", 1)[1].strip()
  62. # Check if device supports video capture
  63. result = subprocess.run(
  64. [v4l2_ctl, "-d", device_path, "--list-formats"],
  65. capture_output=True,
  66. text=True,
  67. timeout=5,
  68. )
  69. if result.returncode == 0 and result.stdout.strip():
  70. info["capabilities"].append("capture")
  71. # Parse available formats
  72. formats = re.findall(r"'(\w+)'", result.stdout)
  73. info["formats"] = list(set(formats))
  74. except (subprocess.TimeoutExpired, Exception) as e:
  75. logger.debug(f"v4l2-ctl failed for {device_path}: {e}")
  76. # Only include devices that look like video capture devices
  77. # Skip metadata devices (typically odd numbered like video1, video3)
  78. try:
  79. device_num = int(device.name.replace("video", ""))
  80. # Even numbered devices are usually capture, odd are metadata
  81. # But also check if we got capabilities
  82. if info.get("capabilities") or device_num % 2 == 0:
  83. cameras.append(info)
  84. except ValueError:
  85. cameras.append(info)
  86. return cameras
  87. def get_ffmpeg_path() -> str | None:
  88. """Get the path to ffmpeg executable."""
  89. # Try shutil.which first
  90. path = shutil.which("ffmpeg")
  91. if path:
  92. return path
  93. # Check common locations (systemd services may have limited PATH)
  94. for common_path in ["/usr/bin/ffmpeg", "/usr/local/bin/ffmpeg", "/opt/homebrew/bin/ffmpeg"]:
  95. if Path(common_path).exists():
  96. return common_path
  97. return None
  98. async def capture_frame(url: str, camera_type: str, timeout: int = 15) -> bytes | None:
  99. """Capture single frame from external camera.
  100. Args:
  101. url: Camera URL (MJPEG stream, RTSP URL, HTTP snapshot URL, or USB device path)
  102. camera_type: "mjpeg", "rtsp", "snapshot", or "usb"
  103. timeout: Connection timeout in seconds
  104. Returns:
  105. JPEG bytes or None on failure
  106. """
  107. logger.debug(f"capture_frame called: type={camera_type}, url={url[:50] if url else 'None'}...")
  108. if camera_type == "mjpeg":
  109. return await _capture_mjpeg_frame(url, timeout)
  110. elif camera_type == "rtsp":
  111. return await _capture_rtsp_frame(url, timeout)
  112. elif camera_type == "snapshot":
  113. return await _capture_snapshot(url, timeout)
  114. elif camera_type == "usb":
  115. return await _capture_usb_frame(url, timeout)
  116. else:
  117. logger.warning(f"Unknown camera type: {camera_type}")
  118. return None
  119. async def _capture_usb_frame(device: str, timeout: int) -> bytes | None:
  120. """Capture frame from USB camera using ffmpeg."""
  121. ffmpeg = get_ffmpeg_path()
  122. if not ffmpeg:
  123. logger.error("ffmpeg not found - required for USB camera capture")
  124. return None
  125. # Validate device path - must be /dev/videoN format
  126. if not device.startswith("/dev/video"):
  127. logger.error(f"Invalid USB device path: {device}")
  128. return None
  129. # Additional path validation to prevent path traversal
  130. # Resolve to absolute path and verify it's still under /dev/
  131. try:
  132. resolved_path = Path(device).resolve()
  133. if not str(resolved_path).startswith("/dev/video"):
  134. logger.error(f"Invalid USB device path after resolution: {device}")
  135. return None
  136. except (OSError, ValueError):
  137. logger.error(f"Failed to resolve USB device path: {device}")
  138. return None
  139. if not resolved_path.exists(): # nosec B108 - path validated above
  140. logger.error(f"USB device does not exist: {device}")
  141. return None
  142. # Use ffmpeg to grab a single frame from USB camera
  143. cmd = [
  144. ffmpeg,
  145. "-f",
  146. "v4l2",
  147. "-i",
  148. device,
  149. "-frames:v",
  150. "1",
  151. "-f",
  152. "image2pipe",
  153. "-vcodec",
  154. "mjpeg",
  155. "-q:v",
  156. "2",
  157. "-",
  158. ]
  159. try:
  160. logger.debug(f"Running USB capture: {' '.join(cmd)}")
  161. process = await asyncio.create_subprocess_exec(
  162. *cmd,
  163. stdout=asyncio.subprocess.PIPE,
  164. stderr=asyncio.subprocess.PIPE,
  165. )
  166. stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout)
  167. if process.returncode != 0:
  168. logger.error(f"ffmpeg USB capture failed: {stderr.decode()[:200]}")
  169. return None
  170. if not stdout or len(stdout) < 100:
  171. logger.error("ffmpeg returned empty or too small frame from USB camera")
  172. return None
  173. return stdout
  174. except TimeoutError:
  175. logger.warning(f"USB frame capture timed out after {timeout}s")
  176. if process:
  177. process.kill()
  178. return None
  179. except Exception as e:
  180. logger.error(f"USB frame capture failed: {e}")
  181. return None
  182. async def _capture_mjpeg_frame(url: str, timeout: int) -> bytes | None:
  183. """Extract single frame from MJPEG stream."""
  184. # Validate URL format (user-configured camera URL - intentional external request)
  185. if not _validate_camera_url(url, ("http", "https")):
  186. logger.error(f"Invalid MJPEG URL format: {url[:50]}...")
  187. return None
  188. try:
  189. async with (
  190. aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session,
  191. session.get(url) as response, # nosec B113 - URL validated above, user-configured camera
  192. ):
  193. if response.status != 200:
  194. logger.error(f"MJPEG stream returned status {response.status}")
  195. return None
  196. # Read chunks until we find a complete JPEG frame
  197. buffer = b""
  198. jpeg_start = b"\xff\xd8"
  199. jpeg_end = b"\xff\xd9"
  200. async for chunk in response.content.iter_chunked(8192):
  201. buffer += chunk
  202. # Look for complete JPEG frame
  203. start_idx = buffer.find(jpeg_start)
  204. if start_idx == -1:
  205. continue
  206. end_idx = buffer.find(jpeg_end, start_idx + 2)
  207. if end_idx != -1:
  208. # Found complete frame
  209. frame = buffer[start_idx : end_idx + 2]
  210. return frame
  211. # Keep searching, but limit buffer size
  212. if len(buffer) > 5 * 1024 * 1024: # 5MB limit
  213. logger.warning("MJPEG buffer exceeded 5MB without finding frame")
  214. return None
  215. except TimeoutError:
  216. logger.warning(f"MJPEG frame capture timed out after {timeout}s")
  217. return None
  218. except Exception as e:
  219. logger.error(f"MJPEG frame capture failed: {e}")
  220. return None
  221. return None
  222. async def _capture_rtsp_frame(url: str, timeout: int) -> bytes | None:
  223. """Capture frame from RTSP using ffmpeg."""
  224. ffmpeg = get_ffmpeg_path()
  225. if not ffmpeg:
  226. logger.error("ffmpeg not found - required for RTSP capture")
  227. return None
  228. # Use ffmpeg to grab a single frame from RTSP stream
  229. # ffmpeg handles both rtsp:// and rtsps:// URLs automatically
  230. cmd = [
  231. ffmpeg,
  232. "-rtsp_transport",
  233. "tcp",
  234. "-i",
  235. url,
  236. "-frames:v",
  237. "1",
  238. "-f",
  239. "image2pipe",
  240. "-vcodec",
  241. "mjpeg",
  242. "-q:v",
  243. "2",
  244. "-",
  245. ]
  246. try:
  247. print(f"[EXT-CAM] Running ffmpeg command: {' '.join(cmd[:6])}...")
  248. process = await asyncio.create_subprocess_exec(
  249. *cmd,
  250. stdout=asyncio.subprocess.PIPE,
  251. stderr=asyncio.subprocess.PIPE,
  252. )
  253. stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout)
  254. print(
  255. f"[EXT-CAM] ffmpeg returned: code={process.returncode}, stdout={len(stdout)} bytes, stderr={len(stderr)} bytes"
  256. )
  257. if process.returncode != 0:
  258. logger.error(f"ffmpeg RTSP capture failed: {stderr.decode()[:200]}")
  259. print(f"[EXT-CAM] ffmpeg error: {stderr.decode()[:300]}")
  260. return None
  261. if not stdout or len(stdout) < 100:
  262. logger.error("ffmpeg returned empty or too small frame")
  263. return None
  264. return stdout
  265. except TimeoutError:
  266. logger.warning(f"RTSP frame capture timed out after {timeout}s")
  267. if process:
  268. process.kill()
  269. return None
  270. except Exception as e:
  271. logger.error(f"RTSP frame capture failed: {e}")
  272. return None
  273. async def _capture_snapshot(url: str, timeout: int) -> bytes | None:
  274. """Fetch snapshot from HTTP URL."""
  275. # Validate URL format (user-configured camera URL - intentional external request)
  276. if not _validate_camera_url(url, ("http", "https")):
  277. logger.error(f"Invalid snapshot URL format: {url[:50]}...")
  278. return None
  279. try:
  280. async with (
  281. aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session,
  282. session.get(url) as response, # nosec B113 - URL validated above, user-configured camera
  283. ):
  284. if response.status != 200:
  285. logger.error(f"Snapshot URL returned status {response.status}")
  286. return None
  287. data = await response.read()
  288. # Validate it looks like JPEG
  289. if not data.startswith(b"\xff\xd8"):
  290. logger.warning("Snapshot does not appear to be JPEG")
  291. # Still return it - might be valid with different header
  292. return data
  293. except TimeoutError:
  294. logger.warning(f"Snapshot capture timed out after {timeout}s")
  295. return None
  296. except Exception as e:
  297. logger.error(f"Snapshot capture failed: {e}")
  298. return None
  299. async def test_connection(url: str, camera_type: str) -> dict:
  300. """Test camera connection.
  301. Returns:
  302. Dict with {success: bool, error?: str, resolution?: str}
  303. """
  304. print(f"[EXT-CAM] Testing camera connection: type={camera_type}, url={url[:50]}...")
  305. logger.info(f"Testing camera connection: type={camera_type}, url={url[:50]}...")
  306. try:
  307. frame = await capture_frame(url, camera_type, timeout=10)
  308. print(f"[EXT-CAM] Capture result: {len(frame) if frame else 0} bytes")
  309. logger.info(f"Capture result: {len(frame) if frame else 0} bytes")
  310. if frame:
  311. # Try to get resolution from JPEG header
  312. resolution = None
  313. try:
  314. # Simple JPEG dimension extraction
  315. # SOF0 marker is FF C0, followed by length, precision, height, width
  316. sof_markers = [b"\xff\xc0", b"\xff\xc1", b"\xff\xc2"]
  317. for marker in sof_markers:
  318. idx = frame.find(marker)
  319. if idx != -1 and idx + 9 <= len(frame):
  320. height = (frame[idx + 5] << 8) | frame[idx + 6]
  321. width = (frame[idx + 7] << 8) | frame[idx + 8]
  322. resolution = f"{width}x{height}"
  323. break
  324. except Exception:
  325. pass
  326. return {"success": True, "resolution": resolution}
  327. else:
  328. return {"success": False, "error": "Failed to capture frame from camera"}
  329. except Exception as e:
  330. # Sanitize error message - don't expose internal details
  331. error_type = type(e).__name__
  332. logger.error(f"Camera connection test failed: {e}")
  333. return {"success": False, "error": f"Connection failed: {error_type}"}
  334. async def generate_mjpeg_stream(url: str, camera_type: str, fps: int = 10) -> AsyncGenerator[bytes, None]:
  335. """Generator yielding MJPEG frames for streaming.
  336. Args:
  337. url: Camera URL or USB device path
  338. camera_type: "mjpeg", "rtsp", "snapshot", or "usb"
  339. fps: Target frames per second
  340. Yields:
  341. MJPEG frame data with HTTP multipart boundaries
  342. """
  343. frame_interval = 1.0 / max(fps, 1)
  344. last_frame_time = 0.0
  345. if camera_type == "mjpeg":
  346. # Proxy MJPEG stream directly
  347. async for frame in _stream_mjpeg(url):
  348. current_time = asyncio.get_event_loop().time()
  349. if current_time - last_frame_time >= frame_interval:
  350. last_frame_time = current_time
  351. yield _format_mjpeg_frame(frame)
  352. elif camera_type == "rtsp":
  353. # Use ffmpeg to convert RTSP to MJPEG
  354. async for frame in _stream_rtsp(url, fps):
  355. yield _format_mjpeg_frame(frame)
  356. elif camera_type == "usb":
  357. # Use ffmpeg to stream from USB camera
  358. async for frame in _stream_usb(url, fps):
  359. yield _format_mjpeg_frame(frame)
  360. elif camera_type == "snapshot":
  361. # Poll snapshot URL at interval
  362. while True:
  363. try:
  364. frame = await _capture_snapshot(url, timeout=10)
  365. if frame:
  366. yield _format_mjpeg_frame(frame)
  367. await asyncio.sleep(frame_interval)
  368. except asyncio.CancelledError:
  369. break
  370. except Exception as e:
  371. logger.warning(f"Snapshot poll failed: {e}")
  372. await asyncio.sleep(frame_interval)
  373. def _format_mjpeg_frame(frame: bytes) -> bytes:
  374. """Format frame for MJPEG HTTP response."""
  375. return (
  376. b"--frame\r\n"
  377. b"Content-Type: image/jpeg\r\n"
  378. b"Content-Length: " + str(len(frame)).encode() + b"\r\n"
  379. b"\r\n" + frame + b"\r\n"
  380. )
  381. async def _stream_mjpeg(url: str) -> AsyncGenerator[bytes, None]:
  382. """Stream frames from MJPEG URL."""
  383. try:
  384. timeout = aiohttp.ClientTimeout(total=None, sock_read=30)
  385. async with aiohttp.ClientSession(timeout=timeout) as session, session.get(url) as response:
  386. if response.status != 200:
  387. logger.error(f"MJPEG stream returned status {response.status}")
  388. return
  389. buffer = b""
  390. jpeg_start = b"\xff\xd8"
  391. jpeg_end = b"\xff\xd9"
  392. async for chunk in response.content.iter_chunked(8192):
  393. buffer += chunk
  394. # Extract complete frames from buffer
  395. while True:
  396. start_idx = buffer.find(jpeg_start)
  397. if start_idx == -1:
  398. buffer = buffer[-2:] if len(buffer) > 2 else buffer
  399. break
  400. if start_idx > 0:
  401. buffer = buffer[start_idx:]
  402. end_idx = buffer.find(jpeg_end, 2)
  403. if end_idx == -1:
  404. break
  405. frame = buffer[: end_idx + 2]
  406. buffer = buffer[end_idx + 2 :]
  407. yield frame
  408. except asyncio.CancelledError:
  409. logger.info("MJPEG stream cancelled")
  410. except Exception as e:
  411. logger.error(f"MJPEG stream error: {e}")
  412. async def _stream_rtsp(url: str, fps: int) -> AsyncGenerator[bytes, None]:
  413. """Stream frames from RTSP URL via ffmpeg."""
  414. ffmpeg = get_ffmpeg_path()
  415. if not ffmpeg:
  416. logger.error("ffmpeg not found - required for RTSP streaming")
  417. return
  418. # ffmpeg handles both rtsp:// and rtsps:// URLs automatically
  419. cmd = [
  420. ffmpeg,
  421. "-rtsp_transport",
  422. "tcp",
  423. "-rtsp_flags",
  424. "prefer_tcp",
  425. "-timeout",
  426. "30000000",
  427. "-buffer_size",
  428. "1024000",
  429. "-max_delay",
  430. "500000",
  431. "-i",
  432. url,
  433. "-f",
  434. "mjpeg",
  435. "-q:v",
  436. "5",
  437. "-r",
  438. str(fps),
  439. "-an",
  440. "-",
  441. ]
  442. process = None
  443. try:
  444. process = await asyncio.create_subprocess_exec(
  445. *cmd,
  446. stdout=asyncio.subprocess.PIPE,
  447. stderr=asyncio.subprocess.PIPE,
  448. )
  449. # Give ffmpeg a moment to start and check for immediate failures
  450. await asyncio.sleep(0.5)
  451. if process.returncode is not None:
  452. stderr = await process.stderr.read()
  453. logger.error(f"ffmpeg RTSP stream failed immediately: {stderr.decode()[:300]}")
  454. return
  455. buffer = b""
  456. jpeg_start = b"\xff\xd8"
  457. jpeg_end = b"\xff\xd9"
  458. while True:
  459. try:
  460. chunk = await asyncio.wait_for(process.stdout.read(8192), timeout=30.0)
  461. if not chunk:
  462. break
  463. buffer += chunk
  464. # Extract complete frames
  465. while True:
  466. start_idx = buffer.find(jpeg_start)
  467. if start_idx == -1:
  468. buffer = buffer[-2:] if len(buffer) > 2 else buffer
  469. break
  470. if start_idx > 0:
  471. buffer = buffer[start_idx:]
  472. end_idx = buffer.find(jpeg_end, 2)
  473. if end_idx == -1:
  474. break
  475. frame = buffer[: end_idx + 2]
  476. buffer = buffer[end_idx + 2 :]
  477. yield frame
  478. except TimeoutError:
  479. logger.warning("RTSP stream read timeout")
  480. break
  481. except asyncio.CancelledError:
  482. logger.info("RTSP stream cancelled")
  483. except Exception as e:
  484. logger.error(f"RTSP stream error: {e}")
  485. finally:
  486. if process and process.returncode is None:
  487. process.terminate()
  488. try:
  489. await asyncio.wait_for(process.wait(), timeout=2.0)
  490. except TimeoutError:
  491. process.kill()
  492. await process.wait()
  493. async def _stream_usb(device: str, fps: int) -> AsyncGenerator[bytes, None]:
  494. """Stream frames from USB camera via ffmpeg."""
  495. ffmpeg = get_ffmpeg_path()
  496. if not ffmpeg:
  497. logger.error("ffmpeg not found - required for USB camera streaming")
  498. return
  499. # Validate device path
  500. if not device.startswith("/dev/video"):
  501. logger.error(f"Invalid USB device path: {device}")
  502. return
  503. if not Path(device).exists():
  504. logger.error(f"USB device does not exist: {device}")
  505. return
  506. # ffmpeg command to stream from USB camera (v4l2)
  507. cmd = [
  508. ffmpeg,
  509. "-f",
  510. "v4l2",
  511. "-framerate",
  512. str(fps),
  513. "-i",
  514. device,
  515. "-f",
  516. "mjpeg",
  517. "-q:v",
  518. "5",
  519. "-r",
  520. str(fps),
  521. "-",
  522. ]
  523. process = None
  524. try:
  525. logger.info(f"Starting USB camera stream from {device} at {fps} fps")
  526. process = await asyncio.create_subprocess_exec(
  527. *cmd,
  528. stdout=asyncio.subprocess.PIPE,
  529. stderr=asyncio.subprocess.PIPE,
  530. )
  531. # Give ffmpeg a moment to start and check for immediate failures
  532. await asyncio.sleep(0.5)
  533. if process.returncode is not None:
  534. stderr = await process.stderr.read()
  535. logger.error(f"ffmpeg USB stream failed immediately: {stderr.decode()[:300]}")
  536. return
  537. buffer = b""
  538. jpeg_start = b"\xff\xd8"
  539. jpeg_end = b"\xff\xd9"
  540. while True:
  541. try:
  542. chunk = await asyncio.wait_for(process.stdout.read(8192), timeout=30.0)
  543. if not chunk:
  544. break
  545. buffer += chunk
  546. # Extract complete frames
  547. while True:
  548. start_idx = buffer.find(jpeg_start)
  549. if start_idx == -1:
  550. buffer = buffer[-2:] if len(buffer) > 2 else buffer
  551. break
  552. if start_idx > 0:
  553. buffer = buffer[start_idx:]
  554. end_idx = buffer.find(jpeg_end, 2)
  555. if end_idx == -1:
  556. break
  557. frame = buffer[: end_idx + 2]
  558. buffer = buffer[end_idx + 2 :]
  559. yield frame
  560. except TimeoutError:
  561. logger.warning("USB stream read timeout")
  562. break
  563. except asyncio.CancelledError:
  564. logger.info("USB stream cancelled")
  565. except Exception as e:
  566. logger.error(f"USB stream error: {e}")
  567. finally:
  568. if process and process.returncode is None:
  569. process.terminate()
  570. try:
  571. await asyncio.wait_for(process.wait(), timeout=2.0)
  572. except TimeoutError:
  573. process.kill()
  574. await process.wait()