camera.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595
  1. """Camera streaming API endpoints for Bambu Lab printers."""
  2. import asyncio
  3. import logging
  4. from collections.abc import AsyncGenerator
  5. from fastapi import APIRouter, Depends, HTTPException, Request
  6. from fastapi.responses import Response, StreamingResponse
  7. from sqlalchemy import select
  8. from sqlalchemy.ext.asyncio import AsyncSession
  9. from backend.app.core.database import get_db
  10. from backend.app.models.printer import Printer
  11. from backend.app.services.camera import (
  12. capture_camera_frame,
  13. generate_chamber_image_stream,
  14. get_camera_port,
  15. get_ffmpeg_path,
  16. is_chamber_image_model,
  17. read_next_chamber_frame,
  18. test_camera_connection,
  19. )
logger = logging.getLogger(__name__)

router = APIRouter(prefix="/printers", tags=["camera"])

# Active ffmpeg subprocesses, keyed by stream ID ("<printer_id>-<8 hex chars>",
# as built in camera_stream). Used by the stop/status endpoints to find and
# terminate the processes belonging to one printer.
_active_streams: dict[str, asyncio.subprocess.Process] = {}

# Active chamber-image connections, keyed by the same stream ID format.
# Each value is the (reader, writer) pair returned by
# generate_chamber_image_stream.
_active_chamber_streams: dict[str, tuple] = {}

# Most recent JPEG frame per printer ID, written by the running stream
# generators so a photo can be taken without opening a second connection.
_last_frames: dict[int, bytes] = {}

# Time the most recent frame arrived, per printer ID (for stall detection).
# NOTE: the generators store the asyncio event-loop clock (loop.time()),
# which is NOT comparable with time.time().
_last_frame_times: dict[int, float] = {}

# Wall-clock time (time.time()) at which camera_stream last started a stream
# for each printer ID.
_stream_start_times: dict[int, float] = {}
  32. def get_buffered_frame(printer_id: int) -> bytes | None:
  33. """Get the last buffered frame for a printer from an active stream.
  34. Returns the JPEG frame data if available, or None if no active stream.
  35. """
  36. return _last_frames.get(printer_id)
  37. async def get_printer_or_404(printer_id: int, db: AsyncSession) -> Printer:
  38. """Get printer by ID or raise 404."""
  39. result = await db.execute(select(Printer).where(Printer.id == printer_id))
  40. printer = result.scalar_one_or_none()
  41. if not printer:
  42. raise HTTPException(status_code=404, detail="Printer not found")
  43. return printer
  44. async def generate_chamber_mjpeg_stream(
  45. ip_address: str,
  46. access_code: str,
  47. model: str | None,
  48. fps: int = 5,
  49. stream_id: str | None = None,
  50. disconnect_event: asyncio.Event | None = None,
  51. printer_id: int | None = None,
  52. ) -> AsyncGenerator[bytes, None]:
  53. """Generate MJPEG stream from A1/P1 printer using chamber image protocol.
  54. This connects to port 6000 and reads JPEG frames using the Bambu binary protocol.
  55. """
  56. logger.info(f"Starting chamber image stream for {ip_address} (stream_id={stream_id}, model={model})")
  57. connection = await generate_chamber_image_stream(ip_address, access_code, fps)
  58. if connection is None:
  59. logger.error(f"Failed to connect to chamber image stream for {ip_address}")
  60. yield (
  61. b"--frame\r\n"
  62. b"Content-Type: text/plain\r\n\r\n"
  63. b"Error: Camera connection failed. Check printer is on and camera is enabled.\r\n"
  64. )
  65. return
  66. reader, writer = connection
  67. # Track active connection for cleanup
  68. if stream_id:
  69. _active_chamber_streams[stream_id] = (reader, writer)
  70. try:
  71. frame_interval = 1.0 / fps if fps > 0 else 0.2
  72. last_frame_time = 0.0
  73. while True:
  74. # Check if client disconnected
  75. if disconnect_event and disconnect_event.is_set():
  76. logger.info(f"Client disconnected, stopping chamber stream {stream_id}")
  77. break
  78. # Read next frame
  79. frame = await read_next_chamber_frame(reader, timeout=30.0)
  80. if frame is None:
  81. logger.warning(f"Chamber image stream ended for {stream_id}")
  82. break
  83. # Save frame to buffer for photo capture and track timestamp
  84. if printer_id is not None:
  85. _last_frames[printer_id] = frame
  86. _last_frame_times[printer_id] = asyncio.get_event_loop().time()
  87. # Rate limiting - skip frames if needed to maintain target FPS
  88. current_time = asyncio.get_event_loop().time()
  89. if current_time - last_frame_time < frame_interval:
  90. continue
  91. last_frame_time = current_time
  92. # Yield frame in MJPEG format
  93. yield (
  94. b"--frame\r\n"
  95. b"Content-Type: image/jpeg\r\n"
  96. b"Content-Length: " + str(len(frame)).encode() + b"\r\n"
  97. b"\r\n" + frame + b"\r\n"
  98. )
  99. except asyncio.CancelledError:
  100. logger.info(f"Chamber image stream cancelled (stream_id={stream_id})")
  101. except GeneratorExit:
  102. logger.info(f"Chamber image stream generator exit (stream_id={stream_id})")
  103. except Exception as e:
  104. logger.exception(f"Chamber image stream error: {e}")
  105. finally:
  106. # Remove from active streams
  107. if stream_id and stream_id in _active_chamber_streams:
  108. del _active_chamber_streams[stream_id]
  109. # Clean up frame buffer and timestamps
  110. if printer_id is not None:
  111. _last_frames.pop(printer_id, None)
  112. _last_frame_times.pop(printer_id, None)
  113. _stream_start_times.pop(printer_id, None)
  114. # Close the connection
  115. try:
  116. writer.close()
  117. await writer.wait_closed()
  118. except Exception:
  119. pass
  120. logger.info(f"Chamber image stream stopped for {ip_address} (stream_id={stream_id})")
  121. async def generate_rtsp_mjpeg_stream(
  122. ip_address: str,
  123. access_code: str,
  124. model: str | None,
  125. fps: int = 10,
  126. stream_id: str | None = None,
  127. disconnect_event: asyncio.Event | None = None,
  128. printer_id: int | None = None,
  129. ) -> AsyncGenerator[bytes, None]:
  130. """Generate MJPEG stream from printer camera using ffmpeg/RTSP.
  131. This is for X1/H2/P2 models that support RTSP streaming.
  132. """
  133. ffmpeg = get_ffmpeg_path()
  134. if not ffmpeg:
  135. logger.error("ffmpeg not found - camera streaming requires ffmpeg")
  136. yield (b"--frame\r\n" b"Content-Type: text/plain\r\n\r\n" b"Error: ffmpeg not installed\r\n")
  137. return
  138. port = get_camera_port(model)
  139. camera_url = f"rtsps://bblp:{access_code}@{ip_address}:{port}/streaming/live/1"
  140. # ffmpeg command to output MJPEG stream to stdout
  141. # -rtsp_transport tcp: Use TCP for reliability
  142. # -rtsp_flags prefer_tcp: Prefer TCP for RTSP
  143. # -timeout: Connection timeout in microseconds (30 seconds)
  144. # -buffer_size: Larger buffer for network jitter
  145. # -max_delay: Maximum demuxing delay
  146. # -f mjpeg: Output as MJPEG
  147. # -q:v 5: Quality (lower = better, 2-10 is good range)
  148. # -r: Output framerate
  149. cmd = [
  150. ffmpeg,
  151. "-rtsp_transport",
  152. "tcp",
  153. "-rtsp_flags",
  154. "prefer_tcp",
  155. "-timeout",
  156. "30000000", # 30 seconds in microseconds
  157. "-buffer_size",
  158. "1024000", # 1MB buffer
  159. "-max_delay",
  160. "500000", # 0.5 seconds max delay
  161. "-i",
  162. camera_url,
  163. "-f",
  164. "mjpeg",
  165. "-q:v",
  166. "5",
  167. "-r",
  168. str(fps),
  169. "-an", # No audio
  170. "-", # Output to stdout
  171. ]
  172. logger.info(f"Starting RTSP camera stream for {ip_address} (stream_id={stream_id}, model={model}, fps={fps})")
  173. logger.debug(f"ffmpeg command: {ffmpeg} ... (url hidden)")
  174. process = None
  175. try:
  176. process = await asyncio.create_subprocess_exec(
  177. *cmd,
  178. stdout=asyncio.subprocess.PIPE,
  179. stderr=asyncio.subprocess.PIPE,
  180. )
  181. # Track active process for cleanup
  182. if stream_id:
  183. _active_streams[stream_id] = process
  184. # Give ffmpeg a moment to start and check for immediate failures
  185. await asyncio.sleep(0.5)
  186. if process.returncode is not None:
  187. stderr = await process.stderr.read()
  188. logger.error(f"ffmpeg failed immediately: {stderr.decode()}")
  189. yield (
  190. b"--frame\r\n"
  191. b"Content-Type: text/plain\r\n\r\n"
  192. b"Error: Camera connection failed. Check printer is on and camera is enabled.\r\n"
  193. )
  194. return
  195. # Read JPEG frames from ffmpeg output
  196. # JPEG images start with 0xFFD8 and end with 0xFFD9
  197. buffer = b""
  198. jpeg_start = b"\xff\xd8"
  199. jpeg_end = b"\xff\xd9"
  200. while True:
  201. # Check if client disconnected
  202. if disconnect_event and disconnect_event.is_set():
  203. logger.info(f"Client disconnected, stopping stream {stream_id}")
  204. break
  205. try:
  206. # Read chunk from ffmpeg - use longer timeout for network hiccups
  207. chunk = await asyncio.wait_for(process.stdout.read(8192), timeout=30.0)
  208. if not chunk:
  209. logger.warning("Camera stream ended (no more data)")
  210. break
  211. buffer += chunk
  212. # Find complete JPEG frames in buffer
  213. while True:
  214. start_idx = buffer.find(jpeg_start)
  215. if start_idx == -1:
  216. # No start marker, clear buffer up to last 2 bytes
  217. buffer = buffer[-2:] if len(buffer) > 2 else buffer
  218. break
  219. # Trim anything before the start marker
  220. if start_idx > 0:
  221. buffer = buffer[start_idx:]
  222. end_idx = buffer.find(jpeg_end, 2) # Skip first 2 bytes
  223. if end_idx == -1:
  224. # No end marker yet, wait for more data
  225. break
  226. # Extract complete frame
  227. frame = buffer[: end_idx + 2]
  228. buffer = buffer[end_idx + 2 :]
  229. # Save frame to buffer for photo capture and track timestamp
  230. if printer_id is not None:
  231. _last_frames[printer_id] = frame
  232. _last_frame_times[printer_id] = asyncio.get_event_loop().time()
  233. # Yield frame in MJPEG format
  234. yield (
  235. b"--frame\r\n"
  236. b"Content-Type: image/jpeg\r\n"
  237. b"Content-Length: " + str(len(frame)).encode() + b"\r\n"
  238. b"\r\n" + frame + b"\r\n"
  239. )
  240. except TimeoutError:
  241. logger.warning("Camera stream read timeout")
  242. break
  243. except asyncio.CancelledError:
  244. logger.info(f"Camera stream cancelled (stream_id={stream_id})")
  245. break
  246. except GeneratorExit:
  247. logger.info(f"Camera stream generator exit (stream_id={stream_id})")
  248. break
  249. except FileNotFoundError:
  250. logger.error("ffmpeg not found - camera streaming requires ffmpeg")
  251. yield (b"--frame\r\n" b"Content-Type: text/plain\r\n\r\n" b"Error: ffmpeg not installed\r\n")
  252. except asyncio.CancelledError:
  253. logger.info(f"Camera stream task cancelled (stream_id={stream_id})")
  254. except GeneratorExit:
  255. logger.info(f"Camera stream generator closed (stream_id={stream_id})")
  256. except Exception as e:
  257. logger.exception(f"Camera stream error: {e}")
  258. finally:
  259. # Remove from active streams
  260. if stream_id and stream_id in _active_streams:
  261. del _active_streams[stream_id]
  262. # Clean up frame buffer and timestamps
  263. if printer_id is not None:
  264. _last_frames.pop(printer_id, None)
  265. _last_frame_times.pop(printer_id, None)
  266. _stream_start_times.pop(printer_id, None)
  267. if process and process.returncode is None:
  268. logger.info(f"Terminating ffmpeg process for stream {stream_id}")
  269. try:
  270. process.terminate()
  271. try:
  272. await asyncio.wait_for(process.wait(), timeout=2.0)
  273. except TimeoutError:
  274. logger.warning(f"ffmpeg didn't terminate gracefully, killing (stream_id={stream_id})")
  275. process.kill()
  276. await process.wait()
  277. except ProcessLookupError:
  278. pass # Process already dead
  279. except Exception as e:
  280. logger.warning(f"Error terminating ffmpeg: {e}")
  281. logger.info(f"Camera stream stopped for {ip_address} (stream_id={stream_id})")
  282. @router.get("/{printer_id}/camera/stream")
  283. async def camera_stream(
  284. printer_id: int,
  285. request: Request,
  286. fps: int = 10,
  287. db: AsyncSession = Depends(get_db),
  288. ):
  289. """Stream live video from printer camera as MJPEG.
  290. This endpoint returns a multipart MJPEG stream that can be used directly
  291. in an <img> tag or video player.
  292. Uses the appropriate protocol based on printer model:
  293. - A1/P1: Chamber image protocol (port 6000)
  294. - X1/H2/P2: RTSP via ffmpeg (port 322)
  295. Args:
  296. printer_id: Printer ID
  297. fps: Target frames per second (default: 10, max: 30)
  298. """
  299. import uuid
  300. printer = await get_printer_or_404(printer_id, db)
  301. # Validate FPS - A1/P1 models max out at ~5 FPS
  302. if is_chamber_image_model(printer.model):
  303. fps = min(max(fps, 1), 5)
  304. else:
  305. fps = min(max(fps, 1), 30)
  306. # Generate unique stream ID for tracking
  307. stream_id = f"{printer_id}-{uuid.uuid4().hex[:8]}"
  308. # Create disconnect event that will be set when client disconnects
  309. disconnect_event = asyncio.Event()
  310. # Choose the appropriate stream generator based on model
  311. if is_chamber_image_model(printer.model):
  312. stream_generator = generate_chamber_mjpeg_stream
  313. logger.info(f"Using chamber image protocol for {printer.model}")
  314. else:
  315. stream_generator = generate_rtsp_mjpeg_stream
  316. logger.info(f"Using RTSP protocol for {printer.model}")
  317. # Track stream start time
  318. import time
  319. _stream_start_times[printer_id] = time.time()
  320. async def stream_with_disconnect_check():
  321. """Wrapper generator that monitors for client disconnect."""
  322. try:
  323. async for chunk in stream_generator(
  324. ip_address=printer.ip_address,
  325. access_code=printer.access_code,
  326. model=printer.model,
  327. fps=fps,
  328. stream_id=stream_id,
  329. disconnect_event=disconnect_event,
  330. printer_id=printer_id,
  331. ):
  332. # Check if client is still connected
  333. if await request.is_disconnected():
  334. logger.info(f"Client disconnected detected for stream {stream_id}")
  335. disconnect_event.set()
  336. break
  337. yield chunk
  338. except asyncio.CancelledError:
  339. logger.info(f"Stream {stream_id} cancelled")
  340. disconnect_event.set()
  341. except GeneratorExit:
  342. logger.info(f"Stream {stream_id} generator closed")
  343. disconnect_event.set()
  344. finally:
  345. disconnect_event.set()
  346. # Give a moment for the inner generator to clean up
  347. await asyncio.sleep(0.1)
  348. return StreamingResponse(
  349. stream_with_disconnect_check(),
  350. media_type="multipart/x-mixed-replace; boundary=frame",
  351. headers={
  352. "Cache-Control": "no-cache, no-store, must-revalidate",
  353. "Pragma": "no-cache",
  354. "Expires": "0",
  355. },
  356. )
  357. @router.api_route("/{printer_id}/camera/stop", methods=["GET", "POST"])
  358. async def stop_camera_stream(printer_id: int):
  359. """Stop all active camera streams for a printer.
  360. This can be called by the frontend when the camera window is closed.
  361. Accepts both GET and POST (POST for sendBeacon compatibility).
  362. """
  363. stopped = 0
  364. # Stop ffmpeg/RTSP streams
  365. to_remove = []
  366. for stream_id, process in list(_active_streams.items()):
  367. if stream_id.startswith(f"{printer_id}-"):
  368. to_remove.append(stream_id)
  369. if process.returncode is None:
  370. try:
  371. process.terminate()
  372. stopped += 1
  373. logger.info(f"Terminated ffmpeg process for stream {stream_id}")
  374. except Exception as e:
  375. logger.warning(f"Error stopping stream {stream_id}: {e}")
  376. for stream_id in to_remove:
  377. _active_streams.pop(stream_id, None)
  378. # Stop chamber image streams
  379. to_remove_chamber = []
  380. for stream_id, (_reader, writer) in list(_active_chamber_streams.items()):
  381. if stream_id.startswith(f"{printer_id}-"):
  382. to_remove_chamber.append(stream_id)
  383. try:
  384. writer.close()
  385. stopped += 1
  386. logger.info(f"Closed chamber image connection for stream {stream_id}")
  387. except Exception as e:
  388. logger.warning(f"Error stopping chamber stream {stream_id}: {e}")
  389. for stream_id in to_remove_chamber:
  390. _active_chamber_streams.pop(stream_id, None)
  391. logger.info(f"Stopped {stopped} camera stream(s) for printer {printer_id}")
  392. return {"stopped": stopped}
  393. @router.get("/{printer_id}/camera/snapshot")
  394. async def camera_snapshot(
  395. printer_id: int,
  396. db: AsyncSession = Depends(get_db),
  397. ):
  398. """Capture a single frame from the printer camera.
  399. Returns a JPEG image.
  400. """
  401. import tempfile
  402. from pathlib import Path
  403. printer = await get_printer_or_404(printer_id, db)
  404. # Create temporary file for the snapshot
  405. with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f:
  406. temp_path = Path(f.name)
  407. try:
  408. success = await capture_camera_frame(
  409. ip_address=printer.ip_address,
  410. access_code=printer.access_code,
  411. model=printer.model,
  412. output_path=temp_path,
  413. timeout=15,
  414. )
  415. if not success:
  416. raise HTTPException(
  417. status_code=503,
  418. detail="Failed to capture camera frame. Ensure printer is on and camera is enabled.",
  419. )
  420. # Read and return the image
  421. with open(temp_path, "rb") as f:
  422. image_data = f.read()
  423. return Response(
  424. content=image_data,
  425. media_type="image/jpeg",
  426. headers={
  427. "Cache-Control": "no-cache, no-store, must-revalidate",
  428. "Content-Disposition": f'inline; filename="snapshot_{printer_id}.jpg"',
  429. },
  430. )
  431. finally:
  432. # Clean up temp file
  433. if temp_path.exists():
  434. temp_path.unlink()
  435. @router.get("/{printer_id}/camera/test")
  436. async def test_camera(
  437. printer_id: int,
  438. db: AsyncSession = Depends(get_db),
  439. ):
  440. """Test camera connection for a printer.
  441. Returns success status and any error message.
  442. """
  443. printer = await get_printer_or_404(printer_id, db)
  444. result = await test_camera_connection(
  445. ip_address=printer.ip_address,
  446. access_code=printer.access_code,
  447. model=printer.model,
  448. )
  449. return result
  450. @router.get("/{printer_id}/camera/status")
  451. async def camera_status(printer_id: int):
  452. """Get the status of an active camera stream.
  453. Returns whether a stream is active and when the last frame was received.
  454. Used by the frontend to detect stalled streams and auto-reconnect.
  455. """
  456. import time
  457. # Check if there's an active stream for this printer
  458. has_active_stream = False
  459. # Check ffmpeg/RTSP streams
  460. for stream_id in _active_streams:
  461. if stream_id.startswith(f"{printer_id}-"):
  462. process = _active_streams[stream_id]
  463. if process.returncode is None:
  464. has_active_stream = True
  465. break
  466. # Check chamber image streams
  467. if not has_active_stream:
  468. for stream_id in _active_chamber_streams:
  469. if stream_id.startswith(f"{printer_id}-"):
  470. has_active_stream = True
  471. break
  472. # Get timing information
  473. current_time = time.time()
  474. last_frame_time = _last_frame_times.get(printer_id)
  475. stream_start_time = _stream_start_times.get(printer_id)
  476. # Calculate seconds since last frame
  477. seconds_since_frame = None
  478. if last_frame_time is not None:
  479. seconds_since_frame = current_time - last_frame_time
  480. # Calculate stream uptime
  481. stream_uptime = None
  482. if stream_start_time is not None:
  483. stream_uptime = current_time - stream_start_time
  484. return {
  485. "active": has_active_stream,
  486. "has_frames": printer_id in _last_frames,
  487. "seconds_since_frame": seconds_since_frame,
  488. "stream_uptime": stream_uptime,
  489. # Consider stalled if no frame for more than 10 seconds after stream started
  490. "stalled": (
  491. has_active_stream
  492. and stream_uptime is not None
  493. and stream_uptime > 5 # Give 5 seconds for stream to start
  494. and (seconds_since_frame is None or seconds_since_frame > 10)
  495. ),
  496. }