| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521 |
- """Camera streaming API endpoints for Bambu Lab printers."""
- import asyncio
- import logging
- from collections.abc import AsyncGenerator
- from fastapi import APIRouter, Depends, HTTPException, Request
- from fastapi.responses import Response, StreamingResponse
- from sqlalchemy import select
- from sqlalchemy.ext.asyncio import AsyncSession
- from backend.app.core.database import get_db
- from backend.app.models.printer import Printer
- from backend.app.services.camera import (
- capture_camera_frame,
- generate_chamber_image_stream,
- get_camera_port,
- get_ffmpeg_path,
- is_chamber_image_model,
- read_next_chamber_frame,
- test_camera_connection,
- )
- logger = logging.getLogger(__name__)
- router = APIRouter(prefix="/printers", tags=["camera"])
- # Track active ffmpeg processes for cleanup
- _active_streams: dict[str, asyncio.subprocess.Process] = {}
- # Track active chamber image connections for cleanup
- _active_chamber_streams: dict[str, tuple] = {}
- # Store last frame for each printer (for photo capture from active stream)
- _last_frames: dict[int, bytes] = {}
- def get_buffered_frame(printer_id: int) -> bytes | None:
- """Get the last buffered frame for a printer from an active stream.
- Returns the JPEG frame data if available, or None if no active stream.
- """
- return _last_frames.get(printer_id)
- async def get_printer_or_404(printer_id: int, db: AsyncSession) -> Printer:
- """Get printer by ID or raise 404."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(status_code=404, detail="Printer not found")
- return printer
- async def generate_chamber_mjpeg_stream(
- ip_address: str,
- access_code: str,
- model: str | None,
- fps: int = 5,
- stream_id: str | None = None,
- disconnect_event: asyncio.Event | None = None,
- printer_id: int | None = None,
- ) -> AsyncGenerator[bytes, None]:
- """Generate MJPEG stream from A1/P1 printer using chamber image protocol.
- This connects to port 6000 and reads JPEG frames using the Bambu binary protocol.
- """
- logger.info(f"Starting chamber image stream for {ip_address} (stream_id={stream_id}, model={model})")
- connection = await generate_chamber_image_stream(ip_address, access_code, fps)
- if connection is None:
- logger.error(f"Failed to connect to chamber image stream for {ip_address}")
- yield (
- b"--frame\r\n"
- b"Content-Type: text/plain\r\n\r\n"
- b"Error: Camera connection failed. Check printer is on and camera is enabled.\r\n"
- )
- return
- reader, writer = connection
- # Track active connection for cleanup
- if stream_id:
- _active_chamber_streams[stream_id] = (reader, writer)
- try:
- frame_interval = 1.0 / fps if fps > 0 else 0.2
- last_frame_time = 0.0
- while True:
- # Check if client disconnected
- if disconnect_event and disconnect_event.is_set():
- logger.info(f"Client disconnected, stopping chamber stream {stream_id}")
- break
- # Read next frame
- frame = await read_next_chamber_frame(reader, timeout=30.0)
- if frame is None:
- logger.warning(f"Chamber image stream ended for {stream_id}")
- break
- # Save frame to buffer for photo capture
- if printer_id is not None:
- _last_frames[printer_id] = frame
- # Rate limiting - skip frames if needed to maintain target FPS
- current_time = asyncio.get_event_loop().time()
- if current_time - last_frame_time < frame_interval:
- continue
- last_frame_time = current_time
- # Yield frame in MJPEG format
- yield (
- b"--frame\r\n"
- b"Content-Type: image/jpeg\r\n"
- b"Content-Length: " + str(len(frame)).encode() + b"\r\n"
- b"\r\n" + frame + b"\r\n"
- )
- except asyncio.CancelledError:
- logger.info(f"Chamber image stream cancelled (stream_id={stream_id})")
- except GeneratorExit:
- logger.info(f"Chamber image stream generator exit (stream_id={stream_id})")
- except Exception as e:
- logger.exception(f"Chamber image stream error: {e}")
- finally:
- # Remove from active streams
- if stream_id and stream_id in _active_chamber_streams:
- del _active_chamber_streams[stream_id]
- # Clean up frame buffer
- if printer_id is not None and printer_id in _last_frames:
- del _last_frames[printer_id]
- # Close the connection
- try:
- writer.close()
- await writer.wait_closed()
- except Exception:
- pass
- logger.info(f"Chamber image stream stopped for {ip_address} (stream_id={stream_id})")
- async def generate_rtsp_mjpeg_stream(
- ip_address: str,
- access_code: str,
- model: str | None,
- fps: int = 10,
- stream_id: str | None = None,
- disconnect_event: asyncio.Event | None = None,
- printer_id: int | None = None,
- ) -> AsyncGenerator[bytes, None]:
- """Generate MJPEG stream from printer camera using ffmpeg/RTSP.
- This is for X1/H2/P2 models that support RTSP streaming.
- """
- ffmpeg = get_ffmpeg_path()
- if not ffmpeg:
- logger.error("ffmpeg not found - camera streaming requires ffmpeg")
- yield (b"--frame\r\n" b"Content-Type: text/plain\r\n\r\n" b"Error: ffmpeg not installed\r\n")
- return
- port = get_camera_port(model)
- camera_url = f"rtsps://bblp:{access_code}@{ip_address}:{port}/streaming/live/1"
- # ffmpeg command to output MJPEG stream to stdout
- # -rtsp_transport tcp: Use TCP for reliability
- # -rtsp_flags prefer_tcp: Prefer TCP for RTSP
- # -timeout: Connection timeout in microseconds (30 seconds)
- # -buffer_size: Larger buffer for network jitter
- # -max_delay: Maximum demuxing delay
- # -f mjpeg: Output as MJPEG
- # -q:v 5: Quality (lower = better, 2-10 is good range)
- # -r: Output framerate
- cmd = [
- ffmpeg,
- "-rtsp_transport",
- "tcp",
- "-rtsp_flags",
- "prefer_tcp",
- "-timeout",
- "30000000", # 30 seconds in microseconds
- "-buffer_size",
- "1024000", # 1MB buffer
- "-max_delay",
- "500000", # 0.5 seconds max delay
- "-i",
- camera_url,
- "-f",
- "mjpeg",
- "-q:v",
- "5",
- "-r",
- str(fps),
- "-an", # No audio
- "-", # Output to stdout
- ]
- logger.info(f"Starting RTSP camera stream for {ip_address} (stream_id={stream_id}, model={model}, fps={fps})")
- logger.debug(f"ffmpeg command: {ffmpeg} ... (url hidden)")
- process = None
- try:
- process = await asyncio.create_subprocess_exec(
- *cmd,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- )
- # Track active process for cleanup
- if stream_id:
- _active_streams[stream_id] = process
- # Give ffmpeg a moment to start and check for immediate failures
- await asyncio.sleep(0.5)
- if process.returncode is not None:
- stderr = await process.stderr.read()
- logger.error(f"ffmpeg failed immediately: {stderr.decode()}")
- yield (
- b"--frame\r\n"
- b"Content-Type: text/plain\r\n\r\n"
- b"Error: Camera connection failed. Check printer is on and camera is enabled.\r\n"
- )
- return
- # Read JPEG frames from ffmpeg output
- # JPEG images start with 0xFFD8 and end with 0xFFD9
- buffer = b""
- jpeg_start = b"\xff\xd8"
- jpeg_end = b"\xff\xd9"
- while True:
- # Check if client disconnected
- if disconnect_event and disconnect_event.is_set():
- logger.info(f"Client disconnected, stopping stream {stream_id}")
- break
- try:
- # Read chunk from ffmpeg - use longer timeout for network hiccups
- chunk = await asyncio.wait_for(process.stdout.read(8192), timeout=30.0)
- if not chunk:
- logger.warning("Camera stream ended (no more data)")
- break
- buffer += chunk
- # Find complete JPEG frames in buffer
- while True:
- start_idx = buffer.find(jpeg_start)
- if start_idx == -1:
- # No start marker, clear buffer up to last 2 bytes
- buffer = buffer[-2:] if len(buffer) > 2 else buffer
- break
- # Trim anything before the start marker
- if start_idx > 0:
- buffer = buffer[start_idx:]
- end_idx = buffer.find(jpeg_end, 2) # Skip first 2 bytes
- if end_idx == -1:
- # No end marker yet, wait for more data
- break
- # Extract complete frame
- frame = buffer[: end_idx + 2]
- buffer = buffer[end_idx + 2 :]
- # Save frame to buffer for photo capture
- if printer_id is not None:
- _last_frames[printer_id] = frame
- # Yield frame in MJPEG format
- yield (
- b"--frame\r\n"
- b"Content-Type: image/jpeg\r\n"
- b"Content-Length: " + str(len(frame)).encode() + b"\r\n"
- b"\r\n" + frame + b"\r\n"
- )
- except TimeoutError:
- logger.warning("Camera stream read timeout")
- break
- except asyncio.CancelledError:
- logger.info(f"Camera stream cancelled (stream_id={stream_id})")
- break
- except GeneratorExit:
- logger.info(f"Camera stream generator exit (stream_id={stream_id})")
- break
- except FileNotFoundError:
- logger.error("ffmpeg not found - camera streaming requires ffmpeg")
- yield (b"--frame\r\n" b"Content-Type: text/plain\r\n\r\n" b"Error: ffmpeg not installed\r\n")
- except asyncio.CancelledError:
- logger.info(f"Camera stream task cancelled (stream_id={stream_id})")
- except GeneratorExit:
- logger.info(f"Camera stream generator closed (stream_id={stream_id})")
- except Exception as e:
- logger.exception(f"Camera stream error: {e}")
- finally:
- # Remove from active streams
- if stream_id and stream_id in _active_streams:
- del _active_streams[stream_id]
- # Clean up frame buffer
- if printer_id is not None and printer_id in _last_frames:
- del _last_frames[printer_id]
- if process and process.returncode is None:
- logger.info(f"Terminating ffmpeg process for stream {stream_id}")
- try:
- process.terminate()
- try:
- await asyncio.wait_for(process.wait(), timeout=2.0)
- except TimeoutError:
- logger.warning(f"ffmpeg didn't terminate gracefully, killing (stream_id={stream_id})")
- process.kill()
- await process.wait()
- except ProcessLookupError:
- pass # Process already dead
- except Exception as e:
- logger.warning(f"Error terminating ffmpeg: {e}")
- logger.info(f"Camera stream stopped for {ip_address} (stream_id={stream_id})")
- @router.get("/{printer_id}/camera/stream")
- async def camera_stream(
- printer_id: int,
- request: Request,
- fps: int = 10,
- db: AsyncSession = Depends(get_db),
- ):
- """Stream live video from printer camera as MJPEG.
- This endpoint returns a multipart MJPEG stream that can be used directly
- in an <img> tag or video player.
- Uses the appropriate protocol based on printer model:
- - A1/P1: Chamber image protocol (port 6000)
- - X1/H2/P2: RTSP via ffmpeg (port 322)
- Args:
- printer_id: Printer ID
- fps: Target frames per second (default: 10, max: 30)
- """
- import uuid
- printer = await get_printer_or_404(printer_id, db)
- # Validate FPS - A1/P1 models max out at ~5 FPS
- if is_chamber_image_model(printer.model):
- fps = min(max(fps, 1), 5)
- else:
- fps = min(max(fps, 1), 30)
- # Generate unique stream ID for tracking
- stream_id = f"{printer_id}-{uuid.uuid4().hex[:8]}"
- # Create disconnect event that will be set when client disconnects
- disconnect_event = asyncio.Event()
- # Choose the appropriate stream generator based on model
- if is_chamber_image_model(printer.model):
- stream_generator = generate_chamber_mjpeg_stream
- logger.info(f"Using chamber image protocol for {printer.model}")
- else:
- stream_generator = generate_rtsp_mjpeg_stream
- logger.info(f"Using RTSP protocol for {printer.model}")
- async def stream_with_disconnect_check():
- """Wrapper generator that monitors for client disconnect."""
- try:
- async for chunk in stream_generator(
- ip_address=printer.ip_address,
- access_code=printer.access_code,
- model=printer.model,
- fps=fps,
- stream_id=stream_id,
- disconnect_event=disconnect_event,
- printer_id=printer_id,
- ):
- # Check if client is still connected
- if await request.is_disconnected():
- logger.info(f"Client disconnected detected for stream {stream_id}")
- disconnect_event.set()
- break
- yield chunk
- except asyncio.CancelledError:
- logger.info(f"Stream {stream_id} cancelled")
- disconnect_event.set()
- except GeneratorExit:
- logger.info(f"Stream {stream_id} generator closed")
- disconnect_event.set()
- finally:
- disconnect_event.set()
- # Give a moment for the inner generator to clean up
- await asyncio.sleep(0.1)
- return StreamingResponse(
- stream_with_disconnect_check(),
- media_type="multipart/x-mixed-replace; boundary=frame",
- headers={
- "Cache-Control": "no-cache, no-store, must-revalidate",
- "Pragma": "no-cache",
- "Expires": "0",
- },
- )
- @router.api_route("/{printer_id}/camera/stop", methods=["GET", "POST"])
- async def stop_camera_stream(printer_id: int):
- """Stop all active camera streams for a printer.
- This can be called by the frontend when the camera window is closed.
- Accepts both GET and POST (POST for sendBeacon compatibility).
- """
- stopped = 0
- # Stop ffmpeg/RTSP streams
- to_remove = []
- for stream_id, process in list(_active_streams.items()):
- if stream_id.startswith(f"{printer_id}-"):
- to_remove.append(stream_id)
- if process.returncode is None:
- try:
- process.terminate()
- stopped += 1
- logger.info(f"Terminated ffmpeg process for stream {stream_id}")
- except Exception as e:
- logger.warning(f"Error stopping stream {stream_id}: {e}")
- for stream_id in to_remove:
- _active_streams.pop(stream_id, None)
- # Stop chamber image streams
- to_remove_chamber = []
- for stream_id, (_reader, writer) in list(_active_chamber_streams.items()):
- if stream_id.startswith(f"{printer_id}-"):
- to_remove_chamber.append(stream_id)
- try:
- writer.close()
- stopped += 1
- logger.info(f"Closed chamber image connection for stream {stream_id}")
- except Exception as e:
- logger.warning(f"Error stopping chamber stream {stream_id}: {e}")
- for stream_id in to_remove_chamber:
- _active_chamber_streams.pop(stream_id, None)
- logger.info(f"Stopped {stopped} camera stream(s) for printer {printer_id}")
- return {"stopped": stopped}
- @router.get("/{printer_id}/camera/snapshot")
- async def camera_snapshot(
- printer_id: int,
- db: AsyncSession = Depends(get_db),
- ):
- """Capture a single frame from the printer camera.
- Returns a JPEG image.
- """
- import tempfile
- from pathlib import Path
- printer = await get_printer_or_404(printer_id, db)
- # Create temporary file for the snapshot
- with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f:
- temp_path = Path(f.name)
- try:
- success = await capture_camera_frame(
- ip_address=printer.ip_address,
- access_code=printer.access_code,
- model=printer.model,
- output_path=temp_path,
- timeout=15,
- )
- if not success:
- raise HTTPException(
- status_code=503,
- detail="Failed to capture camera frame. Ensure printer is on and camera is enabled.",
- )
- # Read and return the image
- with open(temp_path, "rb") as f:
- image_data = f.read()
- return Response(
- content=image_data,
- media_type="image/jpeg",
- headers={
- "Cache-Control": "no-cache, no-store, must-revalidate",
- "Content-Disposition": f'inline; filename="snapshot_{printer_id}.jpg"',
- },
- )
- finally:
- # Clean up temp file
- if temp_path.exists():
- temp_path.unlink()
- @router.get("/{printer_id}/camera/test")
- async def test_camera(
- printer_id: int,
- db: AsyncSession = Depends(get_db),
- ):
- """Test camera connection for a printer.
- Returns success status and any error message.
- """
- printer = await get_printer_or_404(printer_id, db)
- result = await test_camera_connection(
- ip_address=printer.ip_address,
- access_code=printer.access_code,
- model=printer.model,
- )
- return result
|