|
|
@@ -2,9 +2,10 @@
|
|
|
|
|
|
import asyncio
|
|
|
import logging
|
|
|
+import weakref
|
|
|
from typing import AsyncGenerator
|
|
|
|
|
|
-from fastapi import APIRouter, HTTPException, Depends
|
|
|
+from fastapi import APIRouter, HTTPException, Depends, Request
|
|
|
from fastapi.responses import StreamingResponse, Response
|
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
from sqlalchemy import select
|
|
|
@@ -23,6 +24,9 @@ from backend.app.services.printer_manager import printer_manager
|
|
|
logger = logging.getLogger(__name__)
|
|
|
router = APIRouter(prefix="/printers", tags=["camera"])
|
|
|
|
|
|
+# Track active ffmpeg processes for cleanup
|
|
|
+_active_streams: dict[str, asyncio.subprocess.Process] = {}
|
|
|
+
|
|
|
|
|
|
async def get_printer_or_404(printer_id: int, db: AsyncSession) -> Printer:
|
|
|
"""Get printer by ID or raise 404."""
|
|
|
@@ -38,6 +42,8 @@ async def generate_mjpeg_stream(
|
|
|
access_code: str,
|
|
|
model: str | None,
|
|
|
fps: int = 10,
|
|
|
+ stream_id: str | None = None,
|
|
|
+ disconnect_event: asyncio.Event | None = None,
|
|
|
) -> AsyncGenerator[bytes, None]:
|
|
|
"""Generate MJPEG stream from printer camera using ffmpeg.
|
|
|
|
|
|
@@ -74,7 +80,7 @@ async def generate_mjpeg_stream(
|
|
|
"-" # Output to stdout
|
|
|
]
|
|
|
|
|
|
- logger.info(f"Starting camera stream for {ip_address} using URL: rtsps://bblp:***@{ip_address}:{port}/streaming/live/1")
|
|
|
+ logger.info(f"Starting camera stream for {ip_address} (stream_id={stream_id})")
|
|
|
logger.debug(f"ffmpeg command: {ffmpeg} ... (url hidden)")
|
|
|
|
|
|
process = None
|
|
|
@@ -85,6 +91,10 @@ async def generate_mjpeg_stream(
|
|
|
stderr=asyncio.subprocess.PIPE,
|
|
|
)
|
|
|
|
|
|
+ # Track active process for cleanup
|
|
|
+ if stream_id:
|
|
|
+ _active_streams[stream_id] = process
|
|
|
+
|
|
|
# Give ffmpeg a moment to start and check for immediate failures
|
|
|
await asyncio.sleep(0.5)
|
|
|
if process.returncode is not None:
|
|
|
@@ -104,6 +114,11 @@ async def generate_mjpeg_stream(
|
|
|
jpeg_end = b"\xff\xd9"
|
|
|
|
|
|
while True:
|
|
|
+ # Check if client disconnected
|
|
|
+ if disconnect_event and disconnect_event.is_set():
|
|
|
+ logger.info(f"Client disconnected, stopping stream {stream_id}")
|
|
|
+ break
|
|
|
+
|
|
|
try:
|
|
|
# Read chunk from ffmpeg
|
|
|
chunk = await asyncio.wait_for(
|
|
|
@@ -150,7 +165,10 @@ async def generate_mjpeg_stream(
|
|
|
logger.warning("Camera stream read timeout")
|
|
|
break
|
|
|
except asyncio.CancelledError:
|
|
|
- logger.info("Camera stream cancelled")
|
|
|
+ logger.info(f"Camera stream cancelled (stream_id={stream_id})")
|
|
|
+ break
|
|
|
+ except GeneratorExit:
|
|
|
+ logger.info(f"Camera stream generator exit (stream_id={stream_id})")
|
|
|
break
|
|
|
|
|
|
except FileNotFoundError:
|
|
|
@@ -160,22 +178,38 @@ async def generate_mjpeg_stream(
|
|
|
b"Content-Type: text/plain\r\n\r\n"
|
|
|
b"Error: ffmpeg not installed\r\n"
|
|
|
)
|
|
|
+ except asyncio.CancelledError:
|
|
|
+ logger.info(f"Camera stream task cancelled (stream_id={stream_id})")
|
|
|
+ except GeneratorExit:
|
|
|
+ logger.info(f"Camera stream generator closed (stream_id={stream_id})")
|
|
|
except Exception as e:
|
|
|
logger.exception(f"Camera stream error: {e}")
|
|
|
finally:
|
|
|
- if process:
|
|
|
+ # Remove from active streams
|
|
|
+ if stream_id and stream_id in _active_streams:
|
|
|
+ del _active_streams[stream_id]
|
|
|
+
|
|
|
+ if process and process.returncode is None:
|
|
|
+ logger.info(f"Terminating ffmpeg process for stream {stream_id}")
|
|
|
try:
|
|
|
process.terminate()
|
|
|
- await asyncio.wait_for(process.wait(), timeout=5.0)
|
|
|
- except Exception:
|
|
|
- process.kill()
|
|
|
- await process.wait()
|
|
|
- logger.info(f"Camera stream stopped for {ip_address}")
|
|
|
+ try:
|
|
|
+ await asyncio.wait_for(process.wait(), timeout=2.0)
|
|
|
+ except asyncio.TimeoutError:
|
|
|
+ logger.warning(f"ffmpeg didn't terminate gracefully, killing (stream_id={stream_id})")
|
|
|
+ process.kill()
|
|
|
+ await process.wait()
|
|
|
+ except ProcessLookupError:
|
|
|
+ pass # Process already dead
|
|
|
+ except Exception as e:
|
|
|
+ logger.warning(f"Error terminating ffmpeg: {e}")
|
|
|
+ logger.info(f"Camera stream stopped for {ip_address} (stream_id={stream_id})")
|
|
|
|
|
|
|
|
|
@router.get("/{printer_id}/camera/stream")
|
|
|
async def camera_stream(
|
|
|
printer_id: int,
|
|
|
+ request: Request,
|
|
|
fps: int = 10,
|
|
|
db: AsyncSession = Depends(get_db),
|
|
|
):
|
|
|
@@ -188,18 +222,49 @@ async def camera_stream(
|
|
|
printer_id: Printer ID
|
|
|
fps: Target frames per second (default: 10, max: 30)
|
|
|
"""
|
|
|
+ import uuid
|
|
|
+
|
|
|
printer = await get_printer_or_404(printer_id, db)
|
|
|
|
|
|
# Validate FPS
|
|
|
fps = min(max(fps, 1), 30)
|
|
|
|
|
|
+ # Generate unique stream ID for tracking
|
|
|
+ stream_id = f"{printer_id}-{uuid.uuid4().hex[:8]}"
|
|
|
+
|
|
|
+ # Create disconnect event that will be set when client disconnects
|
|
|
+ disconnect_event = asyncio.Event()
|
|
|
+
|
|
|
+ async def stream_with_disconnect_check():
|
|
|
+ """Wrapper generator that monitors for client disconnect."""
|
|
|
+ try:
|
|
|
+ async for chunk in generate_mjpeg_stream(
|
|
|
+ ip_address=printer.ip_address,
|
|
|
+ access_code=printer.access_code,
|
|
|
+ model=printer.model,
|
|
|
+ fps=fps,
|
|
|
+ stream_id=stream_id,
|
|
|
+ disconnect_event=disconnect_event,
|
|
|
+ ):
|
|
|
+ # Check if client is still connected
|
|
|
+ if await request.is_disconnected():
|
|
|
+ logger.info(f"Client disconnected detected for stream {stream_id}")
|
|
|
+ disconnect_event.set()
|
|
|
+ break
|
|
|
+ yield chunk
|
|
|
+ except asyncio.CancelledError:
|
|
|
+ logger.info(f"Stream {stream_id} cancelled")
|
|
|
+ disconnect_event.set()
|
|
|
+ except GeneratorExit:
|
|
|
+ logger.info(f"Stream {stream_id} generator closed")
|
|
|
+ disconnect_event.set()
|
|
|
+ finally:
|
|
|
+ disconnect_event.set()
|
|
|
+ # Give a moment for the inner generator to clean up
|
|
|
+ await asyncio.sleep(0.1)
|
|
|
+
|
|
|
return StreamingResponse(
|
|
|
- generate_mjpeg_stream(
|
|
|
- ip_address=printer.ip_address,
|
|
|
- access_code=printer.access_code,
|
|
|
- model=printer.model,
|
|
|
- fps=fps,
|
|
|
- ),
|
|
|
+ stream_with_disconnect_check(),
|
|
|
media_type="multipart/x-mixed-replace; boundary=frame",
|
|
|
headers={
|
|
|
"Cache-Control": "no-cache, no-store, must-revalidate",
|
|
|
@@ -209,6 +274,33 @@ async def camera_stream(
|
|
|
)
|
|
|
|
|
|
|
|
|
+@router.api_route("/{printer_id}/camera/stop", methods=["GET", "POST"])
|
|
|
+async def stop_camera_stream(printer_id: int):
|
|
|
+ """Stop all active camera streams for a printer.
|
|
|
+
|
|
|
+ This can be called by the frontend when the camera window is closed.
|
|
|
+ Accepts both GET and POST (POST for sendBeacon compatibility).
|
|
|
+ """
|
|
|
+ stopped = 0
|
|
|
+ to_remove = []
|
|
|
+ for stream_id, process in list(_active_streams.items()):
|
|
|
+ if stream_id.startswith(f"{printer_id}-"):
|
|
|
+ to_remove.append(stream_id)
|
|
|
+ if process.returncode is None:
|
|
|
+ try:
|
|
|
+ process.terminate()
|
|
|
+ stopped += 1
|
|
|
+ logger.info(f"Terminated ffmpeg process for stream {stream_id}")
|
|
|
+ except Exception as e:
|
|
|
+ logger.warning(f"Error stopping stream {stream_id}: {e}")
|
|
|
+
|
|
|
+ for stream_id in to_remove:
|
|
|
+ _active_streams.pop(stream_id, None)
|
|
|
+
|
|
|
+ logger.info(f"Stopped {stopped} camera stream(s) for printer {printer_id}, active streams remaining: {list(_active_streams.keys())}")
|
|
|
+ return {"stopped": stopped}
|
|
|
+
|
|
|
+
|
|
|
@router.get("/{printer_id}/camera/snapshot")
|
|
|
async def camera_snapshot(
|
|
|
printer_id: int,
|