external_camera.py 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907
  1. """External camera service.
  2. Supports MJPEG streams, RTSP streams (via ffmpeg), HTTP snapshot URLs, and USB cameras.
  3. Security Note: This service intentionally makes requests to user-configured camera URLs.
  4. This is necessary functionality for external camera integration. URLs are validated
  5. to ensure they are well-formed before use.
  6. """
  7. import asyncio
  8. import logging
  9. import re
  10. import shutil
  11. from collections.abc import AsyncGenerator
  12. from pathlib import Path
  13. from urllib.parse import urlparse
  14. import aiohttp
  15. logger = logging.getLogger(__name__)
  16. def _sanitize_camera_url(url: str, allowed_schemes: tuple[str, ...] = ("http", "https", "rtsp")) -> str | None:
  17. """Validate and sanitize camera URL, returning a safe reconstructed URL.
  18. This validates that the URL is well-formed, uses an allowed scheme,
  19. does not target cloud metadata services, and returns a reconstructed
  20. URL from validated components.
  21. Note: This intentionally allows user-provided URLs as that is the
  22. purpose of external camera configuration. Local network IPs are
  23. allowed since cameras are typically on the same LAN.
  24. Args:
  25. url: URL to validate and sanitize
  26. allowed_schemes: Tuple of allowed URL schemes
  27. Returns:
  28. Sanitized URL string if valid, None otherwise
  29. """
  30. try:
  31. parsed = urlparse(url)
  32. if not parsed.scheme or not parsed.netloc:
  33. return None
  34. # Validate scheme against allowlist
  35. scheme = parsed.scheme.lower()
  36. if scheme not in allowed_schemes:
  37. return None
  38. # Block cloud metadata service endpoints (SSRF mitigation)
  39. # These are dangerous destinations that should never be accessed
  40. hostname = parsed.hostname or ""
  41. hostname_lower = hostname.lower()
  42. blocked_hosts = (
  43. "169.254.169.254", # AWS/GCP/Azure metadata
  44. "metadata.google.internal", # GCP metadata
  45. "metadata.google",
  46. "localhost", # Block localhost to prevent internal service access
  47. "127.0.0.1",
  48. "::1",
  49. "0.0.0.0", # nosec B104
  50. )
  51. if hostname_lower in blocked_hosts:
  52. logger.warning("Blocked camera URL targeting restricted host: %s", hostname)
  53. return None
  54. # Block link-local addresses (169.254.x.x)
  55. if hostname.startswith("169.254."):
  56. logger.warning("Blocked camera URL targeting link-local address: %s", hostname)
  57. return None
  58. # Reconstruct URL from validated components to break taint chain
  59. # This creates a new string from validated parts
  60. port_str = f":{parsed.port}" if parsed.port else ""
  61. path = parsed.path or ""
  62. query = f"?{parsed.query}" if parsed.query else ""
  63. fragment = f"#{parsed.fragment}" if parsed.fragment else ""
  64. # Build sanitized URL from validated components
  65. sanitized = f"{scheme}://{hostname}{port_str}{path}{query}{fragment}"
  66. return sanitized
  67. except ValueError:
  68. return None
  69. def _validate_camera_url(url: str, allowed_schemes: tuple[str, ...] = ("http", "https", "rtsp")) -> bool:
  70. """Validate camera URL format (legacy wrapper).
  71. Args:
  72. url: URL to validate
  73. allowed_schemes: Tuple of allowed URL schemes
  74. Returns:
  75. True if URL is valid, False otherwise
  76. """
  77. return _sanitize_camera_url(url, allowed_schemes) is not None
  78. def list_usb_cameras() -> list[dict]:
  79. """List available USB cameras (V4L2 devices on Linux).
  80. Returns:
  81. List of dicts with {device: str, name: str, capabilities: list}
  82. """
  83. cameras = []
  84. video_devices = sorted(Path("/dev").glob("video*"))
  85. for device in video_devices:
  86. device_path = str(device)
  87. info = {"device": device_path, "name": device.name, "capabilities": []}
  88. # Try to get device info via v4l2-ctl
  89. v4l2_ctl = shutil.which("v4l2-ctl")
  90. if v4l2_ctl:
  91. import subprocess
  92. try:
  93. result = subprocess.run(
  94. [v4l2_ctl, "-d", device_path, "--info"],
  95. capture_output=True,
  96. text=True,
  97. timeout=5,
  98. )
  99. if result.returncode == 0:
  100. # Parse device name from output
  101. for line in result.stdout.splitlines():
  102. if "Card type" in line:
  103. info["name"] = line.split(":", 1)[1].strip()
  104. elif "Driver name" in line:
  105. info["driver"] = line.split(":", 1)[1].strip()
  106. # Check if device supports video capture
  107. result = subprocess.run(
  108. [v4l2_ctl, "-d", device_path, "--list-formats"],
  109. capture_output=True,
  110. text=True,
  111. timeout=5,
  112. )
  113. if result.returncode == 0 and result.stdout.strip():
  114. info["capabilities"].append("capture")
  115. # Parse available formats
  116. formats = re.findall(r"'(\w+)'", result.stdout)
  117. info["formats"] = list(set(formats))
  118. except (subprocess.TimeoutExpired, Exception) as e:
  119. logger.debug("v4l2-ctl failed for %s: %s", device_path, e)
  120. # Only include devices that look like video capture devices
  121. # Skip metadata devices (typically odd numbered like video1, video3)
  122. try:
  123. device_num = int(device.name.replace("video", ""))
  124. # Even numbered devices are usually capture, odd are metadata
  125. # But also check if we got capabilities
  126. if info.get("capabilities") or device_num % 2 == 0:
  127. cameras.append(info)
  128. except ValueError:
  129. cameras.append(info)
  130. return cameras
  131. def get_ffmpeg_path() -> str | None:
  132. """Get the path to ffmpeg executable."""
  133. # Try shutil.which first
  134. path = shutil.which("ffmpeg")
  135. if path:
  136. return path
  137. # Check common locations (systemd services may have limited PATH)
  138. for common_path in ["/usr/bin/ffmpeg", "/usr/local/bin/ffmpeg", "/opt/homebrew/bin/ffmpeg"]:
  139. if Path(common_path).exists():
  140. return common_path
  141. return None
  142. async def capture_frame(
  143. url: str,
  144. camera_type: str,
  145. timeout: int = 15,
  146. snapshot_url: str | None = None,
  147. ) -> bytes | None:
  148. """Capture single frame from external camera.
  149. Args:
  150. url: Live-stream URL (MJPEG stream, RTSP URL, HTTP snapshot URL, or USB device path).
  151. camera_type: "mjpeg", "rtsp", "snapshot", or "usb".
  152. timeout: Connection timeout in seconds.
  153. snapshot_url: Optional override for single-frame capture. When set, fetched
  154. via plain HTTP GET regardless of `camera_type`. Bypasses MJPEG warm-up
  155. handling on sources that expose a dedicated frame endpoint (e.g. go2rtc's
  156. `/api/frame.jpeg` reliably returns a clean image while the MJPEG stream's
  157. first frame is often the encoder's stale keyframe). #1177.
  158. Returns:
  159. JPEG bytes or None on failure
  160. """
  161. if snapshot_url:
  162. logger.debug("capture_frame using snapshot override url=%s...", snapshot_url[:50])
  163. return await _capture_snapshot(snapshot_url, timeout)
  164. logger.debug("capture_frame called: type=%s, url=%s...", camera_type, url[:50] if url else "None")
  165. if camera_type == "mjpeg":
  166. return await _capture_mjpeg_frame(url, timeout)
  167. elif camera_type == "rtsp":
  168. return await _capture_rtsp_frame(url, timeout)
  169. elif camera_type == "snapshot":
  170. return await _capture_snapshot(url, timeout)
  171. elif camera_type == "usb":
  172. return await _capture_usb_frame(url, timeout)
  173. else:
  174. logger.warning("Unknown camera type: %s", camera_type)
  175. return None
  176. async def _capture_usb_frame(device: str, timeout: int) -> bytes | None:
  177. """Capture frame from USB camera using ffmpeg."""
  178. ffmpeg = get_ffmpeg_path()
  179. if not ffmpeg:
  180. logger.error("ffmpeg not found - required for USB camera capture")
  181. return None
  182. # Validate device path - must be /dev/videoN format where N is 0-99
  183. # This prevents path traversal by using a strict allowlist approach
  184. import re as regex_module
  185. device_match = regex_module.match(r"^/dev/video(\d{1,2})$", device)
  186. if not device_match:
  187. logger.error("Invalid USB device path format: %s", device)
  188. return None
  189. # Convert to integer to break taint chain - integers cannot contain path traversal
  190. # lgtm[py/path-injection] - device_num is validated integer 0-99
  191. device_num = int(device_match.group(1)) # Safe: regex guarantees 1-2 digits
  192. if device_num > 99:
  193. logger.error("USB device number out of range: %s", device_num)
  194. return None
  195. # Construct safe path from validated integer (completely untainted)
  196. safe_device_path = Path(f"/dev/video{device_num}") # lgtm[py/path-injection]
  197. if not safe_device_path.exists():
  198. logger.error("USB device does not exist: %s", safe_device_path)
  199. return None
  200. # Use the safe path for ffmpeg - this is a hardcoded /dev/videoN path
  201. device = str(safe_device_path) # lgtm[py/path-injection]
  202. # Use ffmpeg to grab a single frame from USB camera
  203. cmd = [
  204. ffmpeg,
  205. "-f",
  206. "v4l2",
  207. "-i",
  208. device,
  209. "-frames:v",
  210. "1",
  211. "-f",
  212. "image2pipe",
  213. "-vcodec",
  214. "mjpeg",
  215. "-q:v",
  216. "2",
  217. "-",
  218. ]
  219. try:
  220. logger.debug("Running USB capture: %s", " ".join(cmd))
  221. process = await asyncio.create_subprocess_exec(
  222. *cmd,
  223. stdout=asyncio.subprocess.PIPE,
  224. stderr=asyncio.subprocess.PIPE,
  225. )
  226. stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout)
  227. if process.returncode != 0:
  228. logger.error("ffmpeg USB capture failed: %s", stderr.decode()[:200])
  229. return None
  230. if not stdout or len(stdout) < 100:
  231. logger.error("ffmpeg returned empty or too small frame from USB camera")
  232. return None
  233. return stdout
  234. except TimeoutError:
  235. logger.warning("USB frame capture timed out after %ss", timeout)
  236. if process:
  237. process.kill()
  238. return None
  239. except OSError as e:
  240. logger.error("USB frame capture failed: %s", e)
  241. return None
  242. async def _capture_mjpeg_frame(url: str, timeout: int) -> bytes | None:
  243. """Extract a single representative frame from an MJPEG stream.
  244. Many MJPEG sources — go2rtc most notably (#1177), and several IP cameras —
  245. emit a "warm-up" frame on the byte that follows connection accept: usually
  246. the last keyframe held in the encoder, which is often black or stale until
  247. the encoder catches up to live content. To return a frame that's actually
  248. representative of the scene we read past the first frame and return the
  249. second; if the connection closes / times out / hits the buffer cap before
  250. a second frame ever arrives we fall back to the first so callers still
  251. get *something* (better than degrading slow / single-frame streams to None,
  252. which would regress every code path that consumed pre-fix behaviour).
  253. Note: this function intentionally makes requests to user-configured URLs.
  254. External camera support requires connecting to user-specified camera
  255. endpoints. URL is sanitized and dangerous destinations are blocked.
  256. """
  257. safe_url = _sanitize_camera_url(url, ("http", "https"))
  258. if not safe_url:
  259. logger.error("Invalid MJPEG URL format: %s...", url[:50])
  260. return None
  261. jpeg_start = b"\xff\xd8"
  262. jpeg_end = b"\xff\xd9"
  263. first_frame: bytes | None = None # warm-up frame; fallback if no second arrives
  264. buffer = b""
  265. try:
  266. async with (
  267. aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session,
  268. session.get(safe_url) as response,
  269. ):
  270. if response.status != 200:
  271. logger.error("MJPEG stream returned status %s", response.status)
  272. return None
  273. async for chunk in response.content.iter_chunked(8192):
  274. buffer += chunk
  275. # A single chunk can carry multiple frames (e.g. high-FPS sources)
  276. # or a partial frame. Drain every complete frame we already have
  277. # before pulling the next chunk.
  278. while True:
  279. start_idx = buffer.find(jpeg_start)
  280. if start_idx == -1:
  281. # No frame start yet — drop trailing garbage, keep waiting.
  282. break
  283. end_idx = buffer.find(jpeg_end, start_idx + 2)
  284. if end_idx == -1:
  285. # Partial frame; trim already-discarded prefix so the
  286. # buffer stays bounded across long-running streams.
  287. if start_idx > 0:
  288. buffer = buffer[start_idx:]
  289. break
  290. frame = buffer[start_idx : end_idx + 2]
  291. buffer = buffer[end_idx + 2 :]
  292. if first_frame is None:
  293. first_frame = frame # warm-up; keep but don't return yet
  294. continue
  295. return frame # representative second frame
  296. if len(buffer) > 5 * 1024 * 1024: # 5MB limit
  297. logger.warning("MJPEG buffer exceeded 5MB without finding frame")
  298. break # exit chunk loop, fall through to first_frame fallback
  299. except TimeoutError:
  300. logger.warning("MJPEG frame capture timed out after %ss", timeout)
  301. except (aiohttp.ClientError, OSError) as e:
  302. logger.error("MJPEG frame capture failed: %s", e)
  303. # Stream ended / timed out / buffer cap before a second frame arrived.
  304. # Return whatever warm-up frame we managed to read; better an iffy frame
  305. # than None for callers that need *some* image (snapshot UX, plate-detect
  306. # CV, finish photo). None only if no frame ever arrived at all.
  307. return first_frame
  308. async def _capture_rtsp_frame(url: str, timeout: int) -> bytes | None:
  309. """Capture frame from RTSP using ffmpeg.
  310. For rtsps:// URLs, a local TLS proxy is used to avoid GnuTLS issues.
  311. """
  312. ffmpeg = get_ffmpeg_path()
  313. if not ffmpeg:
  314. logger.error("ffmpeg not found - required for RTSP capture")
  315. return None
  316. # If rtsps://, use TLS proxy
  317. proxy_server = None
  318. effective_url = url
  319. if url.lower().startswith("rtsps://"):
  320. try:
  321. from urllib.parse import urlparse
  322. from backend.app.services.camera import create_tls_proxy
  323. parsed = urlparse(url)
  324. target_port = parsed.port or 322
  325. proxy_port, proxy_server = await create_tls_proxy(parsed.hostname, target_port)
  326. userinfo = ""
  327. if parsed.username:
  328. userinfo = parsed.username
  329. if parsed.password:
  330. userinfo += f":{parsed.password}"
  331. userinfo += "@"
  332. effective_url = f"rtsp://{userinfo}127.0.0.1:{proxy_port}{parsed.path}"
  333. if parsed.query:
  334. effective_url += f"?{parsed.query}"
  335. except Exception as e:
  336. logger.warning("Failed to create TLS proxy for RTSP capture, falling back: %s", e)
  337. effective_url = url
  338. cmd = [
  339. ffmpeg,
  340. "-rtsp_transport",
  341. "tcp",
  342. "-i",
  343. effective_url,
  344. "-frames:v",
  345. "1",
  346. "-f",
  347. "image2pipe",
  348. "-vcodec",
  349. "mjpeg",
  350. "-q:v",
  351. "2",
  352. "-",
  353. ]
  354. try:
  355. logger.debug("Running ffmpeg RTSP capture...")
  356. process = await asyncio.create_subprocess_exec(
  357. *cmd,
  358. stdout=asyncio.subprocess.PIPE,
  359. stderr=asyncio.subprocess.PIPE,
  360. )
  361. stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout)
  362. logger.debug(
  363. "ffmpeg returned: code=%s, stdout=%s bytes, stderr=%s bytes",
  364. process.returncode,
  365. len(stdout),
  366. len(stderr),
  367. )
  368. if process.returncode != 0:
  369. logger.error("ffmpeg RTSP capture failed: %s", stderr.decode()[:200])
  370. return None
  371. if not stdout or len(stdout) < 100:
  372. logger.error("ffmpeg returned empty or too small frame")
  373. return None
  374. return stdout
  375. except TimeoutError:
  376. logger.warning("RTSP frame capture timed out after %ss", timeout)
  377. if process:
  378. process.kill()
  379. return None
  380. except OSError as e:
  381. logger.error("RTSP frame capture failed: %s", e)
  382. return None
  383. finally:
  384. if proxy_server:
  385. proxy_server.close()
  386. await proxy_server.wait_closed()
  387. async def _capture_snapshot(url: str, timeout: int) -> bytes | None:
  388. """Fetch snapshot from HTTP URL.
  389. Note: This function intentionally makes requests to user-configured URLs.
  390. External camera support requires connecting to user-specified camera endpoints.
  391. URL is sanitized and dangerous destinations are blocked.
  392. """
  393. # Sanitize URL - returns reconstructed URL from validated components
  394. safe_url = _sanitize_camera_url(url, ("http", "https"))
  395. if not safe_url:
  396. logger.error("Invalid snapshot URL format: %s...", url[:50])
  397. return None
  398. try:
  399. async with (
  400. aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session,
  401. session.get(safe_url) as response,
  402. ):
  403. if response.status != 200:
  404. logger.error("Snapshot URL returned status %s", response.status)
  405. return None
  406. data = await response.read()
  407. # Validate it looks like JPEG
  408. if not data.startswith(b"\xff\xd8"):
  409. logger.warning("Snapshot does not appear to be JPEG")
  410. # Still return it - might be valid with different header
  411. return data
  412. except TimeoutError:
  413. logger.warning("Snapshot capture timed out after %ss", timeout)
  414. return None
  415. except (aiohttp.ClientError, OSError) as e:
  416. logger.error("Snapshot capture failed: %s", e)
  417. return None
  418. async def test_connection(url: str, camera_type: str) -> dict:
  419. """Test camera connection.
  420. Returns:
  421. Dict with {success: bool, error?: str, resolution?: str}
  422. """
  423. logger.info("Testing camera connection: type=%s, url=%s...", camera_type, url[:50])
  424. try:
  425. frame = await capture_frame(url, camera_type, timeout=10)
  426. logger.info("Capture result: %s bytes", len(frame) if frame else 0)
  427. if frame:
  428. # Try to get resolution from JPEG header
  429. resolution = None
  430. try:
  431. # Simple JPEG dimension extraction
  432. # SOF0 marker is FF C0, followed by length, precision, height, width
  433. sof_markers = [b"\xff\xc0", b"\xff\xc1", b"\xff\xc2"]
  434. for marker in sof_markers:
  435. idx = frame.find(marker)
  436. if idx != -1 and idx + 9 <= len(frame):
  437. height = (frame[idx + 5] << 8) | frame[idx + 6]
  438. width = (frame[idx + 7] << 8) | frame[idx + 8]
  439. resolution = f"{width}x{height}"
  440. break
  441. except (IndexError, ValueError):
  442. pass # Resolution detection is optional; fall back to default
  443. return {"success": True, "resolution": resolution}
  444. else:
  445. return {"success": False, "error": "Failed to capture frame from camera"}
  446. except Exception as e:
  447. # Sanitize error message - don't expose internal details
  448. error_type = type(e).__name__
  449. logger.error("Camera connection test failed: %s", e)
  450. return {"success": False, "error": f"Connection failed: {error_type}"}
  451. async def generate_mjpeg_stream(url: str, camera_type: str, fps: int = 10) -> AsyncGenerator[bytes, None]:
  452. """Generator yielding MJPEG frames for streaming.
  453. Args:
  454. url: Camera URL or USB device path
  455. camera_type: "mjpeg", "rtsp", "snapshot", or "usb"
  456. fps: Target frames per second
  457. Yields:
  458. MJPEG frame data with HTTP multipart boundaries
  459. """
  460. frame_interval = 1.0 / max(fps, 1)
  461. last_frame_time = 0.0
  462. if camera_type == "mjpeg":
  463. # Proxy MJPEG stream directly, with reconnect on timeout
  464. max_retries = 3
  465. for attempt in range(max_retries + 1):
  466. frame_yielded = False
  467. async for frame in _stream_mjpeg(url):
  468. frame_yielded = True
  469. current_time = asyncio.get_event_loop().time()
  470. if current_time - last_frame_time >= frame_interval:
  471. last_frame_time = current_time
  472. yield _format_mjpeg_frame(frame)
  473. if not frame_yielded or attempt == max_retries:
  474. break
  475. logger.warning(
  476. "External MJPEG stream ended, reconnecting (attempt %d/%d)...",
  477. attempt + 1,
  478. max_retries,
  479. )
  480. await asyncio.sleep(2)
  481. elif camera_type == "rtsp":
  482. # Use ffmpeg to convert RTSP to MJPEG, with reconnect on timeout
  483. max_retries = 3
  484. for attempt in range(max_retries + 1):
  485. frame_yielded = False
  486. async for frame in _stream_rtsp(url, fps):
  487. frame_yielded = True
  488. yield _format_mjpeg_frame(frame)
  489. if not frame_yielded or attempt == max_retries:
  490. break
  491. logger.warning(
  492. "External RTSP stream ended, reconnecting (attempt %d/%d)...",
  493. attempt + 1,
  494. max_retries,
  495. )
  496. await asyncio.sleep(2)
  497. elif camera_type == "usb":
  498. # Use ffmpeg to stream from USB camera
  499. async for frame in _stream_usb(url, fps):
  500. yield _format_mjpeg_frame(frame)
  501. elif camera_type == "snapshot":
  502. # Poll snapshot URL at interval
  503. while True:
  504. try:
  505. frame = await _capture_snapshot(url, timeout=10)
  506. if frame:
  507. yield _format_mjpeg_frame(frame)
  508. await asyncio.sleep(frame_interval)
  509. except asyncio.CancelledError:
  510. break
  511. except (aiohttp.ClientError, OSError) as e:
  512. logger.warning("Snapshot poll failed: %s", e)
  513. await asyncio.sleep(frame_interval)
  514. def _format_mjpeg_frame(frame: bytes) -> bytes:
  515. """Format frame for MJPEG HTTP response."""
  516. return (
  517. b"--frame\r\n"
  518. b"Content-Type: image/jpeg\r\n"
  519. b"Content-Length: " + str(len(frame)).encode() + b"\r\n"
  520. b"\r\n" + frame + b"\r\n"
  521. )
  522. async def _stream_mjpeg(url: str) -> AsyncGenerator[bytes, None]:
  523. """Stream frames from MJPEG URL.
  524. Note: This function intentionally makes requests to user-configured URLs.
  525. External camera support requires connecting to user-specified camera endpoints.
  526. URL is sanitized and dangerous destinations are blocked.
  527. """
  528. # Sanitize URL - returns reconstructed URL from validated components
  529. safe_url = _sanitize_camera_url(url, ("http", "https"))
  530. if not safe_url:
  531. logger.error("Invalid MJPEG stream URL: %s...", url[:50])
  532. return
  533. try:
  534. timeout = aiohttp.ClientTimeout(total=None, sock_read=30)
  535. async with aiohttp.ClientSession(timeout=timeout) as session, session.get(safe_url) as response:
  536. if response.status != 200:
  537. logger.error("MJPEG stream returned status %s", response.status)
  538. return
  539. buffer = b""
  540. jpeg_start = b"\xff\xd8"
  541. jpeg_end = b"\xff\xd9"
  542. async for chunk in response.content.iter_chunked(8192):
  543. buffer += chunk
  544. # Extract complete frames from buffer
  545. while True:
  546. start_idx = buffer.find(jpeg_start)
  547. if start_idx == -1:
  548. buffer = buffer[-2:] if len(buffer) > 2 else buffer
  549. break
  550. if start_idx > 0:
  551. buffer = buffer[start_idx:]
  552. end_idx = buffer.find(jpeg_end, 2)
  553. if end_idx == -1:
  554. break
  555. frame = buffer[: end_idx + 2]
  556. buffer = buffer[end_idx + 2 :]
  557. yield frame
  558. except asyncio.CancelledError:
  559. logger.info("MJPEG stream cancelled")
  560. except (aiohttp.ClientError, OSError) as e:
  561. logger.error("MJPEG stream error: %s", e)
  562. async def _stream_rtsp(url: str, fps: int) -> AsyncGenerator[bytes, None]:
  563. """Stream frames from RTSP URL via ffmpeg.
  564. For rtsps:// URLs, a local TLS proxy (Python OpenSSL) is used instead
  565. of relying on ffmpeg's GnuTLS backend, which has compatibility issues
  566. with some printer firmwares.
  567. """
  568. ffmpeg = get_ffmpeg_path()
  569. if not ffmpeg:
  570. logger.error("ffmpeg not found - required for RTSP streaming")
  571. return
  572. # If the URL uses rtsps://, set up a TLS proxy so ffmpeg uses plain rtsp://
  573. proxy_server = None
  574. effective_url = url
  575. if url.lower().startswith("rtsps://"):
  576. try:
  577. from urllib.parse import urlparse
  578. from backend.app.services.camera import create_tls_proxy
  579. parsed = urlparse(url)
  580. target_port = parsed.port or 322
  581. proxy_port, proxy_server = await create_tls_proxy(parsed.hostname, target_port)
  582. # Rewrite URL: rtsps://user:pass@host:port/path → rtsp://user:pass@127.0.0.1:proxy/path
  583. userinfo = ""
  584. if parsed.username:
  585. userinfo = parsed.username
  586. if parsed.password:
  587. userinfo += f":{parsed.password}"
  588. userinfo += "@"
  589. effective_url = f"rtsp://{userinfo}127.0.0.1:{proxy_port}{parsed.path}"
  590. if parsed.query:
  591. effective_url += f"?{parsed.query}"
  592. except Exception as e:
  593. logger.warning("Failed to create TLS proxy for RTSP, falling back to direct: %s", e)
  594. effective_url = url
  595. cmd = [
  596. ffmpeg,
  597. "-rtsp_transport",
  598. "tcp",
  599. "-rtsp_flags",
  600. "prefer_tcp",
  601. "-timeout",
  602. "30000000",
  603. "-buffer_size",
  604. "1024000",
  605. "-max_delay",
  606. "500000",
  607. "-probesize",
  608. "32",
  609. "-analyzeduration",
  610. "0",
  611. "-fflags",
  612. "nobuffer",
  613. "-flags",
  614. "low_delay",
  615. "-i",
  616. effective_url,
  617. "-f",
  618. "mjpeg",
  619. "-q:v",
  620. "5",
  621. "-r",
  622. str(fps),
  623. "-an",
  624. "-",
  625. ]
  626. process = None
  627. try:
  628. process = await asyncio.create_subprocess_exec(
  629. *cmd,
  630. stdout=asyncio.subprocess.PIPE,
  631. stderr=asyncio.subprocess.PIPE,
  632. )
  633. # Brief check for immediate startup failures
  634. await asyncio.sleep(0.1)
  635. if process.returncode is not None:
  636. stderr = await process.stderr.read()
  637. logger.error("ffmpeg RTSP stream failed immediately: %s", stderr.decode()[:300])
  638. return
  639. buffer = b""
  640. jpeg_start = b"\xff\xd8"
  641. jpeg_end = b"\xff\xd9"
  642. while True:
  643. try:
  644. chunk = await asyncio.wait_for(process.stdout.read(8192), timeout=30.0)
  645. if not chunk:
  646. break
  647. buffer += chunk
  648. # Extract complete frames
  649. while True:
  650. start_idx = buffer.find(jpeg_start)
  651. if start_idx == -1:
  652. buffer = buffer[-2:] if len(buffer) > 2 else buffer
  653. break
  654. if start_idx > 0:
  655. buffer = buffer[start_idx:]
  656. end_idx = buffer.find(jpeg_end, 2)
  657. if end_idx == -1:
  658. break
  659. frame = buffer[: end_idx + 2]
  660. buffer = buffer[end_idx + 2 :]
  661. yield frame
  662. except TimeoutError:
  663. logger.warning("RTSP stream read timeout")
  664. break
  665. except asyncio.CancelledError:
  666. logger.info("RTSP stream cancelled")
  667. except OSError as e:
  668. logger.error("RTSP stream error: %s", e)
  669. finally:
  670. if process and process.returncode is None:
  671. process.terminate()
  672. try:
  673. await asyncio.wait_for(process.wait(), timeout=2.0)
  674. except TimeoutError:
  675. process.kill()
  676. await process.wait()
  677. if proxy_server:
  678. proxy_server.close()
  679. await proxy_server.wait_closed()
  680. async def _stream_usb(device: str, fps: int) -> AsyncGenerator[bytes, None]:
  681. """Stream frames from USB camera via ffmpeg."""
  682. ffmpeg = get_ffmpeg_path()
  683. if not ffmpeg:
  684. logger.error("ffmpeg not found - required for USB camera streaming")
  685. return
  686. # Validate device path
  687. if not device.startswith("/dev/video"):
  688. logger.error("Invalid USB device path: %s", device)
  689. return
  690. if not Path(device).exists():
  691. logger.error("USB device does not exist: %s", device)
  692. return
  693. # ffmpeg command to stream from USB camera (v4l2)
  694. cmd = [
  695. ffmpeg,
  696. "-f",
  697. "v4l2",
  698. "-framerate",
  699. str(fps),
  700. "-i",
  701. device,
  702. "-f",
  703. "mjpeg",
  704. "-q:v",
  705. "5",
  706. "-r",
  707. str(fps),
  708. "-",
  709. ]
  710. process = None
  711. try:
  712. logger.info("Starting USB camera stream from %s at %s fps", device, fps)
  713. process = await asyncio.create_subprocess_exec(
  714. *cmd,
  715. stdout=asyncio.subprocess.PIPE,
  716. stderr=asyncio.subprocess.PIPE,
  717. )
  718. # Give ffmpeg a moment to start and check for immediate failures
  719. await asyncio.sleep(0.5)
  720. if process.returncode is not None:
  721. stderr = await process.stderr.read()
  722. logger.error("ffmpeg USB stream failed immediately: %s", stderr.decode()[:300])
  723. return
  724. buffer = b""
  725. jpeg_start = b"\xff\xd8"
  726. jpeg_end = b"\xff\xd9"
  727. while True:
  728. try:
  729. chunk = await asyncio.wait_for(process.stdout.read(8192), timeout=30.0)
  730. if not chunk:
  731. break
  732. buffer += chunk
  733. # Extract complete frames
  734. while True:
  735. start_idx = buffer.find(jpeg_start)
  736. if start_idx == -1:
  737. buffer = buffer[-2:] if len(buffer) > 2 else buffer
  738. break
  739. if start_idx > 0:
  740. buffer = buffer[start_idx:]
  741. end_idx = buffer.find(jpeg_end, 2)
  742. if end_idx == -1:
  743. break
  744. frame = buffer[: end_idx + 2]
  745. buffer = buffer[end_idx + 2 :]
  746. yield frame
  747. except TimeoutError:
  748. logger.warning("USB stream read timeout")
  749. break
  750. except asyncio.CancelledError:
  751. logger.info("USB stream cancelled")
  752. except OSError as e:
  753. logger.error("USB stream error: %s", e)
  754. finally:
  755. if process and process.returncode is None:
  756. process.terminate()
  757. try:
  758. await asyncio.wait_for(process.wait(), timeout=2.0)
  759. except TimeoutError:
  760. process.kill()
  761. await process.wait()