camera.py 47 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345
  1. """Camera streaming API endpoints for Bambu Lab printers."""
  2. import asyncio
  3. import logging
  4. import subprocess
  5. import sys
  6. from collections.abc import AsyncGenerator
  7. from fastapi import APIRouter, Depends, HTTPException, Request
  8. from fastapi.responses import Response, StreamingResponse
  9. from sqlalchemy import select
  10. from sqlalchemy.ext.asyncio import AsyncSession
  11. from backend.app.core.auth import RequirePermissionIfAuthEnabled
  12. from backend.app.core.database import get_db
  13. from backend.app.core.permissions import Permission
  14. from backend.app.models.printer import Printer
  15. from backend.app.models.user import User
  16. from backend.app.services.camera import (
  17. capture_camera_frame,
  18. generate_chamber_image_stream,
  19. get_camera_port,
  20. get_ffmpeg_path,
  21. is_chamber_image_model,
  22. read_next_chamber_frame,
  23. test_camera_connection,
  24. )
  25. logger = logging.getLogger(__name__)
  26. router = APIRouter(prefix="/printers", tags=["camera"])
  27. # Track active ffmpeg processes for cleanup
  28. _active_streams: dict[str, asyncio.subprocess.Process] = {}
  29. # Track active chamber image connections for cleanup
  30. _active_chamber_streams: dict[str, tuple] = {}
  31. # Store last frame for each printer (for photo capture from active stream)
  32. _last_frames: dict[int, bytes] = {}
  33. # Track last frame timestamp for each printer (for stall detection)
  34. _last_frame_times: dict[int, float] = {}
  35. # Track stream start times for each printer
  36. _stream_start_times: dict[int, float] = {}
  37. # Track active external camera streams by printer ID
  38. _active_external_streams: set[int] = set()
  39. # Track ALL spawned ffmpeg PIDs (persists even if _active_streams entries are removed)
  40. # Maps PID -> spawn timestamp — used by cleanup to find truly orphaned OS processes
  41. _spawned_ffmpeg_pids: dict[int, float] = {}
  42. def get_buffered_frame(printer_id: int) -> bytes | None:
  43. """Get the last buffered frame for a printer from an active stream.
  44. Returns the JPEG frame data if available, or None if no active stream.
  45. """
  46. return _last_frames.get(printer_id)
  47. async def get_printer_or_404(printer_id: int, db: AsyncSession) -> Printer:
  48. """Get printer by ID or raise 404."""
  49. result = await db.execute(select(Printer).where(Printer.id == printer_id))
  50. printer = result.scalar_one_or_none()
  51. if not printer:
  52. raise HTTPException(status_code=404, detail="Printer not found")
  53. return printer
  54. async def generate_chamber_mjpeg_stream(
  55. ip_address: str,
  56. access_code: str,
  57. model: str | None,
  58. fps: int = 5,
  59. stream_id: str | None = None,
  60. disconnect_event: asyncio.Event | None = None,
  61. printer_id: int | None = None,
  62. ) -> AsyncGenerator[bytes, None]:
  63. """Generate MJPEG stream from A1/P1 printer using chamber image protocol.
  64. This connects to port 6000 and reads JPEG frames using the Bambu binary protocol.
  65. """
  66. logger.info("Starting chamber image stream for %s (stream_id=%s, model=%s)", ip_address, stream_id, model)
  67. connection = await generate_chamber_image_stream(ip_address, access_code, fps)
  68. if connection is None:
  69. logger.error("Failed to connect to chamber image stream for %s", ip_address)
  70. yield (
  71. b"--frame\r\n"
  72. b"Content-Type: text/plain\r\n\r\n"
  73. b"Error: Camera connection failed. Check printer is on and camera is enabled.\r\n"
  74. )
  75. return
  76. reader, writer = connection
  77. # Track active connection for cleanup
  78. if stream_id:
  79. _active_chamber_streams[stream_id] = (reader, writer)
  80. try:
  81. frame_interval = 1.0 / fps if fps > 0 else 0.2
  82. last_frame_time = 0.0
  83. while True:
  84. # Check if client disconnected
  85. if disconnect_event and disconnect_event.is_set():
  86. logger.info("Client disconnected, stopping chamber stream %s", stream_id)
  87. break
  88. # Read next frame
  89. frame = await read_next_chamber_frame(reader, timeout=30.0)
  90. if frame is None:
  91. logger.warning("Chamber image stream ended for %s", stream_id)
  92. break
  93. # Save frame to buffer for photo capture and track timestamp
  94. if printer_id is not None:
  95. import time
  96. _last_frames[printer_id] = frame
  97. _last_frame_times[printer_id] = time.time()
  98. # Rate limiting - skip frames if needed to maintain target FPS
  99. current_time = asyncio.get_event_loop().time()
  100. if current_time - last_frame_time < frame_interval:
  101. continue
  102. last_frame_time = current_time
  103. # Yield frame in MJPEG format
  104. yield (
  105. b"--frame\r\n"
  106. b"Content-Type: image/jpeg\r\n"
  107. b"Content-Length: " + str(len(frame)).encode() + b"\r\n"
  108. b"\r\n" + frame + b"\r\n"
  109. )
  110. except asyncio.CancelledError:
  111. logger.info("Chamber image stream cancelled (stream_id=%s)", stream_id)
  112. except GeneratorExit:
  113. logger.info("Chamber image stream generator exit (stream_id=%s)", stream_id)
  114. except Exception as e:
  115. logger.exception("Chamber image stream error: %s", e)
  116. finally:
  117. # Remove from active streams
  118. if stream_id and stream_id in _active_chamber_streams:
  119. del _active_chamber_streams[stream_id]
  120. # Clean up frame buffer and timestamps
  121. if printer_id is not None:
  122. _last_frames.pop(printer_id, None)
  123. _last_frame_times.pop(printer_id, None)
  124. _stream_start_times.pop(printer_id, None)
  125. # Close the connection
  126. try:
  127. writer.close()
  128. await writer.wait_closed()
  129. except OSError:
  130. pass # Connection already closed or broken; cleanup is best-effort
  131. logger.info("Chamber image stream stopped for %s (stream_id=%s)", ip_address, stream_id)
  132. async def _terminate_ffmpeg(process: asyncio.subprocess.Process, stream_id: str | None = None) -> None:
  133. """Terminate an ffmpeg process gracefully, then kill if needed."""
  134. if process.returncode is not None:
  135. return # Already dead
  136. try:
  137. process.terminate()
  138. try:
  139. await asyncio.wait_for(process.wait(), timeout=2.0)
  140. except TimeoutError:
  141. logger.warning("ffmpeg didn't terminate gracefully, killing (stream_id=%s)", stream_id)
  142. process.kill()
  143. await process.wait()
  144. except ProcessLookupError:
  145. pass # Already dead
  146. except OSError as e:
  147. logger.warning("Error terminating ffmpeg: %s", e)
  148. _spawned_ffmpeg_pids.pop(process.pid, None)
  149. async def _read_ffmpeg_stderr(process: asyncio.subprocess.Process) -> str | None:
  150. """Read ffmpeg stderr for diagnostics (best-effort, non-blocking)."""
  151. if not process or not process.stderr:
  152. return None
  153. try:
  154. data = await asyncio.wait_for(process.stderr.read(), timeout=2.0)
  155. return data.decode(errors="replace") if data else None
  156. except (TimeoutError, Exception):
  157. return None
  158. # Max consecutive RTSP reconnections before giving up.
  159. # Some printer firmwares (notably P2S) drop RTSP sessions after a few seconds,
  160. # so we transparently respawn ffmpeg to keep the MJPEG stream alive.
  161. _RTSP_MAX_RECONNECTS = 30
  162. _RTSP_RECONNECT_DELAY = 0.2 # seconds between respawns
  163. async def generate_rtsp_mjpeg_stream(
  164. ip_address: str,
  165. access_code: str,
  166. model: str | None,
  167. fps: int = 10,
  168. stream_id: str | None = None,
  169. disconnect_event: asyncio.Event | None = None,
  170. printer_id: int | None = None,
  171. ) -> AsyncGenerator[bytes, None]:
  172. """Generate MJPEG stream from printer camera using ffmpeg/RTSP.
  173. This is for X1/H2/P2 models that support RTSP streaming.
  174. Auto-reconnects when the printer drops the RTSP session (common on P2S).
  175. """
  176. ffmpeg = get_ffmpeg_path()
  177. if not ffmpeg:
  178. logger.error("ffmpeg not found - camera streaming requires ffmpeg")
  179. yield (b"--frame\r\nContent-Type: text/plain\r\n\r\nError: ffmpeg not installed\r\n")
  180. return
  181. port = get_camera_port(model)
  182. camera_url = f"rtsps://bblp:{access_code}@{ip_address}:{port}/streaming/live/1"
  183. # ffmpeg command to output MJPEG stream to stdout
  184. # -rtsp_transport tcp: Use TCP for reliability
  185. # -rtsp_flags prefer_tcp: Prefer TCP for RTSP
  186. # -timeout: Socket I/O timeout in microseconds (30 seconds)
  187. # -buffer_size: Larger buffer for network jitter
  188. # -max_delay: Maximum demuxing delay
  189. # -f mjpeg: Output as MJPEG
  190. # -q:v 5: Quality (lower = better, 2-10 is good range)
  191. # -r: Output framerate
  192. cmd = [
  193. ffmpeg,
  194. "-rtsp_transport",
  195. "tcp",
  196. "-rtsp_flags",
  197. "prefer_tcp",
  198. "-timeout",
  199. "30000000", # 30 seconds in microseconds
  200. "-buffer_size",
  201. "1024000", # 1MB buffer
  202. "-max_delay",
  203. "500000", # 0.5 seconds max delay
  204. "-probesize",
  205. "32", # Minimal probing for fast start
  206. "-analyzeduration",
  207. "0", # Skip format analysis
  208. "-fflags",
  209. "nobuffer", # Reduce internal buffering
  210. "-flags",
  211. "low_delay", # Minimize decode latency
  212. "-i",
  213. camera_url,
  214. "-f",
  215. "mjpeg",
  216. "-q:v",
  217. "5",
  218. "-r",
  219. str(fps),
  220. "-an", # No audio
  221. "-", # Output to stdout
  222. ]
  223. logger.info(
  224. "Starting RTSP camera stream for %s (stream_id=%s, model=%s, fps=%s)", ip_address, stream_id, model, fps
  225. )
  226. logger.debug("ffmpeg command: %s ... (url hidden)", ffmpeg)
  227. # On Windows, spawn ffmpeg in its own process group so that
  228. # terminate() doesn't broadcast CTRL_C_EVENT to uvicorn (#605).
  229. spawn_kwargs: dict = {}
  230. if sys.platform == "win32":
  231. spawn_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
  232. jpeg_start = b"\xff\xd8"
  233. jpeg_end = b"\xff\xd9"
  234. reconnect_count = 0
  235. process = None
  236. got_any_frames = False
  237. try:
  238. while reconnect_count <= _RTSP_MAX_RECONNECTS:
  239. # Check for client disconnect before (re)connecting
  240. if disconnect_event and disconnect_event.is_set():
  241. break
  242. if reconnect_count > 0:
  243. logger.info(
  244. "RTSP reconnecting (%d/%d) for %s (stream_id=%s)",
  245. reconnect_count,
  246. _RTSP_MAX_RECONNECTS,
  247. ip_address,
  248. stream_id,
  249. )
  250. await asyncio.sleep(_RTSP_RECONNECT_DELAY)
  251. if disconnect_event and disconnect_event.is_set():
  252. break
  253. # Spawn ffmpeg — enable GnuTLS debug output in debug mode
  254. env = None
  255. if logger.isEnabledFor(logging.DEBUG):
  256. import os
  257. env = {**os.environ, "GNUTLS_DEBUG_LEVEL": "2"}
  258. process = await asyncio.create_subprocess_exec(
  259. *cmd,
  260. stdout=asyncio.subprocess.PIPE,
  261. stderr=asyncio.subprocess.PIPE,
  262. env=env,
  263. **spawn_kwargs,
  264. )
  265. if stream_id:
  266. _active_streams[stream_id] = process
  267. import time as _time
  268. _spawned_ffmpeg_pids[process.pid] = _time.time()
  269. # Give ffmpeg a moment to start and check for immediate failures
  270. await asyncio.sleep(0.1)
  271. if process.returncode is not None:
  272. stderr = await process.stderr.read()
  273. stderr_text = stderr.decode(errors="replace")
  274. logger.error("ffmpeg failed immediately (attempt %d): %s", reconnect_count + 1, stderr_text)
  275. _spawned_ffmpeg_pids.pop(process.pid, None)
  276. if not got_any_frames and reconnect_count == 0:
  277. # First attempt failed immediately — camera is likely unreachable
  278. yield (
  279. b"--frame\r\n"
  280. b"Content-Type: text/plain\r\n\r\n"
  281. b"Error: Camera connection failed. Check printer is on and camera is enabled.\r\n"
  282. )
  283. return
  284. reconnect_count += 1
  285. continue
  286. # Read JPEG frames from ffmpeg stdout
  287. buffer = b""
  288. stream_ended = False
  289. client_gone = False
  290. while True:
  291. if disconnect_event and disconnect_event.is_set():
  292. client_gone = True
  293. break
  294. try:
  295. chunk = await asyncio.wait_for(process.stdout.read(8192), timeout=30.0)
  296. if not chunk:
  297. # ffmpeg exited — log stderr and break to reconnect
  298. stderr_text = await _read_ffmpeg_stderr(process)
  299. if stderr_text:
  300. logger.warning("ffmpeg stderr (stream_id=%s): %s", stream_id, stderr_text)
  301. logger.warning("RTSP stream ended for %s (stream_id=%s), will reconnect", ip_address, stream_id)
  302. stream_ended = True
  303. break
  304. buffer += chunk
  305. # Extract complete JPEG frames from buffer
  306. while True:
  307. start_idx = buffer.find(jpeg_start)
  308. if start_idx == -1:
  309. buffer = buffer[-2:] if len(buffer) > 2 else buffer
  310. break
  311. if start_idx > 0:
  312. buffer = buffer[start_idx:]
  313. end_idx = buffer.find(jpeg_end, 2)
  314. if end_idx == -1:
  315. break
  316. frame = buffer[: end_idx + 2]
  317. buffer = buffer[end_idx + 2 :]
  318. got_any_frames = True
  319. if printer_id is not None:
  320. import time
  321. _last_frames[printer_id] = frame
  322. _last_frame_times[printer_id] = time.time()
  323. yield (
  324. b"--frame\r\n"
  325. b"Content-Type: image/jpeg\r\n"
  326. b"Content-Length: " + str(len(frame)).encode() + b"\r\n"
  327. b"\r\n" + frame + b"\r\n"
  328. )
  329. except TimeoutError:
  330. stderr_text = await _read_ffmpeg_stderr(process)
  331. if stderr_text:
  332. logger.warning("ffmpeg stderr on timeout: %s", stderr_text)
  333. logger.warning("RTSP read timeout for %s (stream_id=%s)", ip_address, stream_id)
  334. stream_ended = True
  335. break
  336. except asyncio.CancelledError:
  337. logger.info("Camera stream cancelled (stream_id=%s)", stream_id)
  338. client_gone = True
  339. break
  340. except GeneratorExit:
  341. logger.info("Camera stream generator exit (stream_id=%s)", stream_id)
  342. client_gone = True
  343. break
  344. # Clean up this ffmpeg process before reconnecting or exiting
  345. await _terminate_ffmpeg(process, stream_id)
  346. process = None
  347. if client_gone:
  348. break
  349. if stream_ended:
  350. reconnect_count += 1
  351. continue
  352. # Normal exit (shouldn't reach here, but be safe)
  353. break
  354. if reconnect_count > _RTSP_MAX_RECONNECTS:
  355. logger.error(
  356. "RTSP max reconnects (%d) reached for %s (stream_id=%s)",
  357. _RTSP_MAX_RECONNECTS,
  358. ip_address,
  359. stream_id,
  360. )
  361. except FileNotFoundError:
  362. logger.error("ffmpeg not found - camera streaming requires ffmpeg")
  363. yield (b"--frame\r\nContent-Type: text/plain\r\n\r\nError: ffmpeg not installed\r\n")
  364. except asyncio.CancelledError:
  365. logger.info("Camera stream task cancelled (stream_id=%s)", stream_id)
  366. except GeneratorExit:
  367. logger.info("Camera stream generator closed (stream_id=%s)", stream_id)
  368. except Exception as e:
  369. logger.exception("Camera stream error: %s", e)
  370. finally:
  371. # Remove from active streams
  372. if stream_id and stream_id in _active_streams:
  373. del _active_streams[stream_id]
  374. # Clean up frame buffer and timestamps
  375. if printer_id is not None:
  376. _last_frames.pop(printer_id, None)
  377. _last_frame_times.pop(printer_id, None)
  378. _stream_start_times.pop(printer_id, None)
  379. if process:
  380. await _terminate_ffmpeg(process, stream_id)
  381. logger.info("Camera stream stopped for %s (stream_id=%s)", ip_address, stream_id)
  382. @router.get("/{printer_id}/camera/stream")
  383. async def camera_stream(
  384. printer_id: int,
  385. request: Request,
  386. fps: int = 10,
  387. db: AsyncSession = Depends(get_db),
  388. ):
  389. """Stream live video from printer camera as MJPEG.
  390. This endpoint returns a multipart MJPEG stream that can be used directly
  391. in an <img> tag or video player.
  392. Note: Unauthenticated - loaded via <img> tags which can't send auth headers.
  393. Uses external camera if configured, otherwise uses built-in camera:
  394. - External: MJPEG, RTSP, or HTTP snapshot
  395. - A1/P1: Chamber image protocol (port 6000)
  396. - X1/H2/P2: RTSP via ffmpeg (port 322)
  397. Args:
  398. printer_id: Printer ID
  399. fps: Target frames per second (default: 10, max: 30)
  400. """
  401. import uuid
  402. printer = await get_printer_or_404(printer_id, db)
  403. # Check for external camera first
  404. if printer.external_camera_enabled and printer.external_camera_url:
  405. import time
  406. from backend.app.services.external_camera import generate_mjpeg_stream
  407. # Limit external camera FPS to reduce browser load
  408. fps = min(max(fps, 1), 15)
  409. logger.info(
  410. "Using external camera (%s) for printer %s at %s fps", printer.external_camera_type, printer_id, fps
  411. )
  412. # Track stream start
  413. _stream_start_times[printer_id] = time.time()
  414. _active_external_streams.add(printer_id)
  415. async def external_stream_wrapper():
  416. """Wrap external stream to track start/stop and update frame times."""
  417. try:
  418. async for frame in generate_mjpeg_stream(
  419. printer.external_camera_url, printer.external_camera_type, fps
  420. ):
  421. # generate_mjpeg_stream already rate-limits; just track frame times
  422. _last_frame_times[printer_id] = time.time()
  423. yield frame
  424. finally:
  425. _active_external_streams.discard(printer_id)
  426. logger.info("External camera stream ended for printer %s", printer_id)
  427. return StreamingResponse(
  428. external_stream_wrapper(),
  429. media_type="multipart/x-mixed-replace; boundary=frame",
  430. headers={
  431. "Cache-Control": "no-cache, no-store, must-revalidate",
  432. "Pragma": "no-cache",
  433. "Expires": "0",
  434. },
  435. )
  436. # Validate FPS - A1/P1 models max out at ~5 FPS
  437. if is_chamber_image_model(printer.model):
  438. fps = min(max(fps, 1), 5)
  439. else:
  440. fps = min(max(fps, 1), 30)
  441. # Generate unique stream ID for tracking
  442. stream_id = f"{printer_id}-{uuid.uuid4().hex[:8]}"
  443. # Create disconnect event that will be set when client disconnects
  444. disconnect_event = asyncio.Event()
  445. # Choose the appropriate stream generator based on model
  446. if is_chamber_image_model(printer.model):
  447. stream_generator = generate_chamber_mjpeg_stream
  448. logger.info("Using chamber image protocol for %s", printer.model)
  449. else:
  450. stream_generator = generate_rtsp_mjpeg_stream
  451. logger.info("Using RTSP protocol for %s", printer.model)
  452. # Track stream start time
  453. import time
  454. _stream_start_times[printer_id] = time.time()
  455. async def _kill_stream_process(sid: str):
  456. """Terminate+kill the ffmpeg process for a stream ID."""
  457. proc = _active_streams.get(sid)
  458. if proc and proc.returncode is None:
  459. try:
  460. proc.terminate()
  461. try:
  462. await asyncio.wait_for(proc.wait(), timeout=2.0)
  463. except TimeoutError:
  464. proc.kill()
  465. await proc.wait()
  466. except (ProcessLookupError, OSError):
  467. pass
  468. async def _monitor_disconnect():
  469. """Background task: poll for client disconnect independently of frame loop."""
  470. try:
  471. while not disconnect_event.is_set():
  472. await asyncio.sleep(2)
  473. if await request.is_disconnected():
  474. logger.info("Disconnect monitor: client gone (stream %s)", stream_id)
  475. disconnect_event.set()
  476. # Kill ffmpeg process (RTSP streams)
  477. await _kill_stream_process(stream_id)
  478. # Close chamber stream connection if applicable
  479. chamber = _active_chamber_streams.get(stream_id)
  480. if chamber:
  481. try:
  482. chamber[1].close()
  483. except OSError:
  484. pass
  485. break
  486. except asyncio.CancelledError:
  487. pass
  488. monitor_task = asyncio.create_task(_monitor_disconnect())
  489. async def stream_with_disconnect_check():
  490. """Wrapper generator that monitors for client disconnect."""
  491. try:
  492. async for chunk in stream_generator(
  493. ip_address=printer.ip_address,
  494. access_code=printer.access_code,
  495. model=printer.model,
  496. fps=fps,
  497. stream_id=stream_id,
  498. disconnect_event=disconnect_event,
  499. printer_id=printer_id,
  500. ):
  501. # Check if client is still connected
  502. if disconnect_event.is_set() or await request.is_disconnected():
  503. logger.info("Client disconnected detected for stream %s", stream_id)
  504. disconnect_event.set()
  505. break
  506. yield chunk
  507. except asyncio.CancelledError:
  508. logger.info("Stream %s cancelled", stream_id)
  509. disconnect_event.set()
  510. except GeneratorExit:
  511. logger.info("Stream %s generator closed", stream_id)
  512. disconnect_event.set()
  513. finally:
  514. disconnect_event.set()
  515. monitor_task.cancel()
  516. # Give a moment for the inner generator to clean up
  517. await asyncio.sleep(0.1)
  518. return StreamingResponse(
  519. stream_with_disconnect_check(),
  520. media_type="multipart/x-mixed-replace; boundary=frame",
  521. headers={
  522. "Cache-Control": "no-cache, no-store, must-revalidate",
  523. "Pragma": "no-cache",
  524. "Expires": "0",
  525. },
  526. )
  527. @router.api_route("/{printer_id}/camera/stop", methods=["GET", "POST"])
  528. async def stop_camera_stream(
  529. printer_id: int,
  530. _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
  531. ):
  532. """Stop all active camera streams for a printer.
  533. This can be called by the frontend when the camera window is closed.
  534. Accepts both GET and POST (POST for sendBeacon compatibility).
  535. """
  536. stopped = 0
  537. # Stop ffmpeg/RTSP streams
  538. to_remove = []
  539. for stream_id, process in list(_active_streams.items()):
  540. if stream_id.startswith(f"{printer_id}-"):
  541. to_remove.append(stream_id)
  542. if process.returncode is None:
  543. try:
  544. process.terminate()
  545. try:
  546. await asyncio.wait_for(process.wait(), timeout=2.0)
  547. except TimeoutError:
  548. logger.warning("ffmpeg didn't terminate gracefully, killing (stream_id=%s)", stream_id)
  549. process.kill()
  550. await process.wait()
  551. stopped += 1
  552. logger.info("Terminated ffmpeg process for stream %s", stream_id)
  553. except ProcessLookupError:
  554. pass # Process already dead
  555. except OSError as e:
  556. logger.warning("Error stopping stream %s: %s", stream_id, e)
  557. _spawned_ffmpeg_pids.pop(process.pid, None)
  558. for stream_id in to_remove:
  559. _active_streams.pop(stream_id, None)
  560. # Stop chamber image streams
  561. to_remove_chamber = []
  562. for stream_id, (_reader, writer) in list(_active_chamber_streams.items()):
  563. if stream_id.startswith(f"{printer_id}-"):
  564. to_remove_chamber.append(stream_id)
  565. try:
  566. writer.close()
  567. stopped += 1
  568. logger.info("Closed chamber image connection for stream %s", stream_id)
  569. except OSError as e:
  570. logger.warning("Error stopping chamber stream %s: %s", stream_id, e)
  571. for stream_id in to_remove_chamber:
  572. _active_chamber_streams.pop(stream_id, None)
  573. logger.info("Stopped %s camera stream(s) for printer %s", stopped, printer_id)
  574. return {"stopped": stopped}
  575. @router.get("/{printer_id}/camera/snapshot")
  576. async def camera_snapshot(
  577. printer_id: int,
  578. db: AsyncSession = Depends(get_db),
  579. ):
  580. """Capture a single frame from the printer camera.
  581. Returns a JPEG image.
  582. Note: Unauthenticated - loaded via <img> tags which can't send auth headers.
  583. """
  584. import tempfile
  585. from pathlib import Path
  586. printer = await get_printer_or_404(printer_id, db)
  587. # Check for external camera first
  588. if printer.external_camera_enabled and printer.external_camera_url:
  589. from backend.app.services.external_camera import capture_frame
  590. frame_data = await capture_frame(printer.external_camera_url, printer.external_camera_type, timeout=15)
  591. if not frame_data:
  592. raise HTTPException(
  593. status_code=503,
  594. detail="Failed to capture frame from external camera.",
  595. )
  596. return Response(
  597. content=frame_data,
  598. media_type="image/jpeg",
  599. headers={
  600. "Cache-Control": "no-cache, no-store, must-revalidate",
  601. "Content-Disposition": f'inline; filename="snapshot_{printer_id}.jpg"',
  602. },
  603. )
  604. # Create temporary file for the snapshot
  605. with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f:
  606. temp_path = Path(f.name)
  607. try:
  608. success = await capture_camera_frame(
  609. ip_address=printer.ip_address,
  610. access_code=printer.access_code,
  611. model=printer.model,
  612. output_path=temp_path,
  613. timeout=15,
  614. )
  615. if not success:
  616. raise HTTPException(
  617. status_code=503,
  618. detail="Failed to capture camera frame. Ensure printer is on and camera is enabled.",
  619. )
  620. # Read and return the image
  621. with open(temp_path, "rb") as f:
  622. image_data = f.read()
  623. return Response(
  624. content=image_data,
  625. media_type="image/jpeg",
  626. headers={
  627. "Cache-Control": "no-cache, no-store, must-revalidate",
  628. "Content-Disposition": f'inline; filename="snapshot_{printer_id}.jpg"',
  629. },
  630. )
  631. finally:
  632. # Clean up temp file
  633. if temp_path.exists():
  634. temp_path.unlink()
  635. @router.get("/{printer_id}/camera/test")
  636. async def test_camera(
  637. printer_id: int,
  638. db: AsyncSession = Depends(get_db),
  639. _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
  640. ):
  641. """Test camera connection for a printer.
  642. Returns success status and any error message.
  643. """
  644. printer = await get_printer_or_404(printer_id, db)
  645. result = await test_camera_connection(
  646. ip_address=printer.ip_address,
  647. access_code=printer.access_code,
  648. model=printer.model,
  649. )
  650. return result
  651. @router.get("/{printer_id}/camera/status")
  652. async def camera_status(
  653. printer_id: int,
  654. _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
  655. ):
  656. """Get the status of an active camera stream.
  657. Returns whether a stream is active and when the last frame was received.
  658. Used by the frontend to detect stalled streams and auto-reconnect.
  659. """
  660. import time
  661. # Check if there's an active stream for this printer
  662. has_active_stream = False
  663. # Check external camera streams
  664. if printer_id in _active_external_streams:
  665. has_active_stream = True
  666. # Check ffmpeg/RTSP streams
  667. if not has_active_stream:
  668. for stream_id in _active_streams:
  669. if stream_id.startswith(f"{printer_id}-"):
  670. process = _active_streams[stream_id]
  671. if process.returncode is None:
  672. has_active_stream = True
  673. break
  674. # Check chamber image streams
  675. if not has_active_stream:
  676. for stream_id in _active_chamber_streams:
  677. if stream_id.startswith(f"{printer_id}-"):
  678. has_active_stream = True
  679. break
  680. # Get timing information
  681. current_time = time.time()
  682. last_frame_time = _last_frame_times.get(printer_id)
  683. stream_start_time = _stream_start_times.get(printer_id)
  684. # Calculate seconds since last frame
  685. seconds_since_frame = None
  686. if last_frame_time is not None:
  687. seconds_since_frame = current_time - last_frame_time
  688. # Calculate stream uptime
  689. stream_uptime = None
  690. if stream_start_time is not None:
  691. stream_uptime = current_time - stream_start_time
  692. return {
  693. "active": has_active_stream,
  694. "has_frames": printer_id in _last_frames,
  695. "seconds_since_frame": seconds_since_frame,
  696. "stream_uptime": stream_uptime,
  697. # Consider stalled if no frame for more than 10 seconds after stream started
  698. "stalled": (
  699. has_active_stream
  700. and stream_uptime is not None
  701. and stream_uptime > 5 # Give 5 seconds for stream to start
  702. and (seconds_since_frame is None or seconds_since_frame > 10)
  703. ),
  704. }
  705. @router.post("/{printer_id}/camera/external/test")
  706. async def test_external_camera(
  707. printer_id: int,
  708. url: str,
  709. camera_type: str,
  710. db: AsyncSession = Depends(get_db),
  711. _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
  712. ):
  713. """Test external camera connection.
  714. Args:
  715. printer_id: Printer ID (for authorization)
  716. url: Camera URL or USB device path to test
  717. camera_type: Camera type ("mjpeg", "rtsp", "snapshot", "usb")
  718. Returns:
  719. Dict with {success: bool, error?: str, resolution?: str}
  720. """
  721. # Verify printer exists (for authorization)
  722. await get_printer_or_404(printer_id, db)
  723. from backend.app.services.external_camera import test_connection
  724. return await test_connection(url, camera_type)
  725. @router.get("/{printer_id}/camera/check-plate")
  726. async def check_plate_empty(
  727. printer_id: int,
  728. plate_type: str | None = None,
  729. use_external: bool = False,
  730. include_debug_image: bool = False,
  731. db: AsyncSession = Depends(get_db),
  732. _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
  733. ):
  734. """Check if the build plate is empty using camera vision.
  735. Uses calibration-based difference detection - compares current frame
  736. to a reference image of the empty plate.
  737. IMPORTANT: Chamber light must be ON for reliable detection.
  738. Args:
  739. printer_id: Printer ID
  740. plate_type: Type of build plate (e.g., "High Temp Plate") for calibration lookup
  741. use_external: If True, prefer external camera over built-in
  742. include_debug_image: If True, return URL to annotated debug image
  743. Returns:
  744. Dict with detection results:
  745. - is_empty: bool - Whether plate appears empty
  746. - confidence: float - Confidence level (0.0 to 1.0)
  747. - difference_percent: float - How different from calibration reference
  748. - message: str - Human-readable result message
  749. - needs_calibration: bool - True if calibration is required
  750. - light_warning: bool - True if chamber light is off
  751. """
  752. from backend.app.services.plate_detection import (
  753. check_plate_empty as do_check,
  754. is_plate_detection_available,
  755. )
  756. from backend.app.services.printer_manager import printer_manager
  757. # Check printer exists first (before OpenCV check)
  758. printer = await get_printer_or_404(printer_id, db)
  759. if not is_plate_detection_available():
  760. raise HTTPException(
  761. status_code=503,
  762. detail="Plate detection not available. Install opencv-python-headless to enable.",
  763. )
  764. # Check chamber light status
  765. light_warning = False
  766. state = printer_manager.get_status(printer_id)
  767. if state and not state.chamber_light:
  768. light_warning = True
  769. from backend.app.services.plate_detection import PlateDetector
  770. # Build ROI tuple from printer settings if available
  771. roi = None
  772. if all(
  773. [
  774. printer.plate_detection_roi_x is not None,
  775. printer.plate_detection_roi_y is not None,
  776. printer.plate_detection_roi_w is not None,
  777. printer.plate_detection_roi_h is not None,
  778. ]
  779. ):
  780. roi = (
  781. printer.plate_detection_roi_x,
  782. printer.plate_detection_roi_y,
  783. printer.plate_detection_roi_w,
  784. printer.plate_detection_roi_h,
  785. )
  786. result = await do_check(
  787. printer_id=printer.id,
  788. ip_address=printer.ip_address,
  789. access_code=printer.access_code,
  790. model=printer.model,
  791. plate_type=plate_type,
  792. include_debug_image=include_debug_image,
  793. external_camera_url=printer.external_camera_url if printer.external_camera_enabled else None,
  794. external_camera_type=printer.external_camera_type if printer.external_camera_enabled else None,
  795. use_external=use_external,
  796. roi=roi,
  797. )
  798. # Get reference count for the response
  799. detector = PlateDetector()
  800. ref_count = detector.get_calibration_count(printer.id)
  801. response = result.to_dict()
  802. response["light_warning"] = light_warning
  803. response["reference_count"] = ref_count
  804. response["max_references"] = detector.MAX_REFERENCES
  805. # Include current ROI in response
  806. if roi:
  807. response["roi"] = {"x": roi[0], "y": roi[1], "w": roi[2], "h": roi[3]}
  808. else:
  809. # Return default ROI
  810. response["roi"] = {"x": 0.15, "y": 0.35, "w": 0.70, "h": 0.55}
  811. # If debug image requested and available, encode as base64 data URL
  812. if include_debug_image and result.debug_image:
  813. import base64
  814. b64_image = base64.b64encode(result.debug_image).decode("utf-8")
  815. response["debug_image_url"] = f"data:image/jpeg;base64,{b64_image}"
  816. return response
  817. @router.post("/{printer_id}/camera/plate-detection/calibrate")
  818. async def calibrate_plate_detection(
  819. printer_id: int,
  820. label: str | None = None,
  821. use_external: bool = False,
  822. db: AsyncSession = Depends(get_db),
  823. _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
  824. ):
  825. """Calibrate plate detection by capturing a reference image of the empty plate.
  826. The plate MUST be empty when calling this endpoint. The captured image
  827. will be used as the reference for future detection comparisons.
  828. Supports up to 5 reference images per printer. When adding a 6th, the oldest
  829. is automatically removed.
  830. IMPORTANT: Chamber light should be ON for calibration.
  831. Args:
  832. printer_id: Printer ID
  833. label: Optional label for this reference (e.g., "High Temp Plate", "Wham Bam")
  834. use_external: If True, prefer external camera over built-in
  835. Returns:
  836. Dict with:
  837. - success: bool - Whether calibration succeeded
  838. - message: str - Status message
  839. - index: int - The reference slot used (0-4)
  840. """
  841. from backend.app.services.plate_detection import (
  842. calibrate_plate,
  843. is_plate_detection_available,
  844. )
  845. from backend.app.services.printer_manager import printer_manager
  846. # Check printer exists first (before OpenCV check)
  847. printer = await get_printer_or_404(printer_id, db)
  848. if not is_plate_detection_available():
  849. raise HTTPException(
  850. status_code=503,
  851. detail="Plate detection not available. Install opencv-python-headless to enable.",
  852. )
  853. # Check chamber light - warn but don't block
  854. state = printer_manager.get_status(printer_id)
  855. light_warning = state and not state.chamber_light
  856. success, message, index = await calibrate_plate(
  857. printer_id=printer.id,
  858. ip_address=printer.ip_address,
  859. access_code=printer.access_code,
  860. model=printer.model,
  861. label=label,
  862. external_camera_url=printer.external_camera_url if printer.external_camera_enabled else None,
  863. external_camera_type=printer.external_camera_type if printer.external_camera_enabled else None,
  864. use_external=use_external,
  865. )
  866. if light_warning and success:
  867. message += " (Warning: Chamber light was off)"
  868. return {"success": success, "message": message, "index": index}
  869. @router.delete("/{printer_id}/camera/plate-detection/calibrate")
  870. async def delete_plate_calibration(
  871. printer_id: int,
  872. plate_type: str | None = None,
  873. db: AsyncSession = Depends(get_db),
  874. _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
  875. ):
  876. """Delete the plate detection calibration for a printer and plate type.
  877. Args:
  878. printer_id: Printer ID
  879. plate_type: Type of build plate (if None, deletes legacy non-plate-specific calibration)
  880. Returns:
  881. Dict with:
  882. - success: bool - Whether deletion succeeded
  883. - message: str - Status message
  884. """
  885. from backend.app.services.plate_detection import (
  886. delete_calibration,
  887. is_plate_detection_available,
  888. )
  889. # Verify printer exists first (before OpenCV check)
  890. await get_printer_or_404(printer_id, db)
  891. if not is_plate_detection_available():
  892. raise HTTPException(
  893. status_code=503,
  894. detail="Plate detection not available. Install opencv-python-headless to enable.",
  895. )
  896. deleted = delete_calibration(printer_id, plate_type)
  897. plate_msg = f" for '{plate_type}'" if plate_type else ""
  898. return {
  899. "success": deleted,
  900. "message": f"Calibration deleted{plate_msg}" if deleted else f"No calibration found{plate_msg}",
  901. }
  902. @router.get("/{printer_id}/camera/plate-detection/status")
  903. async def get_plate_detection_status(
  904. printer_id: int,
  905. plate_type: str | None = None,
  906. db: AsyncSession = Depends(get_db),
  907. _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
  908. ):
  909. """Check plate detection status for a printer and plate type.
  910. Returns:
  911. Dict with:
  912. - available: bool - Whether OpenCV is installed
  913. - calibrated: bool - Whether printer has calibration for this plate type
  914. - plate_type: str - The plate type queried
  915. - chamber_light: bool - Whether chamber light is on
  916. - message: str - Status message
  917. """
  918. from backend.app.services.plate_detection import (
  919. get_calibration_status,
  920. is_plate_detection_available,
  921. )
  922. from backend.app.services.printer_manager import printer_manager
  923. # Verify printer exists first (before OpenCV check)
  924. await get_printer_or_404(printer_id, db)
  925. if not is_plate_detection_available():
  926. return {
  927. "available": False,
  928. "calibrated": False,
  929. "plate_type": plate_type,
  930. "chamber_light": False,
  931. "message": "OpenCV not installed",
  932. }
  933. # Get chamber light status
  934. state = printer_manager.get_status(printer_id)
  935. chamber_light = state.chamber_light if state else False
  936. status = get_calibration_status(printer_id, plate_type)
  937. status["chamber_light"] = chamber_light
  938. return status
  939. @router.get("/{printer_id}/camera/plate-detection/references")
  940. async def get_plate_references(
  941. printer_id: int,
  942. db: AsyncSession = Depends(get_db),
  943. _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
  944. ):
  945. """Get all calibration references for a printer with metadata.
  946. Returns list of references with index, label, timestamp, and thumbnail URL.
  947. """
  948. from backend.app.services.plate_detection import PlateDetector, is_plate_detection_available
  949. # Verify printer exists first (before OpenCV check)
  950. await get_printer_or_404(printer_id, db)
  951. if not is_plate_detection_available():
  952. raise HTTPException(503, "Plate detection not available")
  953. detector = PlateDetector()
  954. references = detector.get_references(printer_id)
  955. # Add thumbnail URLs
  956. for ref in references:
  957. ref["thumbnail_url"] = (
  958. f"/api/v1/printers/{printer_id}/camera/plate-detection/references/{ref['index']}/thumbnail"
  959. )
  960. return {
  961. "references": references,
  962. "max_references": detector.MAX_REFERENCES,
  963. }
  964. @router.get("/{printer_id}/camera/plate-detection/references/{index}/thumbnail")
  965. async def get_reference_thumbnail(
  966. printer_id: int,
  967. index: int,
  968. db: AsyncSession = Depends(get_db),
  969. ):
  970. """Get thumbnail image for a calibration reference.
  971. Note: Unauthenticated - loaded via <img> tags which can't send auth headers.
  972. """
  973. from fastapi.responses import Response
  974. from backend.app.services.plate_detection import PlateDetector, is_plate_detection_available
  975. # Verify printer exists first (before OpenCV check)
  976. await get_printer_or_404(printer_id, db)
  977. if not is_plate_detection_available():
  978. raise HTTPException(503, "Plate detection not available")
  979. detector = PlateDetector()
  980. thumbnail = detector.get_reference_thumbnail(printer_id, index)
  981. if thumbnail is None:
  982. raise HTTPException(404, "Reference not found")
  983. return Response(content=thumbnail, media_type="image/jpeg")
  984. @router.put("/{printer_id}/camera/plate-detection/references/{index}")
  985. async def update_reference_label(
  986. printer_id: int,
  987. index: int,
  988. label: str,
  989. db: AsyncSession = Depends(get_db),
  990. _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
  991. ):
  992. """Update the label for a calibration reference."""
  993. from backend.app.services.plate_detection import PlateDetector, is_plate_detection_available
  994. # Verify printer exists first (before OpenCV check)
  995. await get_printer_or_404(printer_id, db)
  996. if not is_plate_detection_available():
  997. raise HTTPException(503, "Plate detection not available")
  998. detector = PlateDetector()
  999. success = detector.update_reference_label(printer_id, index, label)
  1000. if not success:
  1001. raise HTTPException(404, "Reference not found")
  1002. return {"success": True, "index": index, "label": label}
  1003. @router.delete("/{printer_id}/camera/plate-detection/references/{index}")
  1004. async def delete_reference(
  1005. printer_id: int,
  1006. index: int,
  1007. db: AsyncSession = Depends(get_db),
  1008. _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
  1009. ):
  1010. """Delete a specific calibration reference."""
  1011. from backend.app.services.plate_detection import PlateDetector, is_plate_detection_available
  1012. # Verify printer exists first (before OpenCV check)
  1013. await get_printer_or_404(printer_id, db)
  1014. if not is_plate_detection_available():
  1015. raise HTTPException(503, "Plate detection not available")
  1016. detector = PlateDetector()
  1017. success = detector.delete_reference(printer_id, index)
  1018. if not success:
  1019. raise HTTPException(404, "Reference not found")
  1020. return {"success": True, "message": "Reference deleted"}
  1021. def _scan_bambu_ffmpeg_pids() -> list[int]:
  1022. """Scan /proc for ffmpeg processes with Bambu RTSP URLs.
  1023. These are definitely ours — no other software connects to rtsps://bblp:.
  1024. This catches orphans that survive app restarts and are not in any tracking dict.
  1025. """
  1026. import os
  1027. pids = []
  1028. try:
  1029. for entry in os.listdir("/proc"):
  1030. if not entry.isdigit():
  1031. continue
  1032. try:
  1033. with open(f"/proc/{entry}/cmdline", "rb") as f:
  1034. cmdline = f.read()
  1035. if b"ffmpeg" in cmdline and b"rtsps://bblp:" in cmdline:
  1036. pids.append(int(entry))
  1037. except (OSError, PermissionError, ValueError):
  1038. continue
  1039. except OSError:
  1040. pass
  1041. return pids
  1042. async def cleanup_orphaned_streams():
  1043. """Clean up orphaned ffmpeg processes and stale stream entries.
  1044. Called periodically from the background task loop in main.py.
  1045. Three-layer cleanup:
  1046. 1. /proc scan — finds ALL Bambu ffmpeg processes on the system, even those
  1047. from previous app sessions. This is the nuclear safety net.
  1048. 2. _spawned_ffmpeg_pids — tracks PIDs spawned this session, catches orphans
  1049. that were removed from _active_streams but not killed.
  1050. 3. _active_streams — kills stale entries with no recent frames.
  1051. """
  1052. import os
  1053. import signal
  1054. import time
  1055. cleaned = 0
  1056. now = time.time()
  1057. # Collect PIDs that are legitimately in-use (active streams + any tracked spawn)
  1058. active_pids = {proc.pid for proc in _active_streams.values() if proc.returncode is None}
  1059. active_pids.update(_spawned_ffmpeg_pids.keys())
  1060. # 1. /proc scan — catch ALL orphaned Bambu ffmpeg processes on the system.
  1061. # Any ffmpeg with rtsps://bblp: that is NOT tracked by us is orphaned.
  1062. for pid in _scan_bambu_ffmpeg_pids():
  1063. if pid in active_pids:
  1064. continue
  1065. logger.info("Killing orphaned ffmpeg process found via /proc (pid=%d)", pid)
  1066. try:
  1067. os.kill(pid, signal.SIGKILL)
  1068. except (ProcessLookupError, OSError):
  1069. pass
  1070. _spawned_ffmpeg_pids.pop(pid, None)
  1071. cleaned += 1
  1072. # 2. Clean up _spawned_ffmpeg_pids entries for dead processes
  1073. for pid in list(_spawned_ffmpeg_pids):
  1074. try:
  1075. os.kill(pid, 0) # existence check
  1076. except (ProcessLookupError, OSError):
  1077. _spawned_ffmpeg_pids.pop(pid, None)
  1078. # 3. Clean up _active_streams entries with dead processes
  1079. dead_streams = [sid for sid, proc in _active_streams.items() if proc.returncode is not None]
  1080. for sid in dead_streams:
  1081. proc = _active_streams.pop(sid, None)
  1082. if proc:
  1083. _spawned_ffmpeg_pids.pop(proc.pid, None)
  1084. cleaned += 1
  1085. # 4. Kill stale active streams (alive but no frames for >60s)
  1086. for sid, proc in list(_active_streams.items()):
  1087. if proc.returncode is not None:
  1088. continue
  1089. try:
  1090. printer_id = int(sid.split("-", 1)[0])
  1091. except (ValueError, IndexError):
  1092. continue
  1093. start_time = _stream_start_times.get(printer_id, now)
  1094. last_frame = _last_frame_times.get(printer_id, start_time)
  1095. if now - start_time > 120 and now - last_frame > 60:
  1096. logger.info("Killing stale ffmpeg stream %s (no frames for %.0fs)", sid, now - last_frame)
  1097. try:
  1098. proc.kill()
  1099. await proc.wait()
  1100. except (ProcessLookupError, OSError):
  1101. pass
  1102. _active_streams.pop(sid, None)
  1103. _spawned_ffmpeg_pids.pop(proc.pid, None)
  1104. cleaned += 1
  1105. # 4. Clean stale chamber stream entries
  1106. dead_chamber = [sid for sid, (_reader, writer) in _active_chamber_streams.items() if writer.is_closing()]
  1107. for sid in dead_chamber:
  1108. _active_chamber_streams.pop(sid, None)
  1109. cleaned += 1
  1110. if cleaned:
  1111. logger.info("Cleaned up %d orphaned camera stream(s)", cleaned)