external_camera.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643
  1. """External camera service.
  2. Supports MJPEG streams, RTSP streams (via ffmpeg), HTTP snapshot URLs, and USB cameras.
  3. """
  4. import asyncio
  5. import logging
  6. import re
  7. import shutil
  8. from collections.abc import AsyncGenerator
  9. from pathlib import Path
  10. import aiohttp
  11. logger = logging.getLogger(__name__)
  12. def list_usb_cameras() -> list[dict]:
  13. """List available USB cameras (V4L2 devices on Linux).
  14. Returns:
  15. List of dicts with {device: str, name: str, capabilities: list}
  16. """
  17. cameras = []
  18. video_devices = sorted(Path("/dev").glob("video*"))
  19. for device in video_devices:
  20. device_path = str(device)
  21. info = {"device": device_path, "name": device.name, "capabilities": []}
  22. # Try to get device info via v4l2-ctl
  23. v4l2_ctl = shutil.which("v4l2-ctl")
  24. if v4l2_ctl:
  25. import subprocess
  26. try:
  27. result = subprocess.run(
  28. [v4l2_ctl, "-d", device_path, "--info"],
  29. capture_output=True,
  30. text=True,
  31. timeout=5,
  32. )
  33. if result.returncode == 0:
  34. # Parse device name from output
  35. for line in result.stdout.splitlines():
  36. if "Card type" in line:
  37. info["name"] = line.split(":", 1)[1].strip()
  38. elif "Driver name" in line:
  39. info["driver"] = line.split(":", 1)[1].strip()
  40. # Check if device supports video capture
  41. result = subprocess.run(
  42. [v4l2_ctl, "-d", device_path, "--list-formats"],
  43. capture_output=True,
  44. text=True,
  45. timeout=5,
  46. )
  47. if result.returncode == 0 and result.stdout.strip():
  48. info["capabilities"].append("capture")
  49. # Parse available formats
  50. formats = re.findall(r"'(\w+)'", result.stdout)
  51. info["formats"] = list(set(formats))
  52. except (subprocess.TimeoutExpired, Exception) as e:
  53. logger.debug(f"v4l2-ctl failed for {device_path}: {e}")
  54. # Only include devices that look like video capture devices
  55. # Skip metadata devices (typically odd numbered like video1, video3)
  56. try:
  57. device_num = int(device.name.replace("video", ""))
  58. # Even numbered devices are usually capture, odd are metadata
  59. # But also check if we got capabilities
  60. if info.get("capabilities") or device_num % 2 == 0:
  61. cameras.append(info)
  62. except ValueError:
  63. cameras.append(info)
  64. return cameras
  65. def get_ffmpeg_path() -> str | None:
  66. """Get the path to ffmpeg executable."""
  67. # Try shutil.which first
  68. path = shutil.which("ffmpeg")
  69. if path:
  70. return path
  71. # Check common locations (systemd services may have limited PATH)
  72. for common_path in ["/usr/bin/ffmpeg", "/usr/local/bin/ffmpeg", "/opt/homebrew/bin/ffmpeg"]:
  73. if Path(common_path).exists():
  74. return common_path
  75. return None
  76. async def capture_frame(url: str, camera_type: str, timeout: int = 15) -> bytes | None:
  77. """Capture single frame from external camera.
  78. Args:
  79. url: Camera URL (MJPEG stream, RTSP URL, HTTP snapshot URL, or USB device path)
  80. camera_type: "mjpeg", "rtsp", "snapshot", or "usb"
  81. timeout: Connection timeout in seconds
  82. Returns:
  83. JPEG bytes or None on failure
  84. """
  85. logger.debug(f"capture_frame called: type={camera_type}, url={url[:50] if url else 'None'}...")
  86. if camera_type == "mjpeg":
  87. return await _capture_mjpeg_frame(url, timeout)
  88. elif camera_type == "rtsp":
  89. return await _capture_rtsp_frame(url, timeout)
  90. elif camera_type == "snapshot":
  91. return await _capture_snapshot(url, timeout)
  92. elif camera_type == "usb":
  93. return await _capture_usb_frame(url, timeout)
  94. else:
  95. logger.warning(f"Unknown camera type: {camera_type}")
  96. return None
  97. async def _capture_usb_frame(device: str, timeout: int) -> bytes | None:
  98. """Capture frame from USB camera using ffmpeg."""
  99. ffmpeg = get_ffmpeg_path()
  100. if not ffmpeg:
  101. logger.error("ffmpeg not found - required for USB camera capture")
  102. return None
  103. # Validate device path
  104. if not device.startswith("/dev/video"):
  105. logger.error(f"Invalid USB device path: {device}")
  106. return None
  107. if not Path(device).exists():
  108. logger.error(f"USB device does not exist: {device}")
  109. return None
  110. # Use ffmpeg to grab a single frame from USB camera
  111. cmd = [
  112. ffmpeg,
  113. "-f",
  114. "v4l2",
  115. "-i",
  116. device,
  117. "-frames:v",
  118. "1",
  119. "-f",
  120. "image2pipe",
  121. "-vcodec",
  122. "mjpeg",
  123. "-q:v",
  124. "2",
  125. "-",
  126. ]
  127. try:
  128. logger.debug(f"Running USB capture: {' '.join(cmd)}")
  129. process = await asyncio.create_subprocess_exec(
  130. *cmd,
  131. stdout=asyncio.subprocess.PIPE,
  132. stderr=asyncio.subprocess.PIPE,
  133. )
  134. stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout)
  135. if process.returncode != 0:
  136. logger.error(f"ffmpeg USB capture failed: {stderr.decode()[:200]}")
  137. return None
  138. if not stdout or len(stdout) < 100:
  139. logger.error("ffmpeg returned empty or too small frame from USB camera")
  140. return None
  141. return stdout
  142. except TimeoutError:
  143. logger.warning(f"USB frame capture timed out after {timeout}s")
  144. if process:
  145. process.kill()
  146. return None
  147. except Exception as e:
  148. logger.error(f"USB frame capture failed: {e}")
  149. return None
  150. async def _capture_mjpeg_frame(url: str, timeout: int) -> bytes | None:
  151. """Extract single frame from MJPEG stream."""
  152. try:
  153. async with (
  154. aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session,
  155. session.get(url) as response,
  156. ):
  157. if response.status != 200:
  158. logger.error(f"MJPEG stream returned status {response.status}")
  159. return None
  160. # Read chunks until we find a complete JPEG frame
  161. buffer = b""
  162. jpeg_start = b"\xff\xd8"
  163. jpeg_end = b"\xff\xd9"
  164. async for chunk in response.content.iter_chunked(8192):
  165. buffer += chunk
  166. # Look for complete JPEG frame
  167. start_idx = buffer.find(jpeg_start)
  168. if start_idx == -1:
  169. continue
  170. end_idx = buffer.find(jpeg_end, start_idx + 2)
  171. if end_idx != -1:
  172. # Found complete frame
  173. frame = buffer[start_idx : end_idx + 2]
  174. return frame
  175. # Keep searching, but limit buffer size
  176. if len(buffer) > 5 * 1024 * 1024: # 5MB limit
  177. logger.warning("MJPEG buffer exceeded 5MB without finding frame")
  178. return None
  179. except TimeoutError:
  180. logger.warning(f"MJPEG frame capture timed out after {timeout}s")
  181. return None
  182. except Exception as e:
  183. logger.error(f"MJPEG frame capture failed: {e}")
  184. return None
  185. return None
  186. async def _capture_rtsp_frame(url: str, timeout: int) -> bytes | None:
  187. """Capture frame from RTSP using ffmpeg."""
  188. ffmpeg = get_ffmpeg_path()
  189. if not ffmpeg:
  190. logger.error("ffmpeg not found - required for RTSP capture")
  191. return None
  192. # Use ffmpeg to grab a single frame from RTSP stream
  193. # ffmpeg handles both rtsp:// and rtsps:// URLs automatically
  194. cmd = [
  195. ffmpeg,
  196. "-rtsp_transport",
  197. "tcp",
  198. "-i",
  199. url,
  200. "-frames:v",
  201. "1",
  202. "-f",
  203. "image2pipe",
  204. "-vcodec",
  205. "mjpeg",
  206. "-q:v",
  207. "2",
  208. "-",
  209. ]
  210. try:
  211. print(f"[EXT-CAM] Running ffmpeg command: {' '.join(cmd[:6])}...")
  212. process = await asyncio.create_subprocess_exec(
  213. *cmd,
  214. stdout=asyncio.subprocess.PIPE,
  215. stderr=asyncio.subprocess.PIPE,
  216. )
  217. stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout)
  218. print(
  219. f"[EXT-CAM] ffmpeg returned: code={process.returncode}, stdout={len(stdout)} bytes, stderr={len(stderr)} bytes"
  220. )
  221. if process.returncode != 0:
  222. logger.error(f"ffmpeg RTSP capture failed: {stderr.decode()[:200]}")
  223. print(f"[EXT-CAM] ffmpeg error: {stderr.decode()[:300]}")
  224. return None
  225. if not stdout or len(stdout) < 100:
  226. logger.error("ffmpeg returned empty or too small frame")
  227. return None
  228. return stdout
  229. except TimeoutError:
  230. logger.warning(f"RTSP frame capture timed out after {timeout}s")
  231. if process:
  232. process.kill()
  233. return None
  234. except Exception as e:
  235. logger.error(f"RTSP frame capture failed: {e}")
  236. return None
  237. async def _capture_snapshot(url: str, timeout: int) -> bytes | None:
  238. """Fetch snapshot from HTTP URL."""
  239. try:
  240. async with (
  241. aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session,
  242. session.get(url) as response,
  243. ):
  244. if response.status != 200:
  245. logger.error(f"Snapshot URL returned status {response.status}")
  246. return None
  247. data = await response.read()
  248. # Validate it looks like JPEG
  249. if not data.startswith(b"\xff\xd8"):
  250. logger.warning("Snapshot does not appear to be JPEG")
  251. # Still return it - might be valid with different header
  252. return data
  253. except TimeoutError:
  254. logger.warning(f"Snapshot capture timed out after {timeout}s")
  255. return None
  256. except Exception as e:
  257. logger.error(f"Snapshot capture failed: {e}")
  258. return None
  259. async def test_connection(url: str, camera_type: str) -> dict:
  260. """Test camera connection.
  261. Returns:
  262. Dict with {success: bool, error?: str, resolution?: str}
  263. """
  264. print(f"[EXT-CAM] Testing camera connection: type={camera_type}, url={url[:50]}...")
  265. logger.info(f"Testing camera connection: type={camera_type}, url={url[:50]}...")
  266. try:
  267. frame = await capture_frame(url, camera_type, timeout=10)
  268. print(f"[EXT-CAM] Capture result: {len(frame) if frame else 0} bytes")
  269. logger.info(f"Capture result: {len(frame) if frame else 0} bytes")
  270. if frame:
  271. # Try to get resolution from JPEG header
  272. resolution = None
  273. try:
  274. # Simple JPEG dimension extraction
  275. # SOF0 marker is FF C0, followed by length, precision, height, width
  276. sof_markers = [b"\xff\xc0", b"\xff\xc1", b"\xff\xc2"]
  277. for marker in sof_markers:
  278. idx = frame.find(marker)
  279. if idx != -1 and idx + 9 <= len(frame):
  280. height = (frame[idx + 5] << 8) | frame[idx + 6]
  281. width = (frame[idx + 7] << 8) | frame[idx + 8]
  282. resolution = f"{width}x{height}"
  283. break
  284. except Exception:
  285. pass
  286. return {"success": True, "resolution": resolution}
  287. else:
  288. return {"success": False, "error": "Failed to capture frame from camera"}
  289. except Exception as e:
  290. return {"success": False, "error": str(e)}
  291. async def generate_mjpeg_stream(url: str, camera_type: str, fps: int = 10) -> AsyncGenerator[bytes, None]:
  292. """Generator yielding MJPEG frames for streaming.
  293. Args:
  294. url: Camera URL or USB device path
  295. camera_type: "mjpeg", "rtsp", "snapshot", or "usb"
  296. fps: Target frames per second
  297. Yields:
  298. MJPEG frame data with HTTP multipart boundaries
  299. """
  300. frame_interval = 1.0 / max(fps, 1)
  301. last_frame_time = 0.0
  302. if camera_type == "mjpeg":
  303. # Proxy MJPEG stream directly
  304. async for frame in _stream_mjpeg(url):
  305. current_time = asyncio.get_event_loop().time()
  306. if current_time - last_frame_time >= frame_interval:
  307. last_frame_time = current_time
  308. yield _format_mjpeg_frame(frame)
  309. elif camera_type == "rtsp":
  310. # Use ffmpeg to convert RTSP to MJPEG
  311. async for frame in _stream_rtsp(url, fps):
  312. yield _format_mjpeg_frame(frame)
  313. elif camera_type == "usb":
  314. # Use ffmpeg to stream from USB camera
  315. async for frame in _stream_usb(url, fps):
  316. yield _format_mjpeg_frame(frame)
  317. elif camera_type == "snapshot":
  318. # Poll snapshot URL at interval
  319. while True:
  320. try:
  321. frame = await _capture_snapshot(url, timeout=10)
  322. if frame:
  323. yield _format_mjpeg_frame(frame)
  324. await asyncio.sleep(frame_interval)
  325. except asyncio.CancelledError:
  326. break
  327. except Exception as e:
  328. logger.warning(f"Snapshot poll failed: {e}")
  329. await asyncio.sleep(frame_interval)
  330. def _format_mjpeg_frame(frame: bytes) -> bytes:
  331. """Format frame for MJPEG HTTP response."""
  332. return (
  333. b"--frame\r\n"
  334. b"Content-Type: image/jpeg\r\n"
  335. b"Content-Length: " + str(len(frame)).encode() + b"\r\n"
  336. b"\r\n" + frame + b"\r\n"
  337. )
  338. async def _stream_mjpeg(url: str) -> AsyncGenerator[bytes, None]:
  339. """Stream frames from MJPEG URL."""
  340. try:
  341. timeout = aiohttp.ClientTimeout(total=None, sock_read=30)
  342. async with aiohttp.ClientSession(timeout=timeout) as session, session.get(url) as response:
  343. if response.status != 200:
  344. logger.error(f"MJPEG stream returned status {response.status}")
  345. return
  346. buffer = b""
  347. jpeg_start = b"\xff\xd8"
  348. jpeg_end = b"\xff\xd9"
  349. async for chunk in response.content.iter_chunked(8192):
  350. buffer += chunk
  351. # Extract complete frames from buffer
  352. while True:
  353. start_idx = buffer.find(jpeg_start)
  354. if start_idx == -1:
  355. buffer = buffer[-2:] if len(buffer) > 2 else buffer
  356. break
  357. if start_idx > 0:
  358. buffer = buffer[start_idx:]
  359. end_idx = buffer.find(jpeg_end, 2)
  360. if end_idx == -1:
  361. break
  362. frame = buffer[: end_idx + 2]
  363. buffer = buffer[end_idx + 2 :]
  364. yield frame
  365. except asyncio.CancelledError:
  366. logger.info("MJPEG stream cancelled")
  367. except Exception as e:
  368. logger.error(f"MJPEG stream error: {e}")
  369. async def _stream_rtsp(url: str, fps: int) -> AsyncGenerator[bytes, None]:
  370. """Stream frames from RTSP URL via ffmpeg."""
  371. ffmpeg = get_ffmpeg_path()
  372. if not ffmpeg:
  373. logger.error("ffmpeg not found - required for RTSP streaming")
  374. return
  375. # ffmpeg handles both rtsp:// and rtsps:// URLs automatically
  376. cmd = [
  377. ffmpeg,
  378. "-rtsp_transport",
  379. "tcp",
  380. "-rtsp_flags",
  381. "prefer_tcp",
  382. "-timeout",
  383. "30000000",
  384. "-buffer_size",
  385. "1024000",
  386. "-max_delay",
  387. "500000",
  388. "-i",
  389. url,
  390. "-f",
  391. "mjpeg",
  392. "-q:v",
  393. "5",
  394. "-r",
  395. str(fps),
  396. "-an",
  397. "-",
  398. ]
  399. process = None
  400. try:
  401. process = await asyncio.create_subprocess_exec(
  402. *cmd,
  403. stdout=asyncio.subprocess.PIPE,
  404. stderr=asyncio.subprocess.PIPE,
  405. )
  406. # Give ffmpeg a moment to start and check for immediate failures
  407. await asyncio.sleep(0.5)
  408. if process.returncode is not None:
  409. stderr = await process.stderr.read()
  410. logger.error(f"ffmpeg RTSP stream failed immediately: {stderr.decode()[:300]}")
  411. return
  412. buffer = b""
  413. jpeg_start = b"\xff\xd8"
  414. jpeg_end = b"\xff\xd9"
  415. while True:
  416. try:
  417. chunk = await asyncio.wait_for(process.stdout.read(8192), timeout=30.0)
  418. if not chunk:
  419. break
  420. buffer += chunk
  421. # Extract complete frames
  422. while True:
  423. start_idx = buffer.find(jpeg_start)
  424. if start_idx == -1:
  425. buffer = buffer[-2:] if len(buffer) > 2 else buffer
  426. break
  427. if start_idx > 0:
  428. buffer = buffer[start_idx:]
  429. end_idx = buffer.find(jpeg_end, 2)
  430. if end_idx == -1:
  431. break
  432. frame = buffer[: end_idx + 2]
  433. buffer = buffer[end_idx + 2 :]
  434. yield frame
  435. except TimeoutError:
  436. logger.warning("RTSP stream read timeout")
  437. break
  438. except asyncio.CancelledError:
  439. logger.info("RTSP stream cancelled")
  440. except Exception as e:
  441. logger.error(f"RTSP stream error: {e}")
  442. finally:
  443. if process and process.returncode is None:
  444. process.terminate()
  445. try:
  446. await asyncio.wait_for(process.wait(), timeout=2.0)
  447. except TimeoutError:
  448. process.kill()
  449. await process.wait()
  450. async def _stream_usb(device: str, fps: int) -> AsyncGenerator[bytes, None]:
  451. """Stream frames from USB camera via ffmpeg."""
  452. ffmpeg = get_ffmpeg_path()
  453. if not ffmpeg:
  454. logger.error("ffmpeg not found - required for USB camera streaming")
  455. return
  456. # Validate device path
  457. if not device.startswith("/dev/video"):
  458. logger.error(f"Invalid USB device path: {device}")
  459. return
  460. if not Path(device).exists():
  461. logger.error(f"USB device does not exist: {device}")
  462. return
  463. # ffmpeg command to stream from USB camera (v4l2)
  464. cmd = [
  465. ffmpeg,
  466. "-f",
  467. "v4l2",
  468. "-framerate",
  469. str(fps),
  470. "-i",
  471. device,
  472. "-f",
  473. "mjpeg",
  474. "-q:v",
  475. "5",
  476. "-r",
  477. str(fps),
  478. "-",
  479. ]
  480. process = None
  481. try:
  482. logger.info(f"Starting USB camera stream from {device} at {fps} fps")
  483. process = await asyncio.create_subprocess_exec(
  484. *cmd,
  485. stdout=asyncio.subprocess.PIPE,
  486. stderr=asyncio.subprocess.PIPE,
  487. )
  488. # Give ffmpeg a moment to start and check for immediate failures
  489. await asyncio.sleep(0.5)
  490. if process.returncode is not None:
  491. stderr = await process.stderr.read()
  492. logger.error(f"ffmpeg USB stream failed immediately: {stderr.decode()[:300]}")
  493. return
  494. buffer = b""
  495. jpeg_start = b"\xff\xd8"
  496. jpeg_end = b"\xff\xd9"
  497. while True:
  498. try:
  499. chunk = await asyncio.wait_for(process.stdout.read(8192), timeout=30.0)
  500. if not chunk:
  501. break
  502. buffer += chunk
  503. # Extract complete frames
  504. while True:
  505. start_idx = buffer.find(jpeg_start)
  506. if start_idx == -1:
  507. buffer = buffer[-2:] if len(buffer) > 2 else buffer
  508. break
  509. if start_idx > 0:
  510. buffer = buffer[start_idx:]
  511. end_idx = buffer.find(jpeg_end, 2)
  512. if end_idx == -1:
  513. break
  514. frame = buffer[: end_idx + 2]
  515. buffer = buffer[end_idx + 2 :]
  516. yield frame
  517. except TimeoutError:
  518. logger.warning("USB stream read timeout")
  519. break
  520. except asyncio.CancelledError:
  521. logger.info("USB stream cancelled")
  522. except Exception as e:
  523. logger.error(f"USB stream error: {e}")
  524. finally:
  525. if process and process.returncode is None:
  526. process.terminate()
  527. try:
  528. await asyncio.wait_for(process.wait(), timeout=2.0)
  529. except TimeoutError:
  530. process.kill()
  531. await process.wait()