external_camera.py 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894
  1. """External camera service.
  2. Supports MJPEG streams, RTSP streams (via ffmpeg), HTTP snapshot URLs, and USB cameras.
  3. Security Note: This service intentionally makes requests to user-configured camera URLs.
  4. This is necessary functionality for external camera integration. URLs are validated
  5. to ensure they are well-formed before use.
  6. """
  7. import asyncio
  8. import logging
  9. import re
  10. import shutil
  11. from collections.abc import AsyncGenerator
  12. from pathlib import Path
  13. from urllib.parse import urlparse
  14. import aiohttp
  15. logger = logging.getLogger(__name__)
  16. def _sanitize_camera_url(url: str, allowed_schemes: tuple[str, ...] = ("http", "https", "rtsp")) -> str | None:
  17. """Validate and sanitize camera URL, returning a safe reconstructed URL.
  18. This validates that the URL is well-formed, uses an allowed scheme,
  19. does not target cloud metadata services, and returns a reconstructed
  20. URL from validated components.
  21. Note: This intentionally allows user-provided URLs as that is the
  22. purpose of external camera configuration. Local network IPs are
  23. allowed since cameras are typically on the same LAN.
  24. Args:
  25. url: URL to validate and sanitize
  26. allowed_schemes: Tuple of allowed URL schemes
  27. Returns:
  28. Sanitized URL string if valid, None otherwise
  29. """
  30. try:
  31. parsed = urlparse(url)
  32. if not parsed.scheme or not parsed.netloc:
  33. return None
  34. # Validate scheme against allowlist
  35. scheme = parsed.scheme.lower()
  36. if scheme not in allowed_schemes:
  37. return None
  38. # Block cloud metadata service endpoints (SSRF mitigation)
  39. # These are dangerous destinations that should never be accessed
  40. hostname = parsed.hostname or ""
  41. hostname_lower = hostname.lower()
  42. blocked_hosts = (
  43. "169.254.169.254", # AWS/GCP/Azure metadata
  44. "metadata.google.internal", # GCP metadata
  45. "metadata.google",
  46. "localhost", # Block localhost to prevent internal service access
  47. "127.0.0.1",
  48. "::1",
  49. "0.0.0.0", # nosec B104
  50. )
  51. if hostname_lower in blocked_hosts:
  52. logger.warning("Blocked camera URL targeting restricted host: %s", hostname)
  53. return None
  54. # Block link-local addresses (169.254.x.x)
  55. if hostname.startswith("169.254."):
  56. logger.warning("Blocked camera URL targeting link-local address: %s", hostname)
  57. return None
  58. # Reconstruct URL from validated components to break taint chain
  59. # This creates a new string from validated parts
  60. port_str = f":{parsed.port}" if parsed.port else ""
  61. path = parsed.path or ""
  62. query = f"?{parsed.query}" if parsed.query else ""
  63. fragment = f"#{parsed.fragment}" if parsed.fragment else ""
  64. # Build sanitized URL from validated components
  65. sanitized = f"{scheme}://{hostname}{port_str}{path}{query}{fragment}"
  66. return sanitized
  67. except ValueError:
  68. return None
  69. def _validate_camera_url(url: str, allowed_schemes: tuple[str, ...] = ("http", "https", "rtsp")) -> bool:
  70. """Validate camera URL format (legacy wrapper).
  71. Args:
  72. url: URL to validate
  73. allowed_schemes: Tuple of allowed URL schemes
  74. Returns:
  75. True if URL is valid, False otherwise
  76. """
  77. return _sanitize_camera_url(url, allowed_schemes) is not None
  78. def list_usb_cameras() -> list[dict]:
  79. """List available USB cameras (V4L2 devices on Linux).
  80. Returns:
  81. List of dicts with {device: str, name: str, capabilities: list}
  82. """
  83. cameras = []
  84. video_devices = sorted(Path("/dev").glob("video*"))
  85. for device in video_devices:
  86. device_path = str(device)
  87. info = {"device": device_path, "name": device.name, "capabilities": []}
  88. # Try to get device info via v4l2-ctl
  89. v4l2_ctl = shutil.which("v4l2-ctl")
  90. if v4l2_ctl:
  91. import subprocess
  92. try:
  93. result = subprocess.run(
  94. [v4l2_ctl, "-d", device_path, "--info"],
  95. capture_output=True,
  96. text=True,
  97. timeout=5,
  98. )
  99. if result.returncode == 0:
  100. # Parse device name from output
  101. for line in result.stdout.splitlines():
  102. if "Card type" in line:
  103. info["name"] = line.split(":", 1)[1].strip()
  104. elif "Driver name" in line:
  105. info["driver"] = line.split(":", 1)[1].strip()
  106. # Check if device supports video capture
  107. result = subprocess.run(
  108. [v4l2_ctl, "-d", device_path, "--list-formats"],
  109. capture_output=True,
  110. text=True,
  111. timeout=5,
  112. )
  113. if result.returncode == 0 and result.stdout.strip():
  114. info["capabilities"].append("capture")
  115. # Parse available formats
  116. formats = re.findall(r"'(\w+)'", result.stdout)
  117. info["formats"] = list(set(formats))
  118. except (subprocess.TimeoutExpired, Exception) as e:
  119. logger.debug("v4l2-ctl failed for %s: %s", device_path, e)
  120. # Only include devices that look like video capture devices
  121. # Skip metadata devices (typically odd numbered like video1, video3)
  122. try:
  123. device_num = int(device.name.replace("video", ""))
  124. # Even numbered devices are usually capture, odd are metadata
  125. # But also check if we got capabilities
  126. if info.get("capabilities") or device_num % 2 == 0:
  127. cameras.append(info)
  128. except ValueError:
  129. cameras.append(info)
  130. return cameras
  131. def get_ffmpeg_path() -> str | None:
  132. """Get the path to ffmpeg executable."""
  133. # Try shutil.which first
  134. path = shutil.which("ffmpeg")
  135. if path:
  136. return path
  137. # Check common locations (systemd services may have limited PATH)
  138. for common_path in ["/usr/bin/ffmpeg", "/usr/local/bin/ffmpeg", "/opt/homebrew/bin/ffmpeg"]:
  139. if Path(common_path).exists():
  140. return common_path
  141. return None
  142. async def capture_frame(url: str, camera_type: str, timeout: int = 15) -> bytes | None:
  143. """Capture single frame from external camera.
  144. Args:
  145. url: Camera URL (MJPEG stream, RTSP URL, HTTP snapshot URL, or USB device path)
  146. camera_type: "mjpeg", "rtsp", "snapshot", or "usb"
  147. timeout: Connection timeout in seconds
  148. Returns:
  149. JPEG bytes or None on failure
  150. """
  151. logger.debug("capture_frame called: type=%s, url=%s...", camera_type, url[:50] if url else "None")
  152. if camera_type == "mjpeg":
  153. return await _capture_mjpeg_frame(url, timeout)
  154. elif camera_type == "rtsp":
  155. return await _capture_rtsp_frame(url, timeout)
  156. elif camera_type == "snapshot":
  157. return await _capture_snapshot(url, timeout)
  158. elif camera_type == "usb":
  159. return await _capture_usb_frame(url, timeout)
  160. else:
  161. logger.warning("Unknown camera type: %s", camera_type)
  162. return None
  163. async def _capture_usb_frame(device: str, timeout: int) -> bytes | None:
  164. """Capture frame from USB camera using ffmpeg."""
  165. ffmpeg = get_ffmpeg_path()
  166. if not ffmpeg:
  167. logger.error("ffmpeg not found - required for USB camera capture")
  168. return None
  169. # Validate device path - must be /dev/videoN format where N is 0-99
  170. # This prevents path traversal by using a strict allowlist approach
  171. import re as regex_module
  172. device_match = regex_module.match(r"^/dev/video(\d{1,2})$", device)
  173. if not device_match:
  174. logger.error("Invalid USB device path format: %s", device)
  175. return None
  176. # Convert to integer to break taint chain - integers cannot contain path traversal
  177. # lgtm[py/path-injection] - device_num is validated integer 0-99
  178. device_num = int(device_match.group(1)) # Safe: regex guarantees 1-2 digits
  179. if device_num > 99:
  180. logger.error("USB device number out of range: %s", device_num)
  181. return None
  182. # Construct safe path from validated integer (completely untainted)
  183. safe_device_path = Path(f"/dev/video{device_num}") # lgtm[py/path-injection]
  184. if not safe_device_path.exists():
  185. logger.error("USB device does not exist: %s", safe_device_path)
  186. return None
  187. # Use the safe path for ffmpeg - this is a hardcoded /dev/videoN path
  188. device = str(safe_device_path) # lgtm[py/path-injection]
  189. # Use ffmpeg to grab a single frame from USB camera
  190. cmd = [
  191. ffmpeg,
  192. "-f",
  193. "v4l2",
  194. "-i",
  195. device,
  196. "-frames:v",
  197. "1",
  198. "-f",
  199. "image2pipe",
  200. "-vcodec",
  201. "mjpeg",
  202. "-q:v",
  203. "2",
  204. "-",
  205. ]
  206. try:
  207. logger.debug("Running USB capture: %s", " ".join(cmd))
  208. process = await asyncio.create_subprocess_exec(
  209. *cmd,
  210. stdout=asyncio.subprocess.PIPE,
  211. stderr=asyncio.subprocess.PIPE,
  212. )
  213. stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout)
  214. if process.returncode != 0:
  215. logger.error("ffmpeg USB capture failed: %s", stderr.decode()[:200])
  216. return None
  217. if not stdout or len(stdout) < 100:
  218. logger.error("ffmpeg returned empty or too small frame from USB camera")
  219. return None
  220. return stdout
  221. except TimeoutError:
  222. logger.warning("USB frame capture timed out after %ss", timeout)
  223. if process:
  224. process.kill()
  225. return None
  226. except OSError as e:
  227. logger.error("USB frame capture failed: %s", e)
  228. return None
  229. async def _capture_mjpeg_frame(url: str, timeout: int) -> bytes | None:
  230. """Extract a single representative frame from an MJPEG stream.
  231. Many MJPEG sources — go2rtc most notably (#1177), and several IP cameras —
  232. emit a "warm-up" frame on the byte that follows connection accept: usually
  233. the last keyframe held in the encoder, which is often black or stale until
  234. the encoder catches up to live content. To return a frame that's actually
  235. representative of the scene we read past the first frame and return the
  236. second; if the connection closes / times out / hits the buffer cap before
  237. a second frame ever arrives we fall back to the first so callers still
  238. get *something* (better than degrading slow / single-frame streams to None,
  239. which would regress every code path that consumed pre-fix behaviour).
  240. Note: this function intentionally makes requests to user-configured URLs.
  241. External camera support requires connecting to user-specified camera
  242. endpoints. URL is sanitized and dangerous destinations are blocked.
  243. """
  244. safe_url = _sanitize_camera_url(url, ("http", "https"))
  245. if not safe_url:
  246. logger.error("Invalid MJPEG URL format: %s...", url[:50])
  247. return None
  248. jpeg_start = b"\xff\xd8"
  249. jpeg_end = b"\xff\xd9"
  250. first_frame: bytes | None = None # warm-up frame; fallback if no second arrives
  251. buffer = b""
  252. try:
  253. async with (
  254. aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session,
  255. session.get(safe_url) as response,
  256. ):
  257. if response.status != 200:
  258. logger.error("MJPEG stream returned status %s", response.status)
  259. return None
  260. async for chunk in response.content.iter_chunked(8192):
  261. buffer += chunk
  262. # A single chunk can carry multiple frames (e.g. high-FPS sources)
  263. # or a partial frame. Drain every complete frame we already have
  264. # before pulling the next chunk.
  265. while True:
  266. start_idx = buffer.find(jpeg_start)
  267. if start_idx == -1:
  268. # No frame start yet — drop trailing garbage, keep waiting.
  269. break
  270. end_idx = buffer.find(jpeg_end, start_idx + 2)
  271. if end_idx == -1:
  272. # Partial frame; trim already-discarded prefix so the
  273. # buffer stays bounded across long-running streams.
  274. if start_idx > 0:
  275. buffer = buffer[start_idx:]
  276. break
  277. frame = buffer[start_idx : end_idx + 2]
  278. buffer = buffer[end_idx + 2 :]
  279. if first_frame is None:
  280. first_frame = frame # warm-up; keep but don't return yet
  281. continue
  282. return frame # representative second frame
  283. if len(buffer) > 5 * 1024 * 1024: # 5MB limit
  284. logger.warning("MJPEG buffer exceeded 5MB without finding frame")
  285. break # exit chunk loop, fall through to first_frame fallback
  286. except TimeoutError:
  287. logger.warning("MJPEG frame capture timed out after %ss", timeout)
  288. except (aiohttp.ClientError, OSError) as e:
  289. logger.error("MJPEG frame capture failed: %s", e)
  290. # Stream ended / timed out / buffer cap before a second frame arrived.
  291. # Return whatever warm-up frame we managed to read; better an iffy frame
  292. # than None for callers that need *some* image (snapshot UX, plate-detect
  293. # CV, finish photo). None only if no frame ever arrived at all.
  294. return first_frame
  295. async def _capture_rtsp_frame(url: str, timeout: int) -> bytes | None:
  296. """Capture frame from RTSP using ffmpeg.
  297. For rtsps:// URLs, a local TLS proxy is used to avoid GnuTLS issues.
  298. """
  299. ffmpeg = get_ffmpeg_path()
  300. if not ffmpeg:
  301. logger.error("ffmpeg not found - required for RTSP capture")
  302. return None
  303. # If rtsps://, use TLS proxy
  304. proxy_server = None
  305. effective_url = url
  306. if url.lower().startswith("rtsps://"):
  307. try:
  308. from urllib.parse import urlparse
  309. from backend.app.services.camera import create_tls_proxy
  310. parsed = urlparse(url)
  311. target_port = parsed.port or 322
  312. proxy_port, proxy_server = await create_tls_proxy(parsed.hostname, target_port)
  313. userinfo = ""
  314. if parsed.username:
  315. userinfo = parsed.username
  316. if parsed.password:
  317. userinfo += f":{parsed.password}"
  318. userinfo += "@"
  319. effective_url = f"rtsp://{userinfo}127.0.0.1:{proxy_port}{parsed.path}"
  320. if parsed.query:
  321. effective_url += f"?{parsed.query}"
  322. except Exception as e:
  323. logger.warning("Failed to create TLS proxy for RTSP capture, falling back: %s", e)
  324. effective_url = url
  325. cmd = [
  326. ffmpeg,
  327. "-rtsp_transport",
  328. "tcp",
  329. "-i",
  330. effective_url,
  331. "-frames:v",
  332. "1",
  333. "-f",
  334. "image2pipe",
  335. "-vcodec",
  336. "mjpeg",
  337. "-q:v",
  338. "2",
  339. "-",
  340. ]
  341. try:
  342. logger.debug("Running ffmpeg RTSP capture...")
  343. process = await asyncio.create_subprocess_exec(
  344. *cmd,
  345. stdout=asyncio.subprocess.PIPE,
  346. stderr=asyncio.subprocess.PIPE,
  347. )
  348. stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout)
  349. logger.debug(
  350. "ffmpeg returned: code=%s, stdout=%s bytes, stderr=%s bytes",
  351. process.returncode,
  352. len(stdout),
  353. len(stderr),
  354. )
  355. if process.returncode != 0:
  356. logger.error("ffmpeg RTSP capture failed: %s", stderr.decode()[:200])
  357. return None
  358. if not stdout or len(stdout) < 100:
  359. logger.error("ffmpeg returned empty or too small frame")
  360. return None
  361. return stdout
  362. except TimeoutError:
  363. logger.warning("RTSP frame capture timed out after %ss", timeout)
  364. if process:
  365. process.kill()
  366. return None
  367. except OSError as e:
  368. logger.error("RTSP frame capture failed: %s", e)
  369. return None
  370. finally:
  371. if proxy_server:
  372. proxy_server.close()
  373. await proxy_server.wait_closed()
  374. async def _capture_snapshot(url: str, timeout: int) -> bytes | None:
  375. """Fetch snapshot from HTTP URL.
  376. Note: This function intentionally makes requests to user-configured URLs.
  377. External camera support requires connecting to user-specified camera endpoints.
  378. URL is sanitized and dangerous destinations are blocked.
  379. """
  380. # Sanitize URL - returns reconstructed URL from validated components
  381. safe_url = _sanitize_camera_url(url, ("http", "https"))
  382. if not safe_url:
  383. logger.error("Invalid snapshot URL format: %s...", url[:50])
  384. return None
  385. try:
  386. async with (
  387. aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session,
  388. session.get(safe_url) as response,
  389. ):
  390. if response.status != 200:
  391. logger.error("Snapshot URL returned status %s", response.status)
  392. return None
  393. data = await response.read()
  394. # Validate it looks like JPEG
  395. if not data.startswith(b"\xff\xd8"):
  396. logger.warning("Snapshot does not appear to be JPEG")
  397. # Still return it - might be valid with different header
  398. return data
  399. except TimeoutError:
  400. logger.warning("Snapshot capture timed out after %ss", timeout)
  401. return None
  402. except (aiohttp.ClientError, OSError) as e:
  403. logger.error("Snapshot capture failed: %s", e)
  404. return None
  405. async def test_connection(url: str, camera_type: str) -> dict:
  406. """Test camera connection.
  407. Returns:
  408. Dict with {success: bool, error?: str, resolution?: str}
  409. """
  410. logger.info("Testing camera connection: type=%s, url=%s...", camera_type, url[:50])
  411. try:
  412. frame = await capture_frame(url, camera_type, timeout=10)
  413. logger.info("Capture result: %s bytes", len(frame) if frame else 0)
  414. if frame:
  415. # Try to get resolution from JPEG header
  416. resolution = None
  417. try:
  418. # Simple JPEG dimension extraction
  419. # SOF0 marker is FF C0, followed by length, precision, height, width
  420. sof_markers = [b"\xff\xc0", b"\xff\xc1", b"\xff\xc2"]
  421. for marker in sof_markers:
  422. idx = frame.find(marker)
  423. if idx != -1 and idx + 9 <= len(frame):
  424. height = (frame[idx + 5] << 8) | frame[idx + 6]
  425. width = (frame[idx + 7] << 8) | frame[idx + 8]
  426. resolution = f"{width}x{height}"
  427. break
  428. except (IndexError, ValueError):
  429. pass # Resolution detection is optional; fall back to default
  430. return {"success": True, "resolution": resolution}
  431. else:
  432. return {"success": False, "error": "Failed to capture frame from camera"}
  433. except Exception as e:
  434. # Sanitize error message - don't expose internal details
  435. error_type = type(e).__name__
  436. logger.error("Camera connection test failed: %s", e)
  437. return {"success": False, "error": f"Connection failed: {error_type}"}
  438. async def generate_mjpeg_stream(url: str, camera_type: str, fps: int = 10) -> AsyncGenerator[bytes, None]:
  439. """Generator yielding MJPEG frames for streaming.
  440. Args:
  441. url: Camera URL or USB device path
  442. camera_type: "mjpeg", "rtsp", "snapshot", or "usb"
  443. fps: Target frames per second
  444. Yields:
  445. MJPEG frame data with HTTP multipart boundaries
  446. """
  447. frame_interval = 1.0 / max(fps, 1)
  448. last_frame_time = 0.0
  449. if camera_type == "mjpeg":
  450. # Proxy MJPEG stream directly, with reconnect on timeout
  451. max_retries = 3
  452. for attempt in range(max_retries + 1):
  453. frame_yielded = False
  454. async for frame in _stream_mjpeg(url):
  455. frame_yielded = True
  456. current_time = asyncio.get_event_loop().time()
  457. if current_time - last_frame_time >= frame_interval:
  458. last_frame_time = current_time
  459. yield _format_mjpeg_frame(frame)
  460. if not frame_yielded or attempt == max_retries:
  461. break
  462. logger.warning(
  463. "External MJPEG stream ended, reconnecting (attempt %d/%d)...",
  464. attempt + 1,
  465. max_retries,
  466. )
  467. await asyncio.sleep(2)
  468. elif camera_type == "rtsp":
  469. # Use ffmpeg to convert RTSP to MJPEG, with reconnect on timeout
  470. max_retries = 3
  471. for attempt in range(max_retries + 1):
  472. frame_yielded = False
  473. async for frame in _stream_rtsp(url, fps):
  474. frame_yielded = True
  475. yield _format_mjpeg_frame(frame)
  476. if not frame_yielded or attempt == max_retries:
  477. break
  478. logger.warning(
  479. "External RTSP stream ended, reconnecting (attempt %d/%d)...",
  480. attempt + 1,
  481. max_retries,
  482. )
  483. await asyncio.sleep(2)
  484. elif camera_type == "usb":
  485. # Use ffmpeg to stream from USB camera
  486. async for frame in _stream_usb(url, fps):
  487. yield _format_mjpeg_frame(frame)
  488. elif camera_type == "snapshot":
  489. # Poll snapshot URL at interval
  490. while True:
  491. try:
  492. frame = await _capture_snapshot(url, timeout=10)
  493. if frame:
  494. yield _format_mjpeg_frame(frame)
  495. await asyncio.sleep(frame_interval)
  496. except asyncio.CancelledError:
  497. break
  498. except (aiohttp.ClientError, OSError) as e:
  499. logger.warning("Snapshot poll failed: %s", e)
  500. await asyncio.sleep(frame_interval)
  501. def _format_mjpeg_frame(frame: bytes) -> bytes:
  502. """Format frame for MJPEG HTTP response."""
  503. return (
  504. b"--frame\r\n"
  505. b"Content-Type: image/jpeg\r\n"
  506. b"Content-Length: " + str(len(frame)).encode() + b"\r\n"
  507. b"\r\n" + frame + b"\r\n"
  508. )
  509. async def _stream_mjpeg(url: str) -> AsyncGenerator[bytes, None]:
  510. """Stream frames from MJPEG URL.
  511. Note: This function intentionally makes requests to user-configured URLs.
  512. External camera support requires connecting to user-specified camera endpoints.
  513. URL is sanitized and dangerous destinations are blocked.
  514. """
  515. # Sanitize URL - returns reconstructed URL from validated components
  516. safe_url = _sanitize_camera_url(url, ("http", "https"))
  517. if not safe_url:
  518. logger.error("Invalid MJPEG stream URL: %s...", url[:50])
  519. return
  520. try:
  521. timeout = aiohttp.ClientTimeout(total=None, sock_read=30)
  522. async with aiohttp.ClientSession(timeout=timeout) as session, session.get(safe_url) as response:
  523. if response.status != 200:
  524. logger.error("MJPEG stream returned status %s", response.status)
  525. return
  526. buffer = b""
  527. jpeg_start = b"\xff\xd8"
  528. jpeg_end = b"\xff\xd9"
  529. async for chunk in response.content.iter_chunked(8192):
  530. buffer += chunk
  531. # Extract complete frames from buffer
  532. while True:
  533. start_idx = buffer.find(jpeg_start)
  534. if start_idx == -1:
  535. buffer = buffer[-2:] if len(buffer) > 2 else buffer
  536. break
  537. if start_idx > 0:
  538. buffer = buffer[start_idx:]
  539. end_idx = buffer.find(jpeg_end, 2)
  540. if end_idx == -1:
  541. break
  542. frame = buffer[: end_idx + 2]
  543. buffer = buffer[end_idx + 2 :]
  544. yield frame
  545. except asyncio.CancelledError:
  546. logger.info("MJPEG stream cancelled")
  547. except (aiohttp.ClientError, OSError) as e:
  548. logger.error("MJPEG stream error: %s", e)
  549. async def _stream_rtsp(url: str, fps: int) -> AsyncGenerator[bytes, None]:
  550. """Stream frames from RTSP URL via ffmpeg.
  551. For rtsps:// URLs, a local TLS proxy (Python OpenSSL) is used instead
  552. of relying on ffmpeg's GnuTLS backend, which has compatibility issues
  553. with some printer firmwares.
  554. """
  555. ffmpeg = get_ffmpeg_path()
  556. if not ffmpeg:
  557. logger.error("ffmpeg not found - required for RTSP streaming")
  558. return
  559. # If the URL uses rtsps://, set up a TLS proxy so ffmpeg uses plain rtsp://
  560. proxy_server = None
  561. effective_url = url
  562. if url.lower().startswith("rtsps://"):
  563. try:
  564. from urllib.parse import urlparse
  565. from backend.app.services.camera import create_tls_proxy
  566. parsed = urlparse(url)
  567. target_port = parsed.port or 322
  568. proxy_port, proxy_server = await create_tls_proxy(parsed.hostname, target_port)
  569. # Rewrite URL: rtsps://user:pass@host:port/path → rtsp://user:pass@127.0.0.1:proxy/path
  570. userinfo = ""
  571. if parsed.username:
  572. userinfo = parsed.username
  573. if parsed.password:
  574. userinfo += f":{parsed.password}"
  575. userinfo += "@"
  576. effective_url = f"rtsp://{userinfo}127.0.0.1:{proxy_port}{parsed.path}"
  577. if parsed.query:
  578. effective_url += f"?{parsed.query}"
  579. except Exception as e:
  580. logger.warning("Failed to create TLS proxy for RTSP, falling back to direct: %s", e)
  581. effective_url = url
  582. cmd = [
  583. ffmpeg,
  584. "-rtsp_transport",
  585. "tcp",
  586. "-rtsp_flags",
  587. "prefer_tcp",
  588. "-timeout",
  589. "30000000",
  590. "-buffer_size",
  591. "1024000",
  592. "-max_delay",
  593. "500000",
  594. "-probesize",
  595. "32",
  596. "-analyzeduration",
  597. "0",
  598. "-fflags",
  599. "nobuffer",
  600. "-flags",
  601. "low_delay",
  602. "-i",
  603. effective_url,
  604. "-f",
  605. "mjpeg",
  606. "-q:v",
  607. "5",
  608. "-r",
  609. str(fps),
  610. "-an",
  611. "-",
  612. ]
  613. process = None
  614. try:
  615. process = await asyncio.create_subprocess_exec(
  616. *cmd,
  617. stdout=asyncio.subprocess.PIPE,
  618. stderr=asyncio.subprocess.PIPE,
  619. )
  620. # Brief check for immediate startup failures
  621. await asyncio.sleep(0.1)
  622. if process.returncode is not None:
  623. stderr = await process.stderr.read()
  624. logger.error("ffmpeg RTSP stream failed immediately: %s", stderr.decode()[:300])
  625. return
  626. buffer = b""
  627. jpeg_start = b"\xff\xd8"
  628. jpeg_end = b"\xff\xd9"
  629. while True:
  630. try:
  631. chunk = await asyncio.wait_for(process.stdout.read(8192), timeout=30.0)
  632. if not chunk:
  633. break
  634. buffer += chunk
  635. # Extract complete frames
  636. while True:
  637. start_idx = buffer.find(jpeg_start)
  638. if start_idx == -1:
  639. buffer = buffer[-2:] if len(buffer) > 2 else buffer
  640. break
  641. if start_idx > 0:
  642. buffer = buffer[start_idx:]
  643. end_idx = buffer.find(jpeg_end, 2)
  644. if end_idx == -1:
  645. break
  646. frame = buffer[: end_idx + 2]
  647. buffer = buffer[end_idx + 2 :]
  648. yield frame
  649. except TimeoutError:
  650. logger.warning("RTSP stream read timeout")
  651. break
  652. except asyncio.CancelledError:
  653. logger.info("RTSP stream cancelled")
  654. except OSError as e:
  655. logger.error("RTSP stream error: %s", e)
  656. finally:
  657. if process and process.returncode is None:
  658. process.terminate()
  659. try:
  660. await asyncio.wait_for(process.wait(), timeout=2.0)
  661. except TimeoutError:
  662. process.kill()
  663. await process.wait()
  664. if proxy_server:
  665. proxy_server.close()
  666. await proxy_server.wait_closed()
  667. async def _stream_usb(device: str, fps: int) -> AsyncGenerator[bytes, None]:
  668. """Stream frames from USB camera via ffmpeg."""
  669. ffmpeg = get_ffmpeg_path()
  670. if not ffmpeg:
  671. logger.error("ffmpeg not found - required for USB camera streaming")
  672. return
  673. # Validate device path
  674. if not device.startswith("/dev/video"):
  675. logger.error("Invalid USB device path: %s", device)
  676. return
  677. if not Path(device).exists():
  678. logger.error("USB device does not exist: %s", device)
  679. return
  680. # ffmpeg command to stream from USB camera (v4l2)
  681. cmd = [
  682. ffmpeg,
  683. "-f",
  684. "v4l2",
  685. "-framerate",
  686. str(fps),
  687. "-i",
  688. device,
  689. "-f",
  690. "mjpeg",
  691. "-q:v",
  692. "5",
  693. "-r",
  694. str(fps),
  695. "-",
  696. ]
  697. process = None
  698. try:
  699. logger.info("Starting USB camera stream from %s at %s fps", device, fps)
  700. process = await asyncio.create_subprocess_exec(
  701. *cmd,
  702. stdout=asyncio.subprocess.PIPE,
  703. stderr=asyncio.subprocess.PIPE,
  704. )
  705. # Give ffmpeg a moment to start and check for immediate failures
  706. await asyncio.sleep(0.5)
  707. if process.returncode is not None:
  708. stderr = await process.stderr.read()
  709. logger.error("ffmpeg USB stream failed immediately: %s", stderr.decode()[:300])
  710. return
  711. buffer = b""
  712. jpeg_start = b"\xff\xd8"
  713. jpeg_end = b"\xff\xd9"
  714. while True:
  715. try:
  716. chunk = await asyncio.wait_for(process.stdout.read(8192), timeout=30.0)
  717. if not chunk:
  718. break
  719. buffer += chunk
  720. # Extract complete frames
  721. while True:
  722. start_idx = buffer.find(jpeg_start)
  723. if start_idx == -1:
  724. buffer = buffer[-2:] if len(buffer) > 2 else buffer
  725. break
  726. if start_idx > 0:
  727. buffer = buffer[start_idx:]
  728. end_idx = buffer.find(jpeg_end, 2)
  729. if end_idx == -1:
  730. break
  731. frame = buffer[: end_idx + 2]
  732. buffer = buffer[end_idx + 2 :]
  733. yield frame
  734. except TimeoutError:
  735. logger.warning("USB stream read timeout")
  736. break
  737. except asyncio.CancelledError:
  738. logger.info("USB stream cancelled")
  739. except OSError as e:
  740. logger.error("USB stream error: %s", e)
  741. finally:
  742. if process and process.returncode is None:
  743. process.terminate()
  744. try:
  745. await asyncio.wait_for(process.wait(), timeout=2.0)
  746. except TimeoutError:
  747. process.kill()
  748. await process.wait()