|
|
@@ -45,6 +45,10 @@ _stream_start_times: dict[int, float] = {}
|
|
|
# Track active external camera streams by printer ID
|
|
|
_active_external_streams: set[int] = set()
|
|
|
|
|
|
+# Track ALL spawned ffmpeg PIDs (persists even if _active_streams entries are removed)
|
|
|
+# Maps PID -> spawn timestamp — used by cleanup to find truly orphaned OS processes
|
|
|
+_spawned_ffmpeg_pids: dict[int, float] = {}
|
|
|
+
|
|
|
|
|
|
def get_buffered_frame(printer_id: int) -> bytes | None:
|
|
|
"""Get the last buffered frame for a printer from an active stream.
|
|
|
@@ -228,6 +232,9 @@ async def generate_rtsp_mjpeg_stream(
|
|
|
# Track active process for cleanup
|
|
|
if stream_id:
|
|
|
_active_streams[stream_id] = process
|
|
|
+ import time as _time
|
|
|
+
|
|
|
+ _spawned_ffmpeg_pids[process.pid] = _time.time()
|
|
|
|
|
|
# Give ffmpeg a moment to start and check for immediate failures
|
|
|
await asyncio.sleep(0.5)
|
|
|
@@ -344,6 +351,9 @@ async def generate_rtsp_mjpeg_stream(
|
|
|
except OSError as e:
|
|
|
logger.warning("Error terminating ffmpeg: %s", e)
|
|
|
logger.info("Camera stream stopped for %s (stream_id=%s)", ip_address, stream_id)
|
|
|
+ # Remove from PID tracking now that process is confirmed dead
|
|
|
+ if process:
|
|
|
+ _spawned_ffmpeg_pids.pop(process.pid, None)
|
|
|
|
|
|
|
|
|
@router.get("/{printer_id}/camera/stream")
|
|
|
@@ -444,6 +454,43 @@ async def camera_stream(
|
|
|
|
|
|
_stream_start_times[printer_id] = time.time()
|
|
|
|
|
|
+ async def _kill_stream_process(sid: str):
|
|
|
+ """Terminate+kill the ffmpeg process for a stream ID."""
|
|
|
+ proc = _active_streams.get(sid)
|
|
|
+ if proc and proc.returncode is None:
|
|
|
+ try:
|
|
|
+ proc.terminate()
|
|
|
+ try:
|
|
|
+ await asyncio.wait_for(proc.wait(), timeout=2.0)
|
|
|
+ except TimeoutError:
|
|
|
+ proc.kill()
|
|
|
+ await proc.wait()
|
|
|
+ except (ProcessLookupError, OSError):
|
|
|
+ pass
|
|
|
+
|
|
|
+ async def _monitor_disconnect():
|
|
|
+ """Background task: poll for client disconnect independently of frame loop."""
|
|
|
+ try:
|
|
|
+ while not disconnect_event.is_set():
|
|
|
+ await asyncio.sleep(2)
|
|
|
+ if await request.is_disconnected():
|
|
|
+ logger.info("Disconnect monitor: client gone (stream %s)", stream_id)
|
|
|
+ disconnect_event.set()
|
|
|
+ # Kill ffmpeg process (RTSP streams)
|
|
|
+ await _kill_stream_process(stream_id)
|
|
|
+ # Close chamber stream connection if applicable
|
|
|
+ chamber = _active_chamber_streams.get(stream_id)
|
|
|
+ if chamber:
|
|
|
+ try:
|
|
|
+ chamber[1].close()
|
|
|
+ except OSError:
|
|
|
+ pass
|
|
|
+ break
|
|
|
+ except asyncio.CancelledError:
|
|
|
+ pass
|
|
|
+
|
|
|
+ monitor_task = asyncio.create_task(_monitor_disconnect())
|
|
|
+
|
|
|
async def stream_with_disconnect_check():
|
|
|
"""Wrapper generator that monitors for client disconnect."""
|
|
|
try:
|
|
|
@@ -457,7 +504,7 @@ async def camera_stream(
|
|
|
printer_id=printer_id,
|
|
|
):
|
|
|
# Check if client is still connected
|
|
|
- if await request.is_disconnected():
|
|
|
+ if disconnect_event.is_set() or await request.is_disconnected():
|
|
|
logger.info("Client disconnected detected for stream %s", stream_id)
|
|
|
disconnect_event.set()
|
|
|
break
|
|
|
@@ -470,6 +517,7 @@ async def camera_stream(
|
|
|
disconnect_event.set()
|
|
|
finally:
|
|
|
disconnect_event.set()
|
|
|
+ monitor_task.cancel()
|
|
|
# Give a moment for the inner generator to clean up
|
|
|
await asyncio.sleep(0.1)
|
|
|
|
|
|
@@ -504,10 +552,19 @@ async def stop_camera_stream(
|
|
|
if process.returncode is None:
|
|
|
try:
|
|
|
process.terminate()
|
|
|
+ try:
|
|
|
+ await asyncio.wait_for(process.wait(), timeout=2.0)
|
|
|
+ except TimeoutError:
|
|
|
+ logger.warning("ffmpeg didn't terminate gracefully, killing (stream_id=%s)", stream_id)
|
|
|
+ process.kill()
|
|
|
+ await process.wait()
|
|
|
stopped += 1
|
|
|
logger.info("Terminated ffmpeg process for stream %s", stream_id)
|
|
|
+ except ProcessLookupError:
|
|
|
+ pass # Process already dead
|
|
|
except OSError as e:
|
|
|
logger.warning("Error stopping stream %s: %s", stream_id, e)
|
|
|
+ _spawned_ffmpeg_pids.pop(process.pid, None)
|
|
|
|
|
|
for stream_id in to_remove:
|
|
|
_active_streams.pop(stream_id, None)
|
|
|
@@ -1086,3 +1143,109 @@ async def delete_reference(
|
|
|
raise HTTPException(404, "Reference not found")
|
|
|
|
|
|
return {"success": True, "message": "Reference deleted"}
|
|
|
+
|
|
|
+
|
|
|
+def _scan_bambu_ffmpeg_pids() -> list[int]:
|
|
|
+ """Scan /proc for ffmpeg processes with Bambu RTSP URLs.
|
|
|
+
|
|
|
+ These are definitely ours — no other software connects to rtsps://bblp:.
|
|
|
+ This catches orphans that survive app restarts and are not in any tracking dict.
|
|
|
+ """
|
|
|
+ import os
|
|
|
+
|
|
|
+ pids = []
|
|
|
+ try:
|
|
|
+ for entry in os.listdir("/proc"):
|
|
|
+ if not entry.isdigit():
|
|
|
+ continue
|
|
|
+ try:
|
|
|
+ with open(f"/proc/{entry}/cmdline", "rb") as f:
|
|
|
+ cmdline = f.read()
|
|
|
+ if b"ffmpeg" in cmdline and b"rtsps://bblp:" in cmdline:
|
|
|
+ pids.append(int(entry))
|
|
|
+ except (OSError, PermissionError, ValueError):
|
|
|
+ continue
|
|
|
+ except OSError:
|
|
|
+ pass
|
|
|
+ return pids
|
|
|
+
|
|
|
+
|
|
|
+async def cleanup_orphaned_streams():
|
|
|
+ """Clean up orphaned ffmpeg processes and stale stream entries.
|
|
|
+
|
|
|
+ Called periodically from the background task loop in main.py.
|
|
|
+
|
|
|
+ Three-layer cleanup:
|
|
|
+ 1. /proc scan — finds ALL Bambu ffmpeg processes on the system, even those
|
|
|
+ from previous app sessions. This is the nuclear safety net.
|
|
|
+ 2. _spawned_ffmpeg_pids — tracks PIDs spawned this session, catches orphans
|
|
|
+ that were removed from _active_streams but not killed.
|
|
|
+ 3. _active_streams — kills stale entries with no recent frames.
|
|
|
+ """
|
|
|
+ import os
|
|
|
+ import signal
|
|
|
+ import time
|
|
|
+
|
|
|
+ cleaned = 0
|
|
|
+ now = time.time()
|
|
|
+
|
|
|
+ # Collect PIDs that are legitimately in-use (active stream, process alive)
|
|
|
+ active_pids = {proc.pid for proc in _active_streams.values() if proc.returncode is None}
|
|
|
+
|
|
|
+ # 1. /proc scan — catch ALL orphaned Bambu ffmpeg processes on the system.
|
|
|
+ # Any ffmpeg with rtsps://bblp: that is NOT in an active stream is orphaned.
|
|
|
+ for pid in _scan_bambu_ffmpeg_pids():
|
|
|
+ if pid in active_pids:
|
|
|
+ continue
|
|
|
+ logger.info("Killing orphaned ffmpeg process found via /proc (pid=%d)", pid)
|
|
|
+ try:
|
|
|
+ os.kill(pid, signal.SIGKILL)
|
|
|
+ except (ProcessLookupError, OSError):
|
|
|
+ pass
|
|
|
+ _spawned_ffmpeg_pids.pop(pid, None)
|
|
|
+ cleaned += 1
|
|
|
+
|
|
|
+ # 2. Clean up _spawned_ffmpeg_pids entries for dead processes
|
|
|
+ for pid in list(_spawned_ffmpeg_pids):
|
|
|
+ try:
|
|
|
+ os.kill(pid, 0) # existence check
|
|
|
+ except (ProcessLookupError, OSError):
|
|
|
+ _spawned_ffmpeg_pids.pop(pid, None)
|
|
|
+
|
|
|
+ # 3. Clean up _active_streams entries with dead processes
|
|
|
+ dead_streams = [sid for sid, proc in _active_streams.items() if proc.returncode is not None]
|
|
|
+ for sid in dead_streams:
|
|
|
+ proc = _active_streams.pop(sid, None)
|
|
|
+ if proc:
|
|
|
+ _spawned_ffmpeg_pids.pop(proc.pid, None)
|
|
|
+ cleaned += 1
|
|
|
+
|
|
|
+ # 4. Kill stale active streams (alive but no frames for >60s)
|
|
|
+ for sid, proc in list(_active_streams.items()):
|
|
|
+ if proc.returncode is not None:
|
|
|
+ continue
|
|
|
+ try:
|
|
|
+ printer_id = int(sid.split("-", 1)[0])
|
|
|
+ except (ValueError, IndexError):
|
|
|
+ continue
|
|
|
+ start_time = _stream_start_times.get(printer_id, now)
|
|
|
+ last_frame = _last_frame_times.get(printer_id, start_time)
|
|
|
+ if now - start_time > 120 and now - last_frame > 60:
|
|
|
+ logger.info("Killing stale ffmpeg stream %s (no frames for %.0fs)", sid, now - last_frame)
|
|
|
+ try:
|
|
|
+ proc.kill()
|
|
|
+ await proc.wait()
|
|
|
+ except (ProcessLookupError, OSError):
|
|
|
+ pass
|
|
|
+ _active_streams.pop(sid, None)
|
|
|
+ _spawned_ffmpeg_pids.pop(proc.pid, None)
|
|
|
+ cleaned += 1
|
|
|
+
|
|
|
+ # 4. Clean stale chamber stream entries
|
|
|
+ dead_chamber = [sid for sid, (_reader, writer) in _active_chamber_streams.items() if writer.is_closing()]
|
|
|
+ for sid in dead_chamber:
|
|
|
+ _active_chamber_streams.pop(sid, None)
|
|
|
+ cleaned += 1
|
|
|
+
|
|
|
+ if cleaned:
|
|
|
+ logger.info("Cleaned up %d orphaned camera stream(s)", cleaned)
|