|
|
@@ -163,6 +163,43 @@ async def generate_chamber_mjpeg_stream(
|
|
|
logger.info("Chamber image stream stopped for %s (stream_id=%s)", ip_address, stream_id)
|
|
|
|
|
|
|
|
|
+async def _terminate_ffmpeg(process: asyncio.subprocess.Process, stream_id: str | None = None) -> None:
|
|
|
+ """Terminate an ffmpeg process gracefully, then kill if needed."""
|
|
|
+ if process.returncode is not None:
|
|
|
+ return # Already dead
|
|
|
+ try:
|
|
|
+ process.terminate()
|
|
|
+ try:
|
|
|
+ await asyncio.wait_for(process.wait(), timeout=2.0)
|
|
|
+ except TimeoutError:
|
|
|
+ logger.warning("ffmpeg didn't terminate gracefully, killing (stream_id=%s)", stream_id)
|
|
|
+ process.kill()
|
|
|
+ await process.wait()
|
|
|
+ except ProcessLookupError:
|
|
|
+ pass # Already dead
|
|
|
+ except OSError as e:
|
|
|
+ logger.warning("Error terminating ffmpeg: %s", e)
|
|
|
+ _spawned_ffmpeg_pids.pop(process.pid, None)
|
|
|
+
|
|
|
+
|
|
|
+async def _read_ffmpeg_stderr(process: asyncio.subprocess.Process) -> str | None:
|
|
|
+ """Read ffmpeg stderr for diagnostics (best-effort, non-blocking)."""
|
|
|
+ if not process or not process.stderr:
|
|
|
+ return None
|
|
|
+ try:
|
|
|
+ data = await asyncio.wait_for(process.stderr.read(), timeout=2.0)
|
|
|
+ return data.decode(errors="replace") if data else None
|
|
|
+ except (TimeoutError, Exception):
|
|
|
+ return None
|
|
|
+
|
|
|
+
|
|
|
+# Max consecutive RTSP reconnections before giving up.
|
|
|
+# Some printer firmwares (notably P2S) drop RTSP sessions after a few seconds,
|
|
|
+# so we transparently respawn ffmpeg to keep the MJPEG stream alive.
|
|
|
+_RTSP_MAX_RECONNECTS = 30
|
|
|
+_RTSP_RECONNECT_DELAY = 1.0 # seconds between respawns
|
|
|
+
|
|
|
+
|
|
|
async def generate_rtsp_mjpeg_stream(
|
|
|
ip_address: str,
|
|
|
access_code: str,
|
|
|
@@ -175,6 +212,7 @@ async def generate_rtsp_mjpeg_stream(
|
|
|
"""Generate MJPEG stream from printer camera using ffmpeg/RTSP.
|
|
|
|
|
|
This is for X1/H2/P2 models that support RTSP streaming.
|
|
|
+ Auto-reconnects when the printer drops the RTSP session (common on P2S).
|
|
|
"""
|
|
|
ffmpeg = get_ffmpeg_path()
|
|
|
if not ffmpeg:
|
|
|
@@ -188,7 +226,7 @@ async def generate_rtsp_mjpeg_stream(
|
|
|
# ffmpeg command to output MJPEG stream to stdout
|
|
|
# -rtsp_transport tcp: Use TCP for reliability
|
|
|
# -rtsp_flags prefer_tcp: Prefer TCP for RTSP
|
|
|
- # -timeout: Connection timeout in microseconds (30 seconds)
|
|
|
+ # -timeout: Socket I/O timeout in microseconds (30 seconds)
|
|
|
# -buffer_size: Larger buffer for network jitter
|
|
|
# -max_delay: Maximum demuxing delay
|
|
|
# -f mjpeg: Output as MJPEG
|
|
|
@@ -223,124 +261,161 @@ async def generate_rtsp_mjpeg_stream(
|
|
|
)
|
|
|
logger.debug("ffmpeg command: %s ... (url hidden)", ffmpeg)
|
|
|
|
|
|
+ # On Windows, spawn ffmpeg in its own process group so that
|
|
|
+ # terminate() doesn't broadcast CTRL_C_EVENT to uvicorn (#605).
|
|
|
+ spawn_kwargs: dict = {}
|
|
|
+ if sys.platform == "win32":
|
|
|
+ spawn_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
|
|
|
+
|
|
|
+ jpeg_start = b"\xff\xd8"
|
|
|
+ jpeg_end = b"\xff\xd9"
|
|
|
+ reconnect_count = 0
|
|
|
process = None
|
|
|
- try:
|
|
|
- # On Windows, spawn ffmpeg in its own process group so that
|
|
|
- # terminate() doesn't broadcast CTRL_C_EVENT to uvicorn (#605).
|
|
|
- kwargs: dict = {}
|
|
|
- if sys.platform == "win32":
|
|
|
- kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
|
|
|
-
|
|
|
- process = await asyncio.create_subprocess_exec(
|
|
|
- *cmd,
|
|
|
- stdout=asyncio.subprocess.PIPE,
|
|
|
- stderr=asyncio.subprocess.PIPE,
|
|
|
- **kwargs,
|
|
|
- )
|
|
|
+ got_any_frames = False
|
|
|
|
|
|
- # Track active process for cleanup
|
|
|
- if stream_id:
|
|
|
- _active_streams[stream_id] = process
|
|
|
- import time as _time
|
|
|
+ try:
|
|
|
+ while reconnect_count <= _RTSP_MAX_RECONNECTS:
|
|
|
+ # Check for client disconnect before (re)connecting
|
|
|
+ if disconnect_event and disconnect_event.is_set():
|
|
|
+ break
|
|
|
|
|
|
- _spawned_ffmpeg_pids[process.pid] = _time.time()
|
|
|
+ if reconnect_count > 0:
|
|
|
+ logger.info(
|
|
|
+ "RTSP reconnecting (%d/%d) for %s (stream_id=%s)",
|
|
|
+ reconnect_count,
|
|
|
+ _RTSP_MAX_RECONNECTS,
|
|
|
+ ip_address,
|
|
|
+ stream_id,
|
|
|
+ )
|
|
|
+ await asyncio.sleep(_RTSP_RECONNECT_DELAY)
|
|
|
+ if disconnect_event and disconnect_event.is_set():
|
|
|
+ break
|
|
|
|
|
|
- # Give ffmpeg a moment to start and check for immediate failures
|
|
|
- await asyncio.sleep(0.5)
|
|
|
- if process.returncode is not None:
|
|
|
- stderr = await process.stderr.read()
|
|
|
- logger.error("ffmpeg failed immediately: %s", stderr.decode())
|
|
|
- yield (
|
|
|
- b"--frame\r\n"
|
|
|
- b"Content-Type: text/plain\r\n\r\n"
|
|
|
- b"Error: Camera connection failed. Check printer is on and camera is enabled.\r\n"
|
|
|
+ # Spawn ffmpeg
|
|
|
+ process = await asyncio.create_subprocess_exec(
|
|
|
+ *cmd,
|
|
|
+ stdout=asyncio.subprocess.PIPE,
|
|
|
+ stderr=asyncio.subprocess.PIPE,
|
|
|
+ **spawn_kwargs,
|
|
|
)
|
|
|
- return
|
|
|
-
|
|
|
- # Read JPEG frames from ffmpeg output
|
|
|
- # JPEG images start with 0xFFD8 and end with 0xFFD9
|
|
|
- buffer = b""
|
|
|
- jpeg_start = b"\xff\xd8"
|
|
|
- jpeg_end = b"\xff\xd9"
|
|
|
|
|
|
- while True:
|
|
|
- # Check if client disconnected
|
|
|
- if disconnect_event and disconnect_event.is_set():
|
|
|
- logger.info("Client disconnected, stopping stream %s", stream_id)
|
|
|
- break
|
|
|
+ if stream_id:
|
|
|
+ _active_streams[stream_id] = process
|
|
|
+ import time as _time
|
|
|
+
|
|
|
+ _spawned_ffmpeg_pids[process.pid] = _time.time()
|
|
|
+
|
|
|
+ # Give ffmpeg a moment to start and check for immediate failures
|
|
|
+ await asyncio.sleep(0.5)
|
|
|
+ if process.returncode is not None:
|
|
|
+ stderr = await process.stderr.read()
|
|
|
+ stderr_text = stderr.decode(errors="replace")
|
|
|
+ logger.error("ffmpeg failed immediately (attempt %d): %s", reconnect_count + 1, stderr_text)
|
|
|
+ _spawned_ffmpeg_pids.pop(process.pid, None)
|
|
|
+ if not got_any_frames and reconnect_count == 0:
|
|
|
+ # First attempt failed immediately — camera is likely unreachable
|
|
|
+ yield (
|
|
|
+ b"--frame\r\n"
|
|
|
+ b"Content-Type: text/plain\r\n\r\n"
|
|
|
+ b"Error: Camera connection failed. Check printer is on and camera is enabled.\r\n"
|
|
|
+ )
|
|
|
+ return
|
|
|
+ reconnect_count += 1
|
|
|
+ continue
|
|
|
|
|
|
- try:
|
|
|
- # Read chunk from ffmpeg - use longer timeout for network hiccups
|
|
|
- chunk = await asyncio.wait_for(process.stdout.read(8192), timeout=30.0)
|
|
|
+ # Read JPEG frames from ffmpeg stdout
|
|
|
+ buffer = b""
|
|
|
+ stream_ended = False
|
|
|
+ client_gone = False
|
|
|
|
|
|
- if not chunk:
|
|
|
- logger.warning("Camera stream ended (no more data)")
|
|
|
- # Log ffmpeg stderr for diagnostics
|
|
|
- if process and process.stderr:
|
|
|
- try:
|
|
|
- stderr_data = await asyncio.wait_for(process.stderr.read(), timeout=2.0)
|
|
|
- if stderr_data:
|
|
|
- logger.warning("ffmpeg stderr: %s", stderr_data.decode(errors="replace"))
|
|
|
- except (TimeoutError, Exception):
|
|
|
- pass
|
|
|
+ while True:
|
|
|
+ if disconnect_event and disconnect_event.is_set():
|
|
|
+ client_gone = True
|
|
|
break
|
|
|
|
|
|
- buffer += chunk
|
|
|
-
|
|
|
- # Find complete JPEG frames in buffer
|
|
|
- while True:
|
|
|
- start_idx = buffer.find(jpeg_start)
|
|
|
- if start_idx == -1:
|
|
|
- # No start marker, clear buffer up to last 2 bytes
|
|
|
- buffer = buffer[-2:] if len(buffer) > 2 else buffer
|
|
|
+ try:
|
|
|
+ chunk = await asyncio.wait_for(process.stdout.read(8192), timeout=30.0)
|
|
|
+
|
|
|
+ if not chunk:
|
|
|
+ # ffmpeg exited — log stderr and break to reconnect
|
|
|
+ stderr_text = await _read_ffmpeg_stderr(process)
|
|
|
+ if stderr_text:
|
|
|
+ logger.warning("ffmpeg stderr (stream_id=%s): %s", stream_id, stderr_text)
|
|
|
+ logger.warning("RTSP stream ended for %s (stream_id=%s), will reconnect", ip_address, stream_id)
|
|
|
+ stream_ended = True
|
|
|
break
|
|
|
|
|
|
- # Trim anything before the start marker
|
|
|
- if start_idx > 0:
|
|
|
- buffer = buffer[start_idx:]
|
|
|
+ buffer += chunk
|
|
|
|
|
|
- end_idx = buffer.find(jpeg_end, 2) # Skip first 2 bytes
|
|
|
- if end_idx == -1:
|
|
|
- # No end marker yet, wait for more data
|
|
|
- break
|
|
|
+ # Extract complete JPEG frames from buffer
|
|
|
+ while True:
|
|
|
+ start_idx = buffer.find(jpeg_start)
|
|
|
+ if start_idx == -1:
|
|
|
+ buffer = buffer[-2:] if len(buffer) > 2 else buffer
|
|
|
+ break
|
|
|
|
|
|
- # Extract complete frame
|
|
|
- frame = buffer[: end_idx + 2]
|
|
|
- buffer = buffer[end_idx + 2 :]
|
|
|
+ if start_idx > 0:
|
|
|
+ buffer = buffer[start_idx:]
|
|
|
|
|
|
- # Save frame to buffer for photo capture and track timestamp
|
|
|
- if printer_id is not None:
|
|
|
- import time
|
|
|
+ end_idx = buffer.find(jpeg_end, 2)
|
|
|
+ if end_idx == -1:
|
|
|
+ break
|
|
|
|
|
|
- _last_frames[printer_id] = frame
|
|
|
- _last_frame_times[printer_id] = time.time()
|
|
|
+ frame = buffer[: end_idx + 2]
|
|
|
+ buffer = buffer[end_idx + 2 :]
|
|
|
+ got_any_frames = True
|
|
|
|
|
|
- # Yield frame in MJPEG format
|
|
|
- yield (
|
|
|
- b"--frame\r\n"
|
|
|
- b"Content-Type: image/jpeg\r\n"
|
|
|
- b"Content-Length: " + str(len(frame)).encode() + b"\r\n"
|
|
|
- b"\r\n" + frame + b"\r\n"
|
|
|
- )
|
|
|
+ if printer_id is not None:
|
|
|
+ import time
|
|
|
|
|
|
- except TimeoutError:
|
|
|
- logger.warning("Camera stream read timeout")
|
|
|
- # Log ffmpeg stderr for diagnostics
|
|
|
- if process and process.stderr:
|
|
|
- try:
|
|
|
- stderr_data = await asyncio.wait_for(process.stderr.read(), timeout=2.0)
|
|
|
- if stderr_data:
|
|
|
- logger.warning("ffmpeg stderr on timeout: %s", stderr_data.decode(errors="replace"))
|
|
|
- except (TimeoutError, Exception):
|
|
|
- pass
|
|
|
- break
|
|
|
- except asyncio.CancelledError:
|
|
|
- logger.info("Camera stream cancelled (stream_id=%s)", stream_id)
|
|
|
- break
|
|
|
- except GeneratorExit:
|
|
|
- logger.info("Camera stream generator exit (stream_id=%s)", stream_id)
|
|
|
+ _last_frames[printer_id] = frame
|
|
|
+ _last_frame_times[printer_id] = time.time()
|
|
|
+
|
|
|
+ yield (
|
|
|
+ b"--frame\r\n"
|
|
|
+ b"Content-Type: image/jpeg\r\n"
|
|
|
+ b"Content-Length: " + str(len(frame)).encode() + b"\r\n"
|
|
|
+ b"\r\n" + frame + b"\r\n"
|
|
|
+ )
|
|
|
+
|
|
|
+ except TimeoutError:
|
|
|
+ stderr_text = await _read_ffmpeg_stderr(process)
|
|
|
+ if stderr_text:
|
|
|
+ logger.warning("ffmpeg stderr on timeout: %s", stderr_text)
|
|
|
+ logger.warning("RTSP read timeout for %s (stream_id=%s)", ip_address, stream_id)
|
|
|
+ stream_ended = True
|
|
|
+ break
|
|
|
+ except asyncio.CancelledError:
|
|
|
+ logger.info("Camera stream cancelled (stream_id=%s)", stream_id)
|
|
|
+ client_gone = True
|
|
|
+ break
|
|
|
+ except GeneratorExit:
|
|
|
+ logger.info("Camera stream generator exit (stream_id=%s)", stream_id)
|
|
|
+ client_gone = True
|
|
|
+ break
|
|
|
+
|
|
|
+ # Clean up this ffmpeg process before reconnecting or exiting
|
|
|
+ await _terminate_ffmpeg(process, stream_id)
|
|
|
+ process = None
|
|
|
+
|
|
|
+ if client_gone:
|
|
|
break
|
|
|
|
|
|
+ if stream_ended:
|
|
|
+ reconnect_count += 1
|
|
|
+ continue
|
|
|
+
|
|
|
+ # Normal exit (shouldn't reach here, but be safe)
|
|
|
+ break
|
|
|
+
|
|
|
+ if reconnect_count > _RTSP_MAX_RECONNECTS:
|
|
|
+ logger.error(
|
|
|
+ "RTSP max reconnects (%d) reached for %s (stream_id=%s)",
|
|
|
+ _RTSP_MAX_RECONNECTS,
|
|
|
+ ip_address,
|
|
|
+ stream_id,
|
|
|
+ )
|
|
|
+
|
|
|
except FileNotFoundError:
|
|
|
logger.error("ffmpeg not found - camera streaming requires ffmpeg")
|
|
|
yield (b"--frame\r\nContent-Type: text/plain\r\n\r\nError: ffmpeg not installed\r\n")
|
|
|
@@ -361,24 +436,9 @@ async def generate_rtsp_mjpeg_stream(
|
|
|
_last_frame_times.pop(printer_id, None)
|
|
|
_stream_start_times.pop(printer_id, None)
|
|
|
|
|
|
- if process and process.returncode is None:
|
|
|
- logger.info("Terminating ffmpeg process for stream %s", stream_id)
|
|
|
- try:
|
|
|
- process.terminate()
|
|
|
- try:
|
|
|
- await asyncio.wait_for(process.wait(), timeout=2.0)
|
|
|
- except TimeoutError:
|
|
|
- logger.warning("ffmpeg didn't terminate gracefully, killing (stream_id=%s)", stream_id)
|
|
|
- process.kill()
|
|
|
- await process.wait()
|
|
|
- except ProcessLookupError:
|
|
|
- pass # Process already dead
|
|
|
- except OSError as e:
|
|
|
- logger.warning("Error terminating ffmpeg: %s", e)
|
|
|
- logger.info("Camera stream stopped for %s (stream_id=%s)", ip_address, stream_id)
|
|
|
- # Remove from PID tracking now that process is confirmed dead
|
|
|
if process:
|
|
|
- _spawned_ffmpeg_pids.pop(process.pid, None)
|
|
|
+ await _terminate_ffmpeg(process, stream_id)
|
|
|
+ logger.info("Camera stream stopped for %s (stream_id=%s)", ip_address, stream_id)
|
|
|
|
|
|
|
|
|
@router.get("/{printer_id}/camera/stream")
|