|
@@ -52,6 +52,13 @@ _active_external_streams: set[int] = set()
|
|
|
# Maps PID -> spawn timestamp — used by cleanup to find truly orphaned OS processes
|
|
# Maps PID -> spawn timestamp — used by cleanup to find truly orphaned OS processes
|
|
|
_spawned_ffmpeg_pids: dict[int, float] = {}
|
|
_spawned_ffmpeg_pids: dict[int, float] = {}
|
|
|
|
|
|
|
|
|
|
+# Track disconnect events per stream_id — allows stop endpoint and cleanup
|
|
|
|
|
+# to signal generators to stop reconnecting instead of just killing the process
|
|
|
|
|
+_disconnect_events: dict[str, asyncio.Event] = {}
|
|
|
|
|
+
|
|
|
|
|
+# Track last frame time per stream_id (not just per printer_id) for stale detection
|
|
|
|
|
+_stream_last_frame_times: dict[str, float] = {}
|
|
|
|
|
+
|
|
|
|
|
|
|
|
def get_buffered_frame(printer_id: int) -> bytes | None:
|
|
def get_buffered_frame(printer_id: int) -> bytes | None:
|
|
|
"""Get the last buffered frame for a printer from an active stream.
|
|
"""Get the last buffered frame for a printer from an active stream.
|
|
@@ -85,6 +92,10 @@ async def generate_chamber_mjpeg_stream(
|
|
|
"""
|
|
"""
|
|
|
logger.info("Starting chamber image stream for %s (stream_id=%s, model=%s)", ip_address, stream_id, model)
|
|
logger.info("Starting chamber image stream for %s (stream_id=%s, model=%s)", ip_address, stream_id, model)
|
|
|
|
|
|
|
|
|
|
+ # Register disconnect event so stop endpoint can signal us
|
|
|
|
|
+ if stream_id and disconnect_event:
|
|
|
|
|
+ _disconnect_events[stream_id] = disconnect_event
|
|
|
|
|
+
|
|
|
connection = await generate_chamber_image_stream(ip_address, access_code, fps)
|
|
connection = await generate_chamber_image_stream(ip_address, access_code, fps)
|
|
|
if connection is None:
|
|
if connection is None:
|
|
|
logger.error("Failed to connect to chamber image stream for %s", ip_address)
|
|
logger.error("Failed to connect to chamber image stream for %s", ip_address)
|
|
@@ -145,9 +156,11 @@ async def generate_chamber_mjpeg_stream(
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
logger.exception("Chamber image stream error: %s", e)
|
|
logger.exception("Chamber image stream error: %s", e)
|
|
|
finally:
|
|
finally:
|
|
|
- # Remove from active streams
|
|
|
|
|
- if stream_id and stream_id in _active_chamber_streams:
|
|
|
|
|
- del _active_chamber_streams[stream_id]
|
|
|
|
|
|
|
+ # Remove from active streams and disconnect events
|
|
|
|
|
+ if stream_id:
|
|
|
|
|
+ _active_chamber_streams.pop(stream_id, None)
|
|
|
|
|
+ _disconnect_events.pop(stream_id, None)
|
|
|
|
|
+ _stream_last_frame_times.pop(stream_id, None)
|
|
|
|
|
|
|
|
# Clean up frame buffer and timestamps
|
|
# Clean up frame buffer and timestamps
|
|
|
if printer_id is not None:
|
|
if printer_id is not None:
|
|
@@ -263,6 +276,10 @@ async def generate_rtsp_mjpeg_stream(
|
|
|
"-", # Output to stdout
|
|
"-", # Output to stdout
|
|
|
]
|
|
]
|
|
|
|
|
|
|
|
|
|
+ # Register disconnect event so stop endpoint can signal us
|
|
|
|
|
+ if stream_id and disconnect_event:
|
|
|
|
|
+ _disconnect_events[stream_id] = disconnect_event
|
|
|
|
|
+
|
|
|
logger.info(
|
|
logger.info(
|
|
|
"Starting RTSP camera stream for %s (stream_id=%s, model=%s, fps=%s)", ip_address, stream_id, model, fps
|
|
"Starting RTSP camera stream for %s (stream_id=%s, model=%s, fps=%s)", ip_address, stream_id, model, fps
|
|
|
)
|
|
)
|
|
@@ -377,6 +394,8 @@ async def generate_rtsp_mjpeg_stream(
|
|
|
|
|
|
|
|
_last_frames[printer_id] = frame
|
|
_last_frames[printer_id] = frame
|
|
|
_last_frame_times[printer_id] = time.time()
|
|
_last_frame_times[printer_id] = time.time()
|
|
|
|
|
+ if stream_id:
|
|
|
|
|
+ _stream_last_frame_times[stream_id] = time.time()
|
|
|
|
|
|
|
|
yield (
|
|
yield (
|
|
|
b"--frame\r\n"
|
|
b"--frame\r\n"
|
|
@@ -408,6 +427,11 @@ async def generate_rtsp_mjpeg_stream(
|
|
|
if client_gone:
|
|
if client_gone:
|
|
|
break
|
|
break
|
|
|
|
|
|
|
|
|
|
+ # Check if stream was explicitly stopped (e.g., by stop endpoint)
|
|
|
|
|
+ if stream_id and stream_id not in _active_streams:
|
|
|
|
|
+ logger.info("Stream %s removed from active streams, stopping reconnect", stream_id)
|
|
|
|
|
+ break
|
|
|
|
|
+
|
|
|
if stream_ended:
|
|
if stream_ended:
|
|
|
reconnect_count += 1
|
|
reconnect_count += 1
|
|
|
continue
|
|
continue
|
|
@@ -433,9 +457,11 @@ async def generate_rtsp_mjpeg_stream(
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
logger.exception("Camera stream error: %s", e)
|
|
logger.exception("Camera stream error: %s", e)
|
|
|
finally:
|
|
finally:
|
|
|
- # Remove from active streams
|
|
|
|
|
- if stream_id and stream_id in _active_streams:
|
|
|
|
|
- del _active_streams[stream_id]
|
|
|
|
|
|
|
+ # Remove from active streams and disconnect events
|
|
|
|
|
+ if stream_id:
|
|
|
|
|
+ _active_streams.pop(stream_id, None)
|
|
|
|
|
+ _disconnect_events.pop(stream_id, None)
|
|
|
|
|
+ _stream_last_frame_times.pop(stream_id, None)
|
|
|
|
|
|
|
|
# Clean up frame buffer and timestamps
|
|
# Clean up frame buffer and timestamps
|
|
|
if printer_id is not None:
|
|
if printer_id is not None:
|
|
@@ -639,6 +665,10 @@ async def stop_camera_stream(
|
|
|
for stream_id, process in list(_active_streams.items()):
|
|
for stream_id, process in list(_active_streams.items()):
|
|
|
if stream_id.startswith(f"{printer_id}-"):
|
|
if stream_id.startswith(f"{printer_id}-"):
|
|
|
to_remove.append(stream_id)
|
|
to_remove.append(stream_id)
|
|
|
|
|
+ # Signal the generator to stop reconnecting BEFORE killing the process
|
|
|
|
|
+ event = _disconnect_events.get(stream_id)
|
|
|
|
|
+ if event:
|
|
|
|
|
+ event.set()
|
|
|
if process.returncode is None:
|
|
if process.returncode is None:
|
|
|
try:
|
|
try:
|
|
|
process.terminate()
|
|
process.terminate()
|
|
@@ -658,12 +688,18 @@ async def stop_camera_stream(
|
|
|
|
|
|
|
|
for stream_id in to_remove:
|
|
for stream_id in to_remove:
|
|
|
_active_streams.pop(stream_id, None)
|
|
_active_streams.pop(stream_id, None)
|
|
|
|
|
+ _disconnect_events.pop(stream_id, None)
|
|
|
|
|
+ _stream_last_frame_times.pop(stream_id, None)
|
|
|
|
|
|
|
|
# Stop chamber image streams
|
|
# Stop chamber image streams
|
|
|
to_remove_chamber = []
|
|
to_remove_chamber = []
|
|
|
for stream_id, (_reader, writer) in list(_active_chamber_streams.items()):
|
|
for stream_id, (_reader, writer) in list(_active_chamber_streams.items()):
|
|
|
if stream_id.startswith(f"{printer_id}-"):
|
|
if stream_id.startswith(f"{printer_id}-"):
|
|
|
to_remove_chamber.append(stream_id)
|
|
to_remove_chamber.append(stream_id)
|
|
|
|
|
+ # Signal the generator to stop
|
|
|
|
|
+ event = _disconnect_events.get(stream_id)
|
|
|
|
|
+ if event:
|
|
|
|
|
+ event.set()
|
|
|
try:
|
|
try:
|
|
|
writer.close()
|
|
writer.close()
|
|
|
stopped += 1
|
|
stopped += 1
|
|
@@ -673,6 +709,8 @@ async def stop_camera_stream(
|
|
|
|
|
|
|
|
for stream_id in to_remove_chamber:
|
|
for stream_id in to_remove_chamber:
|
|
|
_active_chamber_streams.pop(stream_id, None)
|
|
_active_chamber_streams.pop(stream_id, None)
|
|
|
|
|
+ _disconnect_events.pop(stream_id, None)
|
|
|
|
|
+ _stream_last_frame_times.pop(stream_id, None)
|
|
|
|
|
|
|
|
logger.info("Stopped %s camera stream(s) for printer %s", stopped, printer_id)
|
|
logger.info("Stopped %s camera stream(s) for printer %s", stopped, printer_id)
|
|
|
return {"stopped": stopped}
|
|
return {"stopped": stopped}
|
|
@@ -1311,24 +1349,36 @@ async def cleanup_orphaned_streams():
|
|
|
_spawned_ffmpeg_pids.pop(proc.pid, None)
|
|
_spawned_ffmpeg_pids.pop(proc.pid, None)
|
|
|
cleaned += 1
|
|
cleaned += 1
|
|
|
|
|
|
|
|
- # 4. Kill stale active streams (alive but no frames for >60s)
|
|
|
|
|
|
|
+ # 4. Kill stale active streams (alive but no frames for >30s)
|
|
|
|
|
+ # Uses per-stream timestamps to avoid false "fresh" readings from newer streams
|
|
|
for sid, proc in list(_active_streams.items()):
|
|
for sid, proc in list(_active_streams.items()):
|
|
|
if proc.returncode is not None:
|
|
if proc.returncode is not None:
|
|
|
continue
|
|
continue
|
|
|
- try:
|
|
|
|
|
- printer_id = int(sid.split("-", 1)[0])
|
|
|
|
|
- except (ValueError, IndexError):
|
|
|
|
|
- continue
|
|
|
|
|
- start_time = _stream_start_times.get(printer_id, now)
|
|
|
|
|
- last_frame = _last_frame_times.get(printer_id, start_time)
|
|
|
|
|
- if now - start_time > 120 and now - last_frame > 60:
|
|
|
|
|
- logger.info("Killing stale ffmpeg stream %s (no frames for %.0fs)", sid, now - last_frame)
|
|
|
|
|
|
|
+ # Per-stream frame time is authoritative; fall back to per-printer
|
|
|
|
|
+ stream_last_frame = _stream_last_frame_times.get(sid)
|
|
|
|
|
+ if stream_last_frame is None:
|
|
|
|
|
+ try:
|
|
|
|
|
+ printer_id = int(sid.split("-", 1)[0])
|
|
|
|
|
+ except (ValueError, IndexError):
|
|
|
|
|
+ continue
|
|
|
|
|
+ stream_last_frame = _last_frame_times.get(printer_id)
|
|
|
|
|
+ spawn_time = _spawned_ffmpeg_pids.get(proc.pid, now)
|
|
|
|
|
+ if stream_last_frame is None:
|
|
|
|
|
+ stream_last_frame = spawn_time
|
|
|
|
|
+ if now - spawn_time > 60 and now - stream_last_frame > 30:
|
|
|
|
|
+ logger.info("Killing stale ffmpeg stream %s (no frames for %.0fs)", sid, now - stream_last_frame)
|
|
|
|
|
+ # Signal the generator to stop reconnecting
|
|
|
|
|
+ event = _disconnect_events.get(sid)
|
|
|
|
|
+ if event:
|
|
|
|
|
+ event.set()
|
|
|
try:
|
|
try:
|
|
|
proc.kill()
|
|
proc.kill()
|
|
|
await proc.wait()
|
|
await proc.wait()
|
|
|
except (ProcessLookupError, OSError):
|
|
except (ProcessLookupError, OSError):
|
|
|
pass
|
|
pass
|
|
|
_active_streams.pop(sid, None)
|
|
_active_streams.pop(sid, None)
|
|
|
|
|
+ _disconnect_events.pop(sid, None)
|
|
|
|
|
+ _stream_last_frame_times.pop(sid, None)
|
|
|
_spawned_ffmpeg_pids.pop(proc.pid, None)
|
|
_spawned_ffmpeg_pids.pop(proc.pid, None)
|
|
|
cleaned += 1
|
|
cleaned += 1
|
|
|
|
|
|