| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256 |
- """Camera streaming API endpoints for Bambu Lab printers."""
- import asyncio
- import logging
- from typing import AsyncGenerator
- from fastapi import APIRouter, HTTPException, Depends
- from fastapi.responses import StreamingResponse, Response
- from sqlalchemy.ext.asyncio import AsyncSession
- from sqlalchemy import select
- from backend.app.core.database import get_db
- from backend.app.models.printer import Printer
- from backend.app.services.camera import (
- build_camera_url,
- capture_camera_frame,
- test_camera_connection,
- )
- from backend.app.services.printer_manager import printer_manager
- logger = logging.getLogger(__name__)
- router = APIRouter(prefix="/printers", tags=["camera"])
async def get_printer_or_404(printer_id: int, db: AsyncSession) -> Printer:
    """Look up a printer by primary key.

    Args:
        printer_id: ID of the printer row to fetch.
        db: Async database session used to run the query.

    Returns:
        The matching ``Printer``.

    Raises:
        HTTPException: With status 404 when no printer has this ID.
    """
    query = select(Printer).where(Printer.id == printer_id)
    rows = await db.execute(query)
    found = rows.scalar_one_or_none()
    if found:
        return found
    raise HTTPException(status_code=404, detail="Printer not found")
def _extract_jpeg_frames(buffer: bytearray) -> list[bytes]:
    """Remove and return every complete JPEG frame currently in *buffer*.

    A frame is the byte span from a start-of-image marker (0xFFD8) through
    the next end-of-image marker (0xFFD9). *buffer* is modified in place:
    consumed frames and any bytes preceding a start marker are dropped, and
    a trailing partial frame is left for the next call.
    """
    jpeg_start = b"\xff\xd8"
    jpeg_end = b"\xff\xd9"
    frames: list[bytes] = []
    while True:
        start_idx = buffer.find(jpeg_start)
        if start_idx == -1:
            # No frame start yet. Drop the junk but keep the last two bytes,
            # since a start marker may be split across two reads.
            if len(buffer) > 2:
                del buffer[:-2]
            break
        if start_idx > 0:
            del buffer[:start_idx]
        # Search from offset 2 so the start marker itself is skipped.
        end_idx = buffer.find(jpeg_end, 2)
        if end_idx == -1:
            # Frame is still incomplete; wait for more data.
            break
        frame_end = end_idx + 2
        frames.append(bytes(buffer[:frame_end]))
        del buffer[:frame_end]
    return frames


async def _stop_process(process: asyncio.subprocess.Process) -> None:
    """Terminate *process*, escalating to ``kill()`` if it does not exit in 5s.

    Safe to call on a process that has already exited.
    """
    if process.returncode is not None:
        return
    try:
        process.terminate()
    except ProcessLookupError:
        # Exited between the returncode check and the signal.
        return
    try:
        await asyncio.wait_for(process.wait(), timeout=5.0)
    except asyncio.TimeoutError:
        try:
            process.kill()
        except ProcessLookupError:
            pass
        await process.wait()
    except asyncio.CancelledError:
        # Cancelled while waiting: do not leave the child running.
        try:
            process.kill()
        except ProcessLookupError:
            pass
        raise


async def generate_mjpeg_stream(
    ip_address: str,
    access_code: str,
    model: str | None,
    fps: int = 10,
) -> AsyncGenerator[bytes, None]:
    """Generate an MJPEG stream from the printer camera using ffmpeg.

    Spawns ffmpeg against the printer's RTSPS URL, splits its stdout into
    individual JPEG frames, and yields each one as a multipart part using
    the boundary ``frame``.

    Args:
        ip_address: Printer IP address.
        access_code: Printer access code, used as the RTSPS password.
        model: Printer model, passed to ``get_camera_port`` to pick the port.
        fps: Output frame rate passed to ffmpeg's ``-r`` option.

    Yields:
        Multipart chunks, each containing one JPEG frame. If ffmpeg is not
        installed, a single ``text/plain`` part with an error message.

    Raises:
        asyncio.CancelledError: Re-raised after cleanup when the consuming
            task is cancelled.
    """
    from backend.app.services.camera import get_camera_port

    port = get_camera_port(model)
    camera_url = f"rtsps://bblp:{access_code}@{ip_address}:{port}/streaming/live/1"

    # ffmpeg command to output an MJPEG stream to stdout
    # -rtsp_transport tcp: Use TCP for reliability
    # -f mjpeg: Output as MJPEG
    # -q:v 5: Quality (lower = better, 2-10 is good range)
    # -r: Output framerate
    cmd = [
        "ffmpeg",
        "-rtsp_transport", "tcp",
        "-i", camera_url,
        "-f", "mjpeg",
        "-q:v", "5",
        "-r", str(fps),
        "-an",  # No audio
        "-",  # Output to stdout
    ]

    logger.info("Starting camera stream for %s", ip_address)

    process: asyncio.subprocess.Process | None = None
    try:
        process = await asyncio.create_subprocess_exec(
            *cmd,
            # ffmpeg must not read interactive commands from the server's stdin.
            stdin=asyncio.subprocess.DEVNULL,
            stdout=asyncio.subprocess.PIPE,
            # stderr is never consumed here; an unread PIPE would eventually
            # fill up and block ffmpeg, so discard it instead.
            stderr=asyncio.subprocess.DEVNULL,
        )

        buffer = bytearray()
        while True:
            try:
                chunk = await asyncio.wait_for(
                    process.stdout.read(8192),
                    timeout=10.0,
                )
            except asyncio.TimeoutError:
                logger.warning("Camera stream read timeout")
                break

            if not chunk:
                logger.warning("Camera stream ended (no more data)")
                break

            buffer += chunk
            for frame in _extract_jpeg_frames(buffer):
                header = (
                    b"--frame\r\n"
                    b"Content-Type: image/jpeg\r\n"
                    b"Content-Length: %d\r\n"
                    b"\r\n"
                ) % len(frame)
                yield header + frame + b"\r\n"

    except asyncio.CancelledError:
        # Cancellation must propagate; cleanup happens in ``finally``.
        logger.info("Camera stream cancelled")
        raise
    except FileNotFoundError:
        logger.error("ffmpeg not found - camera streaming requires ffmpeg")
        yield (
            b"--frame\r\n"
            b"Content-Type: text/plain\r\n\r\n"
            b"Error: ffmpeg not installed\r\n"
        )
    except Exception as e:
        logger.exception("Camera stream error: %s", e)
    finally:
        try:
            if process is not None:
                await _stop_process(process)
        finally:
            logger.info("Camera stream stopped for %s", ip_address)
@router.get("/{printer_id}/camera/stream")
async def camera_stream(
    printer_id: int,
    fps: int = 10,
    db: AsyncSession = Depends(get_db),
):
    """Serve the printer camera as a live multipart MJPEG response.

    The response uses ``multipart/x-mixed-replace`` with boundary ``frame``
    and is sent with caching disabled.

    Args:
        printer_id: Printer ID
        fps: Target frames per second (default: 10); values outside 1-30
            are clamped into that range.

    Raises:
        HTTPException: 404 when the printer does not exist.
    """
    printer = await get_printer_or_404(printer_id, db)

    # Clamp the requested rate into the supported 1-30 window.
    clamped_fps = max(1, min(fps, 30))

    no_cache_headers = {
        "Cache-Control": "no-cache, no-store, must-revalidate",
        "Pragma": "no-cache",
        "Expires": "0",
    }
    frames = generate_mjpeg_stream(
        ip_address=printer.ip_address,
        access_code=printer.access_code,
        model=printer.model,
        fps=clamped_fps,
    )
    return StreamingResponse(
        frames,
        media_type="multipart/x-mixed-replace; boundary=frame",
        headers=no_cache_headers,
    )
@router.get("/{printer_id}/camera/snapshot")
async def camera_snapshot(
    printer_id: int,
    db: AsyncSession = Depends(get_db),
):
    """Grab one frame from the printer camera and return it as a JPEG.

    The frame is captured into a temporary ``.jpg`` file, read back into
    memory, and the file is removed before the response is returned.

    Raises:
        HTTPException: 404 when the printer does not exist, 503 when the
            frame capture reports failure.
    """
    import tempfile
    from pathlib import Path

    printer = await get_printer_or_404(printer_id, db)

    # Reserve a file name on disk; the handle is closed right away and the
    # capture helper is given the path as its output location.
    with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
        snapshot_path = Path(tmp.name)

    try:
        captured = await capture_camera_frame(
            ip_address=printer.ip_address,
            access_code=printer.access_code,
            model=printer.model,
            output_path=snapshot_path,
            timeout=15,
        )
        if not captured:
            raise HTTPException(
                status_code=503,
                detail="Failed to capture camera frame. Is the printer powered on?",
            )

        response_headers = {
            "Cache-Control": "no-cache, no-store, must-revalidate",
            "Content-Disposition": f'inline; filename="snapshot_{printer_id}.jpg"',
        }
        return Response(
            content=snapshot_path.read_bytes(),
            media_type="image/jpeg",
            headers=response_headers,
        )
    finally:
        # Always remove the temporary file, on success and on failure.
        if snapshot_path.exists():
            snapshot_path.unlink()
@router.get("/{printer_id}/camera/test")
async def test_camera(
    printer_id: int,
    db: AsyncSession = Depends(get_db),
):
    """Check whether the printer's camera can be reached.

    Looks up the printer and returns the result of
    ``test_camera_connection`` unchanged.

    Raises:
        HTTPException: 404 when the printer does not exist.
    """
    printer = await get_printer_or_404(printer_id, db)
    return await test_camera_connection(
        ip_address=printer.ip_address,
        access_code=printer.access_code,
        model=printer.model,
    )
|