external_camera.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409
  1. """External network camera service.
  2. Supports MJPEG streams, RTSP streams (via ffmpeg), and HTTP snapshot URLs.
  3. """
  4. import asyncio
  5. import logging
  6. import shutil
  7. from collections.abc import AsyncGenerator
  8. from pathlib import Path
  9. import aiohttp
  10. logger = logging.getLogger(__name__)
  11. def get_ffmpeg_path() -> str | None:
  12. """Get the path to ffmpeg executable."""
  13. # Try shutil.which first
  14. path = shutil.which("ffmpeg")
  15. if path:
  16. return path
  17. # Check common locations (systemd services may have limited PATH)
  18. for common_path in ["/usr/bin/ffmpeg", "/usr/local/bin/ffmpeg", "/opt/homebrew/bin/ffmpeg"]:
  19. if Path(common_path).exists():
  20. return common_path
  21. return None
  22. async def capture_frame(url: str, camera_type: str, timeout: int = 15) -> bytes | None:
  23. """Capture single frame from external camera.
  24. Args:
  25. url: Camera URL (MJPEG stream, RTSP URL, or HTTP snapshot URL)
  26. camera_type: "mjpeg", "rtsp", or "snapshot"
  27. timeout: Connection timeout in seconds
  28. Returns:
  29. JPEG bytes or None on failure
  30. """
  31. logger.debug(f"capture_frame called: type={camera_type}, url={url[:50] if url else 'None'}...")
  32. if camera_type == "mjpeg":
  33. return await _capture_mjpeg_frame(url, timeout)
  34. elif camera_type == "rtsp":
  35. return await _capture_rtsp_frame(url, timeout)
  36. elif camera_type == "snapshot":
  37. return await _capture_snapshot(url, timeout)
  38. else:
  39. logger.warning(f"Unknown camera type: {camera_type}")
  40. return None
  41. async def _capture_mjpeg_frame(url: str, timeout: int) -> bytes | None:
  42. """Extract single frame from MJPEG stream."""
  43. try:
  44. async with (
  45. aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session,
  46. session.get(url) as response,
  47. ):
  48. if response.status != 200:
  49. logger.error(f"MJPEG stream returned status {response.status}")
  50. return None
  51. # Read chunks until we find a complete JPEG frame
  52. buffer = b""
  53. jpeg_start = b"\xff\xd8"
  54. jpeg_end = b"\xff\xd9"
  55. async for chunk in response.content.iter_chunked(8192):
  56. buffer += chunk
  57. # Look for complete JPEG frame
  58. start_idx = buffer.find(jpeg_start)
  59. if start_idx == -1:
  60. continue
  61. end_idx = buffer.find(jpeg_end, start_idx + 2)
  62. if end_idx != -1:
  63. # Found complete frame
  64. frame = buffer[start_idx : end_idx + 2]
  65. return frame
  66. # Keep searching, but limit buffer size
  67. if len(buffer) > 5 * 1024 * 1024: # 5MB limit
  68. logger.warning("MJPEG buffer exceeded 5MB without finding frame")
  69. return None
  70. except TimeoutError:
  71. logger.warning(f"MJPEG frame capture timed out after {timeout}s")
  72. return None
  73. except Exception as e:
  74. logger.error(f"MJPEG frame capture failed: {e}")
  75. return None
  76. return None
  77. async def _capture_rtsp_frame(url: str, timeout: int) -> bytes | None:
  78. """Capture frame from RTSP using ffmpeg."""
  79. ffmpeg = get_ffmpeg_path()
  80. if not ffmpeg:
  81. logger.error("ffmpeg not found - required for RTSP capture")
  82. return None
  83. # Use ffmpeg to grab a single frame from RTSP stream
  84. # ffmpeg handles both rtsp:// and rtsps:// URLs automatically
  85. cmd = [
  86. ffmpeg,
  87. "-rtsp_transport",
  88. "tcp",
  89. "-i",
  90. url,
  91. "-frames:v",
  92. "1",
  93. "-f",
  94. "image2pipe",
  95. "-vcodec",
  96. "mjpeg",
  97. "-q:v",
  98. "2",
  99. "-",
  100. ]
  101. try:
  102. print(f"[EXT-CAM] Running ffmpeg command: {' '.join(cmd[:6])}...")
  103. process = await asyncio.create_subprocess_exec(
  104. *cmd,
  105. stdout=asyncio.subprocess.PIPE,
  106. stderr=asyncio.subprocess.PIPE,
  107. )
  108. stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout)
  109. print(
  110. f"[EXT-CAM] ffmpeg returned: code={process.returncode}, stdout={len(stdout)} bytes, stderr={len(stderr)} bytes"
  111. )
  112. if process.returncode != 0:
  113. logger.error(f"ffmpeg RTSP capture failed: {stderr.decode()[:200]}")
  114. print(f"[EXT-CAM] ffmpeg error: {stderr.decode()[:300]}")
  115. return None
  116. if not stdout or len(stdout) < 100:
  117. logger.error("ffmpeg returned empty or too small frame")
  118. return None
  119. return stdout
  120. except TimeoutError:
  121. logger.warning(f"RTSP frame capture timed out after {timeout}s")
  122. if process:
  123. process.kill()
  124. return None
  125. except Exception as e:
  126. logger.error(f"RTSP frame capture failed: {e}")
  127. return None
  128. async def _capture_snapshot(url: str, timeout: int) -> bytes | None:
  129. """Fetch snapshot from HTTP URL."""
  130. try:
  131. async with (
  132. aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session,
  133. session.get(url) as response,
  134. ):
  135. if response.status != 200:
  136. logger.error(f"Snapshot URL returned status {response.status}")
  137. return None
  138. data = await response.read()
  139. # Validate it looks like JPEG
  140. if not data.startswith(b"\xff\xd8"):
  141. logger.warning("Snapshot does not appear to be JPEG")
  142. # Still return it - might be valid with different header
  143. return data
  144. except TimeoutError:
  145. logger.warning(f"Snapshot capture timed out after {timeout}s")
  146. return None
  147. except Exception as e:
  148. logger.error(f"Snapshot capture failed: {e}")
  149. return None
  150. async def test_connection(url: str, camera_type: str) -> dict:
  151. """Test camera connection.
  152. Returns:
  153. Dict with {success: bool, error?: str, resolution?: str}
  154. """
  155. print(f"[EXT-CAM] Testing camera connection: type={camera_type}, url={url[:50]}...")
  156. logger.info(f"Testing camera connection: type={camera_type}, url={url[:50]}...")
  157. try:
  158. frame = await capture_frame(url, camera_type, timeout=10)
  159. print(f"[EXT-CAM] Capture result: {len(frame) if frame else 0} bytes")
  160. logger.info(f"Capture result: {len(frame) if frame else 0} bytes")
  161. if frame:
  162. # Try to get resolution from JPEG header
  163. resolution = None
  164. try:
  165. # Simple JPEG dimension extraction
  166. # SOF0 marker is FF C0, followed by length, precision, height, width
  167. sof_markers = [b"\xff\xc0", b"\xff\xc1", b"\xff\xc2"]
  168. for marker in sof_markers:
  169. idx = frame.find(marker)
  170. if idx != -1 and idx + 9 <= len(frame):
  171. height = (frame[idx + 5] << 8) | frame[idx + 6]
  172. width = (frame[idx + 7] << 8) | frame[idx + 8]
  173. resolution = f"{width}x{height}"
  174. break
  175. except Exception:
  176. pass
  177. return {"success": True, "resolution": resolution}
  178. else:
  179. return {"success": False, "error": "Failed to capture frame from camera"}
  180. except Exception as e:
  181. return {"success": False, "error": str(e)}
  182. async def generate_mjpeg_stream(url: str, camera_type: str, fps: int = 10) -> AsyncGenerator[bytes, None]:
  183. """Generator yielding MJPEG frames for streaming.
  184. Args:
  185. url: Camera URL
  186. camera_type: "mjpeg", "rtsp", or "snapshot"
  187. fps: Target frames per second
  188. Yields:
  189. MJPEG frame data with HTTP multipart boundaries
  190. """
  191. frame_interval = 1.0 / max(fps, 1)
  192. last_frame_time = 0.0
  193. if camera_type == "mjpeg":
  194. # Proxy MJPEG stream directly
  195. async for frame in _stream_mjpeg(url):
  196. current_time = asyncio.get_event_loop().time()
  197. if current_time - last_frame_time >= frame_interval:
  198. last_frame_time = current_time
  199. yield _format_mjpeg_frame(frame)
  200. elif camera_type == "rtsp":
  201. # Use ffmpeg to convert RTSP to MJPEG
  202. async for frame in _stream_rtsp(url, fps):
  203. yield _format_mjpeg_frame(frame)
  204. elif camera_type == "snapshot":
  205. # Poll snapshot URL at interval
  206. while True:
  207. try:
  208. frame = await _capture_snapshot(url, timeout=10)
  209. if frame:
  210. yield _format_mjpeg_frame(frame)
  211. await asyncio.sleep(frame_interval)
  212. except asyncio.CancelledError:
  213. break
  214. except Exception as e:
  215. logger.warning(f"Snapshot poll failed: {e}")
  216. await asyncio.sleep(frame_interval)
  217. def _format_mjpeg_frame(frame: bytes) -> bytes:
  218. """Format frame for MJPEG HTTP response."""
  219. return (
  220. b"--frame\r\n"
  221. b"Content-Type: image/jpeg\r\n"
  222. b"Content-Length: " + str(len(frame)).encode() + b"\r\n"
  223. b"\r\n" + frame + b"\r\n"
  224. )
  225. async def _stream_mjpeg(url: str) -> AsyncGenerator[bytes, None]:
  226. """Stream frames from MJPEG URL."""
  227. try:
  228. timeout = aiohttp.ClientTimeout(total=None, sock_read=30)
  229. async with aiohttp.ClientSession(timeout=timeout) as session, session.get(url) as response:
  230. if response.status != 200:
  231. logger.error(f"MJPEG stream returned status {response.status}")
  232. return
  233. buffer = b""
  234. jpeg_start = b"\xff\xd8"
  235. jpeg_end = b"\xff\xd9"
  236. async for chunk in response.content.iter_chunked(8192):
  237. buffer += chunk
  238. # Extract complete frames from buffer
  239. while True:
  240. start_idx = buffer.find(jpeg_start)
  241. if start_idx == -1:
  242. buffer = buffer[-2:] if len(buffer) > 2 else buffer
  243. break
  244. if start_idx > 0:
  245. buffer = buffer[start_idx:]
  246. end_idx = buffer.find(jpeg_end, 2)
  247. if end_idx == -1:
  248. break
  249. frame = buffer[: end_idx + 2]
  250. buffer = buffer[end_idx + 2 :]
  251. yield frame
  252. except asyncio.CancelledError:
  253. logger.info("MJPEG stream cancelled")
  254. except Exception as e:
  255. logger.error(f"MJPEG stream error: {e}")
  256. async def _stream_rtsp(url: str, fps: int) -> AsyncGenerator[bytes, None]:
  257. """Stream frames from RTSP URL via ffmpeg."""
  258. ffmpeg = get_ffmpeg_path()
  259. if not ffmpeg:
  260. logger.error("ffmpeg not found - required for RTSP streaming")
  261. return
  262. # ffmpeg handles both rtsp:// and rtsps:// URLs automatically
  263. cmd = [
  264. ffmpeg,
  265. "-rtsp_transport",
  266. "tcp",
  267. "-rtsp_flags",
  268. "prefer_tcp",
  269. "-timeout",
  270. "30000000",
  271. "-buffer_size",
  272. "1024000",
  273. "-max_delay",
  274. "500000",
  275. "-i",
  276. url,
  277. "-f",
  278. "mjpeg",
  279. "-q:v",
  280. "5",
  281. "-r",
  282. str(fps),
  283. "-an",
  284. "-",
  285. ]
  286. process = None
  287. try:
  288. process = await asyncio.create_subprocess_exec(
  289. *cmd,
  290. stdout=asyncio.subprocess.PIPE,
  291. stderr=asyncio.subprocess.PIPE,
  292. )
  293. # Give ffmpeg a moment to start and check for immediate failures
  294. await asyncio.sleep(0.5)
  295. if process.returncode is not None:
  296. stderr = await process.stderr.read()
  297. logger.error(f"ffmpeg RTSP stream failed immediately: {stderr.decode()[:300]}")
  298. return
  299. buffer = b""
  300. jpeg_start = b"\xff\xd8"
  301. jpeg_end = b"\xff\xd9"
  302. while True:
  303. try:
  304. chunk = await asyncio.wait_for(process.stdout.read(8192), timeout=30.0)
  305. if not chunk:
  306. break
  307. buffer += chunk
  308. # Extract complete frames
  309. while True:
  310. start_idx = buffer.find(jpeg_start)
  311. if start_idx == -1:
  312. buffer = buffer[-2:] if len(buffer) > 2 else buffer
  313. break
  314. if start_idx > 0:
  315. buffer = buffer[start_idx:]
  316. end_idx = buffer.find(jpeg_end, 2)
  317. if end_idx == -1:
  318. break
  319. frame = buffer[: end_idx + 2]
  320. buffer = buffer[end_idx + 2 :]
  321. yield frame
  322. except TimeoutError:
  323. logger.warning("RTSP stream read timeout")
  324. break
  325. except asyncio.CancelledError:
  326. logger.info("RTSP stream cancelled")
  327. except Exception as e:
  328. logger.error(f"RTSP stream error: {e}")
  329. finally:
  330. if process and process.returncode is None:
  331. process.terminate()
  332. try:
  333. await asyncio.wait_for(process.wait(), timeout=2.0)
  334. except TimeoutError:
  335. process.kill()
  336. await process.wait()