camera.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372
  1. """Camera streaming API endpoints for Bambu Lab printers."""
  2. import asyncio
  3. import logging
  4. import weakref
  5. from typing import AsyncGenerator
  6. from fastapi import APIRouter, HTTPException, Depends, Request
  7. from fastapi.responses import StreamingResponse, Response
  8. from sqlalchemy.ext.asyncio import AsyncSession
  9. from sqlalchemy import select
  10. from backend.app.core.database import get_db
  11. from backend.app.models.printer import Printer
  12. from backend.app.services.camera import (
  13. build_camera_url,
  14. capture_camera_frame,
  15. test_camera_connection,
  16. get_ffmpeg_path,
  17. get_camera_port,
  18. )
  19. from backend.app.services.printer_manager import printer_manager
  20. logger = logging.getLogger(__name__)
  21. router = APIRouter(prefix="/printers", tags=["camera"])
  22. # Track active ffmpeg processes for cleanup
  23. _active_streams: dict[str, asyncio.subprocess.Process] = {}
  24. async def get_printer_or_404(printer_id: int, db: AsyncSession) -> Printer:
  25. """Get printer by ID or raise 404."""
  26. result = await db.execute(select(Printer).where(Printer.id == printer_id))
  27. printer = result.scalar_one_or_none()
  28. if not printer:
  29. raise HTTPException(status_code=404, detail="Printer not found")
  30. return printer
  31. async def generate_mjpeg_stream(
  32. ip_address: str,
  33. access_code: str,
  34. model: str | None,
  35. fps: int = 10,
  36. stream_id: str | None = None,
  37. disconnect_event: asyncio.Event | None = None,
  38. ) -> AsyncGenerator[bytes, None]:
  39. """Generate MJPEG stream from printer camera using ffmpeg.
  40. This captures frames continuously and yields them in MJPEG format.
  41. """
  42. ffmpeg = get_ffmpeg_path()
  43. if not ffmpeg:
  44. logger.error("ffmpeg not found - camera streaming requires ffmpeg")
  45. yield (
  46. b"--frame\r\n"
  47. b"Content-Type: text/plain\r\n\r\n"
  48. b"Error: ffmpeg not installed\r\n"
  49. )
  50. return
  51. port = get_camera_port(model)
  52. camera_url = f"rtsps://bblp:{access_code}@{ip_address}:{port}/streaming/live/1"
  53. # ffmpeg command to output MJPEG stream to stdout
  54. # -rtsp_transport tcp: Use TCP for reliability
  55. # -rtsp_flags prefer_tcp: Prefer TCP for RTSP
  56. # -f mjpeg: Output as MJPEG
  57. # -q:v 5: Quality (lower = better, 2-10 is good range)
  58. # -r: Output framerate
  59. cmd = [
  60. ffmpeg,
  61. "-rtsp_transport", "tcp",
  62. "-rtsp_flags", "prefer_tcp",
  63. "-i", camera_url,
  64. "-f", "mjpeg",
  65. "-q:v", "5",
  66. "-r", str(fps),
  67. "-an", # No audio
  68. "-" # Output to stdout
  69. ]
  70. logger.info(f"Starting camera stream for {ip_address} (stream_id={stream_id})")
  71. logger.debug(f"ffmpeg command: {ffmpeg} ... (url hidden)")
  72. process = None
  73. try:
  74. process = await asyncio.create_subprocess_exec(
  75. *cmd,
  76. stdout=asyncio.subprocess.PIPE,
  77. stderr=asyncio.subprocess.PIPE,
  78. )
  79. # Track active process for cleanup
  80. if stream_id:
  81. _active_streams[stream_id] = process
  82. # Give ffmpeg a moment to start and check for immediate failures
  83. await asyncio.sleep(0.5)
  84. if process.returncode is not None:
  85. stderr = await process.stderr.read()
  86. logger.error(f"ffmpeg failed immediately: {stderr.decode()}")
  87. yield (
  88. b"--frame\r\n"
  89. b"Content-Type: text/plain\r\n\r\n"
  90. b"Error: Camera connection failed. Check printer is on and camera is enabled.\r\n"
  91. )
  92. return
  93. # Read JPEG frames from ffmpeg output
  94. # JPEG images start with 0xFFD8 and end with 0xFFD9
  95. buffer = b""
  96. jpeg_start = b"\xff\xd8"
  97. jpeg_end = b"\xff\xd9"
  98. while True:
  99. # Check if client disconnected
  100. if disconnect_event and disconnect_event.is_set():
  101. logger.info(f"Client disconnected, stopping stream {stream_id}")
  102. break
  103. try:
  104. # Read chunk from ffmpeg
  105. chunk = await asyncio.wait_for(
  106. process.stdout.read(8192),
  107. timeout=10.0
  108. )
  109. if not chunk:
  110. logger.warning("Camera stream ended (no more data)")
  111. break
  112. buffer += chunk
  113. # Find complete JPEG frames in buffer
  114. while True:
  115. start_idx = buffer.find(jpeg_start)
  116. if start_idx == -1:
  117. # No start marker, clear buffer up to last 2 bytes
  118. buffer = buffer[-2:] if len(buffer) > 2 else buffer
  119. break
  120. # Trim anything before the start marker
  121. if start_idx > 0:
  122. buffer = buffer[start_idx:]
  123. end_idx = buffer.find(jpeg_end, 2) # Skip first 2 bytes
  124. if end_idx == -1:
  125. # No end marker yet, wait for more data
  126. break
  127. # Extract complete frame
  128. frame = buffer[:end_idx + 2]
  129. buffer = buffer[end_idx + 2:]
  130. # Yield frame in MJPEG format
  131. yield (
  132. b"--frame\r\n"
  133. b"Content-Type: image/jpeg\r\n"
  134. b"Content-Length: " + str(len(frame)).encode() + b"\r\n"
  135. b"\r\n" + frame + b"\r\n"
  136. )
  137. except asyncio.TimeoutError:
  138. logger.warning("Camera stream read timeout")
  139. break
  140. except asyncio.CancelledError:
  141. logger.info(f"Camera stream cancelled (stream_id={stream_id})")
  142. break
  143. except GeneratorExit:
  144. logger.info(f"Camera stream generator exit (stream_id={stream_id})")
  145. break
  146. except FileNotFoundError:
  147. logger.error("ffmpeg not found - camera streaming requires ffmpeg")
  148. yield (
  149. b"--frame\r\n"
  150. b"Content-Type: text/plain\r\n\r\n"
  151. b"Error: ffmpeg not installed\r\n"
  152. )
  153. except asyncio.CancelledError:
  154. logger.info(f"Camera stream task cancelled (stream_id={stream_id})")
  155. except GeneratorExit:
  156. logger.info(f"Camera stream generator closed (stream_id={stream_id})")
  157. except Exception as e:
  158. logger.exception(f"Camera stream error: {e}")
  159. finally:
  160. # Remove from active streams
  161. if stream_id and stream_id in _active_streams:
  162. del _active_streams[stream_id]
  163. if process and process.returncode is None:
  164. logger.info(f"Terminating ffmpeg process for stream {stream_id}")
  165. try:
  166. process.terminate()
  167. try:
  168. await asyncio.wait_for(process.wait(), timeout=2.0)
  169. except asyncio.TimeoutError:
  170. logger.warning(f"ffmpeg didn't terminate gracefully, killing (stream_id={stream_id})")
  171. process.kill()
  172. await process.wait()
  173. except ProcessLookupError:
  174. pass # Process already dead
  175. except Exception as e:
  176. logger.warning(f"Error terminating ffmpeg: {e}")
  177. logger.info(f"Camera stream stopped for {ip_address} (stream_id={stream_id})")
  178. @router.get("/{printer_id}/camera/stream")
  179. async def camera_stream(
  180. printer_id: int,
  181. request: Request,
  182. fps: int = 10,
  183. db: AsyncSession = Depends(get_db),
  184. ):
  185. """Stream live video from printer camera as MJPEG.
  186. This endpoint returns a multipart MJPEG stream that can be used directly
  187. in an <img> tag or video player.
  188. Args:
  189. printer_id: Printer ID
  190. fps: Target frames per second (default: 10, max: 30)
  191. """
  192. import uuid
  193. printer = await get_printer_or_404(printer_id, db)
  194. # Validate FPS
  195. fps = min(max(fps, 1), 30)
  196. # Generate unique stream ID for tracking
  197. stream_id = f"{printer_id}-{uuid.uuid4().hex[:8]}"
  198. # Create disconnect event that will be set when client disconnects
  199. disconnect_event = asyncio.Event()
  200. async def stream_with_disconnect_check():
  201. """Wrapper generator that monitors for client disconnect."""
  202. try:
  203. async for chunk in generate_mjpeg_stream(
  204. ip_address=printer.ip_address,
  205. access_code=printer.access_code,
  206. model=printer.model,
  207. fps=fps,
  208. stream_id=stream_id,
  209. disconnect_event=disconnect_event,
  210. ):
  211. # Check if client is still connected
  212. if await request.is_disconnected():
  213. logger.info(f"Client disconnected detected for stream {stream_id}")
  214. disconnect_event.set()
  215. break
  216. yield chunk
  217. except asyncio.CancelledError:
  218. logger.info(f"Stream {stream_id} cancelled")
  219. disconnect_event.set()
  220. except GeneratorExit:
  221. logger.info(f"Stream {stream_id} generator closed")
  222. disconnect_event.set()
  223. finally:
  224. disconnect_event.set()
  225. # Give a moment for the inner generator to clean up
  226. await asyncio.sleep(0.1)
  227. return StreamingResponse(
  228. stream_with_disconnect_check(),
  229. media_type="multipart/x-mixed-replace; boundary=frame",
  230. headers={
  231. "Cache-Control": "no-cache, no-store, must-revalidate",
  232. "Pragma": "no-cache",
  233. "Expires": "0",
  234. }
  235. )
  236. @router.api_route("/{printer_id}/camera/stop", methods=["GET", "POST"])
  237. async def stop_camera_stream(printer_id: int):
  238. """Stop all active camera streams for a printer.
  239. This can be called by the frontend when the camera window is closed.
  240. Accepts both GET and POST (POST for sendBeacon compatibility).
  241. """
  242. stopped = 0
  243. to_remove = []
  244. for stream_id, process in list(_active_streams.items()):
  245. if stream_id.startswith(f"{printer_id}-"):
  246. to_remove.append(stream_id)
  247. if process.returncode is None:
  248. try:
  249. process.terminate()
  250. stopped += 1
  251. logger.info(f"Terminated ffmpeg process for stream {stream_id}")
  252. except Exception as e:
  253. logger.warning(f"Error stopping stream {stream_id}: {e}")
  254. for stream_id in to_remove:
  255. _active_streams.pop(stream_id, None)
  256. logger.info(f"Stopped {stopped} camera stream(s) for printer {printer_id}, active streams remaining: {list(_active_streams.keys())}")
  257. return {"stopped": stopped}
  258. @router.get("/{printer_id}/camera/snapshot")
  259. async def camera_snapshot(
  260. printer_id: int,
  261. db: AsyncSession = Depends(get_db),
  262. ):
  263. """Capture a single frame from the printer camera.
  264. Returns a JPEG image.
  265. """
  266. import tempfile
  267. from pathlib import Path
  268. printer = await get_printer_or_404(printer_id, db)
  269. # Create temporary file for the snapshot
  270. with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f:
  271. temp_path = Path(f.name)
  272. try:
  273. success = await capture_camera_frame(
  274. ip_address=printer.ip_address,
  275. access_code=printer.access_code,
  276. model=printer.model,
  277. output_path=temp_path,
  278. timeout=15,
  279. )
  280. if not success:
  281. raise HTTPException(
  282. status_code=503,
  283. detail="Failed to capture camera frame. Is the printer powered on?"
  284. )
  285. # Read and return the image
  286. with open(temp_path, "rb") as f:
  287. image_data = f.read()
  288. return Response(
  289. content=image_data,
  290. media_type="image/jpeg",
  291. headers={
  292. "Cache-Control": "no-cache, no-store, must-revalidate",
  293. "Content-Disposition": f'inline; filename="snapshot_{printer_id}.jpg"'
  294. }
  295. )
  296. finally:
  297. # Clean up temp file
  298. if temp_path.exists():
  299. temp_path.unlink()
  300. @router.get("/{printer_id}/camera/test")
  301. async def test_camera(
  302. printer_id: int,
  303. db: AsyncSession = Depends(get_db),
  304. ):
  305. """Test camera connection for a printer.
  306. Returns success status and any error message.
  307. """
  308. printer = await get_printer_or_404(printer_id, db)
  309. result = await test_camera_connection(
  310. ip_address=printer.ip_address,
  311. access_code=printer.access_code,
  312. model=printer.model,
  313. )
  314. return result