| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599 |
- """Camera streaming API endpoints for Bambu Lab printers."""
- import asyncio
- import logging
- import os
- import subprocess
- import sys
- from collections.abc import AsyncGenerator
- from fastapi import APIRouter, Depends, HTTPException, Request
- from fastapi.responses import Response, StreamingResponse
- from sqlalchemy import select
- from sqlalchemy.ext.asyncio import AsyncSession
- from backend.app.core.auth import (
- RequireCameraStreamTokenIfAuthEnabled,
- RequirePermissionIfAuthEnabled,
- create_camera_stream_token,
- )
- from backend.app.core.database import get_db
- from backend.app.core.permissions import Permission
- from backend.app.models.printer import Printer
- from backend.app.models.user import User
- from backend.app.services.camera import (
- capture_camera_frame,
- create_tls_proxy,
- generate_chamber_image_stream,
- get_camera_port,
- get_ffmpeg_path,
- is_chamber_image_model,
- read_next_chamber_frame,
- rtsp_socket_timeout_flag,
- test_camera_connection,
- )
- from backend.app.services.camera_fanout import (
- MjpegBroadcaster,
- get_or_create_broadcaster,
- iter_subscriber,
- shutdown_broadcaster,
- )
- from backend.app.services.camera_profiles import get_camera_profile
- logger = logging.getLogger(__name__)
- router = APIRouter(prefix="/printers", tags=["camera"])
- # Track active ffmpeg processes for cleanup
- _active_streams: dict[str, asyncio.subprocess.Process] = {}
- # Track active chamber image connections for cleanup
- _active_chamber_streams: dict[str, tuple] = {}
- # Store last frame for each printer (for photo capture from active stream)
- _last_frames: dict[int, bytes] = {}
- # Track last frame timestamp for each printer (for stall detection)
- _last_frame_times: dict[int, float] = {}
- # Track stream start times for each printer
- _stream_start_times: dict[int, float] = {}
- # Track active external camera streams by printer ID
- _active_external_streams: set[int] = set()
- # Track ALL spawned ffmpeg PIDs (persists even if _active_streams entries are removed)
- # Maps PID -> spawn timestamp — used by cleanup to find truly orphaned OS processes
- _spawned_ffmpeg_pids: dict[int, float] = {}
- # Track disconnect events per stream_id — allows stop endpoint and cleanup
- # to signal generators to stop reconnecting instead of just killing the process
- _disconnect_events: dict[str, asyncio.Event] = {}
- # Track last frame time per stream_id (not just per printer_id) for stale detection
- _stream_last_frame_times: dict[str, float] = {}
- def get_buffered_frame(printer_id: int) -> bytes | None:
- """Get the last buffered frame for a printer from an active stream.
- Returns the JPEG frame data if available, or None if no active stream.
- """
- return _last_frames.get(printer_id)
- def is_stream_active(printer_id: int) -> bool:
- """Return True iff a fan-out camera stream is currently registered for this printer.
- Snapshot callers (Obico polling, manual /camera/snapshot) MUST NOT open a
- second concurrent RTSP/chamber-image socket while a viewer is attached:
- most Bambu firmwares allow only one camera connection, so the competing
- socket either kicks the live viewer off or gets refused itself, and the
- resulting reconnect storm tears down the fan-out broadcaster (see #1348).
- Callers should consult this BEFORE trying to open a fresh socket and skip
- the capture cycle when it returns True — even if try_get_active_buffered_frame
- returns None (the stream may be running but the first frame hasn't landed
- in the buffer yet, or the upstream is mid-reconnect).
- """
- return any(k.startswith(f"{printer_id}-") for k in _active_streams) or any(
- k.startswith(f"{printer_id}-") for k in _active_chamber_streams
- )
- def try_get_active_buffered_frame(printer_id: int) -> bytes | None:
- """Return a buffered frame iff a stream is currently running for this printer.
- Snapshot callers (Obico polling, manual /camera/snapshot) tap the fan-out
- broadcaster's running upstream instead of opening a second concurrent
- RTSP/chamber-image socket. Critical for printers that allow only one
- camera connection (e.g. X2D firmware 01.01.00.00; see #1271).
- Returns None when no broadcaster is active for this printer, so callers
- fall through to their existing fresh-socket path unchanged.
- NB: returning None does NOT mean "safe to open a fresh socket" — it also
- fires when the stream is registered but no frame has been buffered yet
- (startup race, mid-reconnect). Callers that must avoid competing sockets
- should consult is_stream_active() first; see #1348.
- """
- if not is_stream_active(printer_id):
- return None
- return _last_frames.get(printer_id)
- async def get_printer_or_404(printer_id: int, db: AsyncSession) -> Printer:
- """Get printer by ID or raise 404."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(status_code=404, detail="Printer not found")
- return printer
- async def generate_chamber_mjpeg_stream(
- ip_address: str,
- access_code: str,
- model: str | None,
- fps: int = 5,
- stream_id: str | None = None,
- disconnect_event: asyncio.Event | None = None,
- printer_id: int | None = None,
- ) -> AsyncGenerator[bytes, None]:
- """Generate MJPEG stream from A1/P1 printer using chamber image protocol.
- This connects to port 6000 and reads JPEG frames using the Bambu binary protocol.
- """
- logger.info("Starting chamber image stream for %s (stream_id=%s, model=%s)", ip_address, stream_id, model)
- # Register disconnect event so stop endpoint can signal us
- if stream_id and disconnect_event:
- _disconnect_events[stream_id] = disconnect_event
- connection = await generate_chamber_image_stream(ip_address, access_code, fps)
- if connection is None:
- logger.error("Failed to connect to chamber image stream for %s", ip_address)
- yield (
- b"--frame\r\n"
- b"Content-Type: text/plain\r\n\r\n"
- b"Error: Camera connection failed. Check printer is on and camera is enabled.\r\n"
- )
- return
- reader, writer = connection
- # Track active connection for cleanup
- if stream_id:
- _active_chamber_streams[stream_id] = (reader, writer)
- try:
- frame_interval = 1.0 / fps if fps > 0 else 0.2
- last_frame_time = 0.0
- while True:
- # Check if client disconnected
- if disconnect_event and disconnect_event.is_set():
- logger.info("Client disconnected, stopping chamber stream %s", stream_id)
- break
- # Read next frame
- frame = await read_next_chamber_frame(reader, timeout=30.0)
- if frame is None:
- logger.warning("Chamber image stream ended for %s", stream_id)
- break
- # Save frame to buffer for photo capture and track timestamp
- if printer_id is not None:
- import time
- _last_frames[printer_id] = frame
- _last_frame_times[printer_id] = time.time()
- # Rate limiting - skip frames if needed to maintain target FPS
- current_time = asyncio.get_event_loop().time()
- if current_time - last_frame_time < frame_interval:
- continue
- last_frame_time = current_time
- # Yield frame in MJPEG format
- yield (
- b"--frame\r\n"
- b"Content-Type: image/jpeg\r\n"
- b"Content-Length: " + str(len(frame)).encode() + b"\r\n"
- b"\r\n" + frame + b"\r\n"
- )
- except asyncio.CancelledError:
- logger.info("Chamber image stream cancelled (stream_id=%s)", stream_id)
- except GeneratorExit:
- logger.info("Chamber image stream generator exit (stream_id=%s)", stream_id)
- except Exception as e:
- logger.exception("Chamber image stream error: %s", e)
- finally:
- # Remove from active streams and disconnect events
- if stream_id:
- _active_chamber_streams.pop(stream_id, None)
- _disconnect_events.pop(stream_id, None)
- _stream_last_frame_times.pop(stream_id, None)
- # Clean up frame buffer and timestamps
- if printer_id is not None:
- _last_frames.pop(printer_id, None)
- _last_frame_times.pop(printer_id, None)
- _stream_start_times.pop(printer_id, None)
- # Close the connection
- try:
- writer.close()
- await writer.wait_closed()
- except OSError:
- pass # Connection already closed or broken; cleanup is best-effort
- logger.info("Chamber image stream stopped for %s (stream_id=%s)", ip_address, stream_id)
- async def _terminate_ffmpeg(process: asyncio.subprocess.Process, stream_id: str | None = None) -> None:
- """Terminate an ffmpeg process gracefully, then kill if needed."""
- if process.returncode is not None:
- return # Already dead
- try:
- process.terminate()
- try:
- await asyncio.wait_for(process.wait(), timeout=2.0)
- except TimeoutError:
- logger.warning("ffmpeg didn't terminate gracefully, killing (stream_id=%s)", stream_id)
- process.kill()
- await process.wait()
- except ProcessLookupError:
- pass # Already dead
- except OSError as e:
- logger.warning("Error terminating ffmpeg: %s", e)
- _spawned_ffmpeg_pids.pop(process.pid, None)
- def _summarize_ffmpeg_stderr(text: str | None) -> str:
- """Strip ffmpeg's boilerplate banner and keep only actionable lines.
- ffmpeg prints ~20 lines of version/build/configuration/lib headers before
- any actual error message. Logging the full banner on every retry floods
- the log (hundreds of lines per failed stream). This filter drops the
- banner and caps output at the last 10 meaningful lines.
- """
- if not text:
- return ""
- banner_prefixes = (
- "ffmpeg version ",
- " built with ",
- " configuration:",
- " libavutil ",
- " libavcodec ",
- " libavformat ",
- " libavdevice ",
- " libavfilter ",
- " libswscale ",
- " libswresample ",
- " libpostproc ",
- )
- meaningful = [ln for ln in text.splitlines() if ln.strip() and not ln.startswith(banner_prefixes)]
- return "\n".join(meaningful[-10:])
- async def _read_ffmpeg_stderr(process: asyncio.subprocess.Process) -> str | None:
- """Read whatever ffmpeg has written to stderr so far (best-effort).
- ffmpeg's stderr must be drained *incrementally*. A stalled-but-still-alive
- ffmpeg — the typical P2S RTSP failure, where it connects but never produces
- a frame — never closes stderr, so a plain ``stderr.read()`` (read-to-EOF)
- blocks until the wait_for timeout and returns nothing, discarding the
- banner + stream-analysis lines ffmpeg already printed. Reading in bounded
- chunks returns the buffered output promptly whether or not ffmpeg has
- exited. Returns the content with ffmpeg's boilerplate banner stripped.
- """
- if not process or not process.stderr:
- return None
- chunks: list[bytes] = []
- total = 0
- cap = 65536
- try:
- while total < cap:
- chunk = await asyncio.wait_for(process.stderr.read(8192), timeout=2.0)
- if not chunk:
- break # EOF — ffmpeg has exited
- chunks.append(chunk)
- total += len(chunk)
- except Exception:
- # Timed out waiting for more data — ffmpeg is alive but quiet now.
- # Fall through and return whatever it already printed.
- pass
- if not chunks:
- return None
- return _summarize_ffmpeg_stderr(b"".join(chunks).decode(errors="replace")) or None
- async def generate_rtsp_mjpeg_stream(
- ip_address: str,
- access_code: str,
- model: str | None,
- fps: int = 10,
- stream_id: str | None = None,
- disconnect_event: asyncio.Event | None = None,
- printer_id: int | None = None,
- ) -> AsyncGenerator[bytes, None]:
- """Generate MJPEG stream from printer camera using ffmpeg/RTSP.
- This is for X1/H2/P2 models that support RTSP streaming.
- Auto-reconnects when the printer drops the RTSP session (common on P2S).
- Per-model knobs (probesize, analyzeduration, reconnect cadence) come from
- :func:`camera_profiles.get_camera_profile` so quirky firmwares can be
- handled by adding a profile entry rather than tuning a global constant.
- """
- ffmpeg = get_ffmpeg_path()
- if not ffmpeg:
- logger.error("ffmpeg not found - camera streaming requires ffmpeg")
- yield (b"--frame\r\nContent-Type: text/plain\r\n\r\nError: ffmpeg not installed\r\n")
- return
- profile = get_camera_profile(model)
- port = get_camera_port(model)
- # Use a local TLS proxy so Python's OpenSSL handles TLS instead of
- # ffmpeg's GnuTLS. This fixes P2S (and potentially other models)
- # dropping the RTSP session after a few seconds due to GnuTLS's
- # hardened Debian defaults rejecting TLS renegotiation.
- proxy_port, proxy_server = await create_tls_proxy(ip_address, port)
- camera_url = f"rtsp://bblp:{access_code}@127.0.0.1:{proxy_port}/streaming/live/1"
- # ffmpeg command to output MJPEG stream to stdout
- cmd = [
- ffmpeg,
- "-rtsp_transport",
- "tcp",
- "-rtsp_flags",
- "prefer_tcp",
- # Socket I/O timeout name varies by ffmpeg version (#1504); see
- # rtsp_socket_timeout_flag(). The 30s value is microseconds for
- # both names.
- f"-{rtsp_socket_timeout_flag()}",
- "30000000",
- "-buffer_size",
- "1024000", # 1MB buffer
- "-max_delay",
- "500000", # 0.5 seconds max delay
- "-probesize",
- str(profile.probesize),
- "-analyzeduration",
- str(profile.analyzeduration),
- "-fflags",
- "nobuffer", # Reduce internal buffering
- "-flags",
- "low_delay", # Minimize decode latency
- *profile.extra_ffmpeg_input_args,
- "-i",
- camera_url,
- "-f",
- "mjpeg",
- "-q:v",
- "5",
- "-r",
- str(fps),
- "-an", # No audio
- "-", # Output to stdout
- ]
- # Register disconnect event so stop endpoint can signal us
- if stream_id and disconnect_event:
- _disconnect_events[stream_id] = disconnect_event
- logger.info(
- "Starting RTSP camera stream for %s (stream_id=%s, model=%s, fps=%s, probesize=%s, analyzeduration=%s)",
- ip_address,
- stream_id,
- model,
- fps,
- profile.probesize,
- profile.analyzeduration,
- )
- # Log the full argv so a support bundle shows the actual ffmpeg flags
- # (probesize, analyzeduration, transport, ...). Only camera_url carries a
- # secret (the access code), so redact just that one element.
- _redacted_cmd = ["rtsp://<redacted>/streaming/live/1" if a == camera_url else a for a in cmd]
- logger.debug("ffmpeg command: %s", " ".join(_redacted_cmd))
- # On Windows, spawn ffmpeg in its own process group so that
- # terminate() doesn't broadcast CTRL_C_EVENT to uvicorn (#605).
- spawn_kwargs: dict = {}
- if sys.platform == "win32":
- spawn_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
- jpeg_start = b"\xff\xd8"
- jpeg_end = b"\xff\xd9"
- reconnect_count = 0
- process = None
- got_any_frames = False
- try:
- while reconnect_count <= profile.rtsp_reconnect_max:
- # Check for client disconnect before (re)connecting
- if disconnect_event and disconnect_event.is_set():
- break
- if reconnect_count > 0:
- logger.info(
- "RTSP reconnecting (%d/%d) for %s (stream_id=%s)",
- reconnect_count,
- profile.rtsp_reconnect_max,
- ip_address,
- stream_id,
- )
- await asyncio.sleep(profile.rtsp_reconnect_delay)
- if disconnect_event and disconnect_event.is_set():
- break
- # Spawn ffmpeg
- process = await asyncio.create_subprocess_exec(
- *cmd,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- **spawn_kwargs,
- )
- if stream_id:
- _active_streams[stream_id] = process
- import time as _time
- _spawned_ffmpeg_pids[process.pid] = _time.time()
- # Brief check for immediate startup failures
- await asyncio.sleep(0.1)
- if process.returncode is not None:
- stderr = await process.stderr.read()
- stderr_text = _summarize_ffmpeg_stderr(stderr.decode(errors="replace"))
- logger.error("ffmpeg failed immediately (attempt %d): %s", reconnect_count + 1, stderr_text)
- _spawned_ffmpeg_pids.pop(process.pid, None)
- if not got_any_frames and reconnect_count == 0:
- # First attempt failed immediately — camera is likely unreachable
- yield (
- b"--frame\r\n"
- b"Content-Type: text/plain\r\n\r\n"
- b"Error: Camera connection failed. Check printer is on and camera is enabled.\r\n"
- )
- return
- reconnect_count += 1
- continue
- # Read JPEG frames from ffmpeg stdout
- buffer = b""
- stream_ended = False
- client_gone = False
- while True:
- if disconnect_event and disconnect_event.is_set():
- client_gone = True
- break
- try:
- chunk = await asyncio.wait_for(process.stdout.read(8192), timeout=30.0)
- if not chunk:
- # ffmpeg exited — log stderr and break to reconnect
- stderr_text = await _read_ffmpeg_stderr(process)
- if stderr_text:
- logger.warning("ffmpeg stderr (stream_id=%s): %s", stream_id, stderr_text)
- logger.warning("RTSP stream ended for %s (stream_id=%s), will reconnect", ip_address, stream_id)
- stream_ended = True
- break
- buffer += chunk
- # Extract complete JPEG frames from buffer
- while True:
- start_idx = buffer.find(jpeg_start)
- if start_idx == -1:
- buffer = buffer[-2:] if len(buffer) > 2 else buffer
- break
- if start_idx > 0:
- buffer = buffer[start_idx:]
- end_idx = buffer.find(jpeg_end, 2)
- if end_idx == -1:
- break
- frame = buffer[: end_idx + 2]
- buffer = buffer[end_idx + 2 :]
- got_any_frames = True
- if printer_id is not None:
- import time
- _last_frames[printer_id] = frame
- _last_frame_times[printer_id] = time.time()
- if stream_id:
- _stream_last_frame_times[stream_id] = time.time()
- yield (
- b"--frame\r\n"
- b"Content-Type: image/jpeg\r\n"
- b"Content-Length: " + str(len(frame)).encode() + b"\r\n"
- b"\r\n" + frame + b"\r\n"
- )
- except TimeoutError:
- stderr_text = await _read_ffmpeg_stderr(process)
- if stderr_text:
- logger.warning("ffmpeg stderr on timeout: %s", stderr_text)
- logger.warning("RTSP read timeout for %s (stream_id=%s)", ip_address, stream_id)
- stream_ended = True
- break
- except asyncio.CancelledError:
- logger.info("Camera stream cancelled (stream_id=%s)", stream_id)
- client_gone = True
- break
- except GeneratorExit:
- logger.info("Camera stream generator exit (stream_id=%s)", stream_id)
- client_gone = True
- break
- # Clean up this ffmpeg process before reconnecting or exiting
- await _terminate_ffmpeg(process, stream_id)
- process = None
- if client_gone:
- break
- # Check if stream was explicitly stopped (e.g., by stop endpoint)
- if stream_id and stream_id not in _active_streams:
- logger.info("Stream %s removed from active streams, stopping reconnect", stream_id)
- break
- if stream_ended:
- reconnect_count += 1
- continue
- # Normal exit (shouldn't reach here, but be safe)
- break
- if reconnect_count > profile.rtsp_reconnect_max:
- logger.error(
- "RTSP max reconnects (%d) reached for %s (stream_id=%s)",
- profile.rtsp_reconnect_max,
- ip_address,
- stream_id,
- )
- except FileNotFoundError:
- logger.error("ffmpeg not found - camera streaming requires ffmpeg")
- yield (b"--frame\r\nContent-Type: text/plain\r\n\r\nError: ffmpeg not installed\r\n")
- except asyncio.CancelledError:
- logger.info("Camera stream task cancelled (stream_id=%s)", stream_id)
- except GeneratorExit:
- logger.info("Camera stream generator closed (stream_id=%s)", stream_id)
- except Exception as e:
- logger.exception("Camera stream error: %s", e)
- finally:
- # Remove from active streams and disconnect events
- if stream_id:
- _active_streams.pop(stream_id, None)
- _disconnect_events.pop(stream_id, None)
- _stream_last_frame_times.pop(stream_id, None)
- # Clean up frame buffer and timestamps
- if printer_id is not None:
- _last_frames.pop(printer_id, None)
- _last_frame_times.pop(printer_id, None)
- _stream_start_times.pop(printer_id, None)
- if process:
- await _terminate_ffmpeg(process, stream_id)
- logger.info("Camera stream stopped for %s (stream_id=%s)", ip_address, stream_id)
- # Shut down the TLS proxy
- proxy_server.close()
- await proxy_server.wait_closed()
- @router.post("/camera/stream-token")
- async def create_stream_token(
- _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
- ):
- """Create a reusable token for camera stream/snapshot access.
- Returns a token valid for 60 minutes that can be appended as ?token=xxx
- to camera stream/snapshot URLs loaded via <img> tags.
- """
- return {"token": await create_camera_stream_token()}
- @router.get("/{printer_id}/camera/stream")
- async def camera_stream(
- printer_id: int,
- request: Request,
- fps: int = 10,
- db: AsyncSession = Depends(get_db),
- _: None = RequireCameraStreamTokenIfAuthEnabled,
- ):
- """Stream live video from printer camera as MJPEG.
- This endpoint returns a multipart MJPEG stream that can be used directly
- in an <img> tag or video player.
- Requires a stream token query param (?token=xxx) when auth is enabled.
- Uses external camera if configured, otherwise uses built-in camera:
- - External: MJPEG, RTSP, or HTTP snapshot
- - A1/P1: Chamber image protocol (port 6000)
- - X1/H2/P2: RTSP via ffmpeg (port 322)
- Args:
- printer_id: Printer ID
- fps: Target frames per second (default: 10, max: 30)
- """
- printer = await get_printer_or_404(printer_id, db)
- # Check for external camera first
- if printer.external_camera_enabled and printer.external_camera_url:
- import time
- from backend.app.services.external_camera import generate_mjpeg_stream
- # Limit external camera FPS to reduce browser load
- fps = min(max(fps, 1), 15)
- logger.info(
- "Using external camera (%s) for printer %s at %s fps", printer.external_camera_type, printer_id, fps
- )
- # Track stream start
- _stream_start_times[printer_id] = time.time()
- _active_external_streams.add(printer_id)
- async def external_stream_wrapper():
- """Wrap external stream to track start/stop and update frame times."""
- try:
- async for frame in generate_mjpeg_stream(
- printer.external_camera_url, printer.external_camera_type, fps
- ):
- # generate_mjpeg_stream already handles rate limiting;
- # just track frame times for stall detection
- _last_frame_times[printer_id] = time.time()
- yield frame
- finally:
- _active_external_streams.discard(printer_id)
- logger.info("External camera stream ended for printer %s", printer_id)
- return StreamingResponse(
- external_stream_wrapper(),
- media_type="multipart/x-mixed-replace; boundary=frame",
- headers={
- "Cache-Control": "no-cache, no-store, must-revalidate",
- "Pragma": "no-cache",
- "Expires": "0",
- },
- )
- # Validate FPS - A1/P1 models max out at ~5 FPS
- if is_chamber_image_model(printer.model):
- fps = min(max(fps, 1), 5)
- else:
- fps = min(max(fps, 1), 30)
- # Choose the appropriate stream generator based on model
- if is_chamber_image_model(printer.model):
- stream_generator = generate_chamber_mjpeg_stream
- logger.info("Using chamber image protocol for %s", printer.model)
- else:
- stream_generator = generate_rtsp_mjpeg_stream
- logger.info("Using RTSP protocol for %s", printer.model)
- # Track stream start time. Set only if absent so the value reflects when
- # the SHARED upstream first started streaming, not when each new viewer
- # attached — otherwise /camera/status would report stream_uptime jumping
- # backward whenever a second viewer joins. The upstream generator's
- # finally clears this entry when the upstream actually ends.
- import time
- _stream_start_times.setdefault(printer_id, time.time())
- # Fan-out broadcaster (#1089): one upstream connection per printer, shared
- # across all viewers. Most Bambu printers only allow a single concurrent
- # camera connection, so opening the same printer in two tabs would
- # otherwise kick the first viewer off. The broadcaster owns the single
- # upstream and the per-viewer disconnect handling.
- #
- # Note: the upstream's fps is fixed by the first viewer who creates the
- # broadcaster. Concurrent viewers share that rate; new viewers after
- # teardown create a fresh broadcaster at their requested fps.
- fanout_key = f"printer-{printer_id}"
- upstream_stream_id = f"{printer_id}-fanout"
- def _factory(disconnect_event: asyncio.Event):
- # Re-bind locals into the closure so the async generator below sees
- # them — disconnect_event is owned by the broadcaster and signalled
- # when the last subscriber leaves (after the grace window).
- return stream_generator(
- ip_address=printer.ip_address,
- access_code=printer.access_code,
- model=printer.model,
- fps=fps,
- stream_id=upstream_stream_id,
- disconnect_event=disconnect_event,
- printer_id=printer_id,
- )
- # Subscribe with a one-shot retry to close a tiny race: the grace-window
- # teardown can flip the broadcaster to `stopped=True` between the registry
- # lookup and our subscribe call. The retry forces the registry to mint a
- # fresh broadcaster (since the now-stopped one is replaced), and the second
- # subscribe is guaranteed to land on it before any teardown can fire.
- broadcaster: MjpegBroadcaster = await get_or_create_broadcaster(fanout_key, _factory)
- try:
- queue = await broadcaster.subscribe()
- except RuntimeError:
- broadcaster = await get_or_create_broadcaster(fanout_key, _factory)
- queue = await broadcaster.subscribe()
- logger.info(
- "Camera viewer attached to %s (subscribers=%d)",
- fanout_key,
- broadcaster.subscriber_count,
- )
- async def _is_disconnected() -> bool:
- try:
- return await request.is_disconnected()
- except Exception:
- # Older starlette/uvicorn can raise during teardown — treat that
- # as "client gone" so the subscriber cleanly unsubscribes.
- return True
- def _log_detach(remaining: int) -> None:
- logger.info("Camera viewer detached from %s (subscribers=%d)", fanout_key, remaining)
- async def _generate():
- async for chunk in iter_subscriber(
- broadcaster,
- queue,
- is_disconnected=_is_disconnected,
- on_unsubscribe=_log_detach,
- ):
- yield chunk
- return StreamingResponse(
- _generate(),
- media_type="multipart/x-mixed-replace; boundary=frame",
- headers={
- "Cache-Control": "no-cache, no-store, must-revalidate",
- "Pragma": "no-cache",
- "Expires": "0",
- },
- )
- @router.api_route("/{printer_id}/camera/stop", methods=["GET", "POST"])
- async def stop_camera_stream(
- printer_id: int,
- _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
- ):
- """Stop all active camera streams for a printer.
- This can be called by the frontend when the camera window is closed.
- Accepts both GET and POST (POST for sendBeacon compatibility).
- """
- stopped = 0
- # Tear down the fan-out broadcaster first (#1089). This cleanly notifies
- # all subscribed viewers and asks the upstream generator to stop
- # reconnecting before we fall back to forcefully killing the process below.
- if await shutdown_broadcaster(f"printer-{printer_id}"):
- logger.info("Shut down camera fan-out broadcaster for printer %s", printer_id)
- # Stop ffmpeg/RTSP streams
- to_remove = []
- for stream_id, process in list(_active_streams.items()):
- if stream_id.startswith(f"{printer_id}-"):
- to_remove.append(stream_id)
- # Signal the generator to stop reconnecting BEFORE killing the process
- event = _disconnect_events.get(stream_id)
- if event:
- event.set()
- if process.returncode is None:
- try:
- process.terminate()
- try:
- await asyncio.wait_for(process.wait(), timeout=2.0)
- except TimeoutError:
- logger.warning("ffmpeg didn't terminate gracefully, killing (stream_id=%s)", stream_id)
- process.kill()
- await process.wait()
- stopped += 1
- logger.info("Terminated ffmpeg process for stream %s", stream_id)
- except ProcessLookupError:
- pass # Process already dead
- except OSError as e:
- logger.warning("Error stopping stream %s: %s", stream_id, e)
- _spawned_ffmpeg_pids.pop(process.pid, None)
- for stream_id in to_remove:
- _active_streams.pop(stream_id, None)
- _disconnect_events.pop(stream_id, None)
- _stream_last_frame_times.pop(stream_id, None)
- # Stop chamber image streams
- to_remove_chamber = []
- for stream_id, (_reader, writer) in list(_active_chamber_streams.items()):
- if stream_id.startswith(f"{printer_id}-"):
- to_remove_chamber.append(stream_id)
- # Signal the generator to stop
- event = _disconnect_events.get(stream_id)
- if event:
- event.set()
- try:
- writer.close()
- stopped += 1
- logger.info("Closed chamber image connection for stream %s", stream_id)
- except OSError as e:
- logger.warning("Error stopping chamber stream %s: %s", stream_id, e)
- for stream_id in to_remove_chamber:
- _active_chamber_streams.pop(stream_id, None)
- _disconnect_events.pop(stream_id, None)
- _stream_last_frame_times.pop(stream_id, None)
- logger.info("Stopped %s camera stream(s) for printer %s", stopped, printer_id)
- return {"stopped": stopped}
- @router.get("/{printer_id}/camera/snapshot")
- async def camera_snapshot(
- printer_id: int,
- db: AsyncSession = Depends(get_db),
- _: None = RequireCameraStreamTokenIfAuthEnabled,
- ):
- """Capture a single frame from the printer camera.
- Returns a JPEG image.
- Requires a stream token query param (?token=xxx) when auth is enabled.
- """
- import tempfile
- from pathlib import Path
- printer = await get_printer_or_404(printer_id, db)
- # Check for external camera first
- if printer.external_camera_enabled and printer.external_camera_url:
- from backend.app.services.external_camera import capture_frame
- frame_data = await capture_frame(
- printer.external_camera_url,
- printer.external_camera_type,
- timeout=15,
- snapshot_url=printer.external_camera_snapshot_url,
- )
- if not frame_data:
- raise HTTPException(
- status_code=503,
- detail="Failed to capture frame from external camera.",
- )
- return Response(
- content=frame_data,
- media_type="image/jpeg",
- headers={
- "Cache-Control": "no-cache, no-store, must-revalidate",
- "Content-Disposition": f'inline; filename="snapshot_{printer_id}.jpg"',
- },
- )
- # Reuse the fan-out broadcaster's buffered frame when a viewer is already
- # watching — avoids opening a second concurrent RTSP socket on printers
- # that allow only one camera connection (e.g. X2D firmware 01.01.00.00;
- # see #1271). Buffered frame is <1s old while a viewer is connected.
- buffered = try_get_active_buffered_frame(printer_id)
- if buffered:
- return Response(
- content=buffered,
- media_type="image/jpeg",
- headers={
- "Cache-Control": "no-cache, no-store, must-revalidate",
- "Content-Disposition": f'inline; filename="snapshot_{printer_id}.jpg"',
- },
- )
- # Create temporary file for the snapshot (0600 so only the app user can read it)
- fd, tmp_name = tempfile.mkstemp(suffix=".jpg")
- os.close(fd)
- temp_path = Path(tmp_name)
- temp_path.chmod(0o600)
- try:
- success = await capture_camera_frame(
- ip_address=printer.ip_address,
- access_code=printer.access_code,
- model=printer.model,
- output_path=temp_path,
- timeout=15,
- )
- if not success:
- raise HTTPException(
- status_code=503,
- detail="Failed to capture camera frame. Ensure printer is on and camera is enabled.",
- )
- # Read and return the image
- with open(temp_path, "rb") as f:
- image_data = f.read()
- return Response(
- content=image_data,
- media_type="image/jpeg",
- headers={
- "Cache-Control": "no-cache, no-store, must-revalidate",
- "Content-Disposition": f'inline; filename="snapshot_{printer_id}.jpg"',
- },
- )
- finally:
- # Clean up temp file
- if temp_path.exists():
- temp_path.unlink()
- @router.get("/{printer_id}/camera/test")
- async def test_camera(
- printer_id: int,
- db: AsyncSession = Depends(get_db),
- _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
- ):
- """Test camera connection for a printer.
- Returns success status and any error message.
- """
- printer = await get_printer_or_404(printer_id, db)
- result = await test_camera_connection(
- ip_address=printer.ip_address,
- access_code=printer.access_code,
- model=printer.model,
- )
- return result
- @router.post("/{printer_id}/camera/diagnose")
- async def diagnose_camera_route(
- printer_id: int,
- db: AsyncSession = Depends(get_db),
- _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
- ):
- """Run staged diagnostics for a printer's camera path.
- Returns a structured result the frontend renders inline so users can
- self-diagnose "connection lost" before opening a ticket. See
- ``camera_diagnose`` for stage details and the live-stream shortcut.
- """
- import time
- from backend.app.services.camera_diagnose import diagnose_camera
- printer = await get_printer_or_404(printer_id, db)
- # Look up live-stream evidence so the diagnostic can short-circuit
- # instead of fighting a viewer for the printer's single camera slot.
- has_live = is_stream_active(printer_id)
- last_ts = _last_frame_times.get(printer_id) if has_live else None
- live_age = (time.time() - last_ts) if (has_live and last_ts) else None
- result = await diagnose_camera(
- ip_address=printer.ip_address,
- access_code=printer.access_code,
- model=printer.model,
- printer_id=printer_id,
- has_live_stream=has_live,
- live_frame_age_seconds=live_age,
- )
- return result.to_dict()
- @router.get("/{printer_id}/camera/status")
- async def camera_status(
- printer_id: int,
- _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
- ):
- """Get the status of an active camera stream.
- Returns whether a stream is active and when the last frame was received.
- Used by the frontend to detect stalled streams and auto-reconnect.
- """
- import time
- # Check if there's an active stream for this printer
- has_active_stream = False
- # Check external camera streams
- if printer_id in _active_external_streams:
- has_active_stream = True
- # Check ffmpeg/RTSP streams
- if not has_active_stream:
- for stream_id in _active_streams:
- if stream_id.startswith(f"{printer_id}-"):
- process = _active_streams[stream_id]
- if process.returncode is None:
- has_active_stream = True
- break
- # Check chamber image streams
- if not has_active_stream:
- for stream_id in _active_chamber_streams:
- if stream_id.startswith(f"{printer_id}-"):
- has_active_stream = True
- break
- # Get timing information
- current_time = time.time()
- last_frame_time = _last_frame_times.get(printer_id)
- stream_start_time = _stream_start_times.get(printer_id)
- # Calculate seconds since last frame
- seconds_since_frame = None
- if last_frame_time is not None:
- seconds_since_frame = current_time - last_frame_time
- # Calculate stream uptime
- stream_uptime = None
- if stream_start_time is not None:
- stream_uptime = current_time - stream_start_time
- return {
- "active": has_active_stream,
- "has_frames": printer_id in _last_frames,
- "seconds_since_frame": seconds_since_frame,
- "stream_uptime": stream_uptime,
- # Consider stalled if no frame for more than 10 seconds after stream started
- "stalled": (
- has_active_stream
- and stream_uptime is not None
- and stream_uptime > 5 # Give 5 seconds for stream to start
- and (seconds_since_frame is None or seconds_since_frame > 10)
- ),
- }
- @router.post("/{printer_id}/camera/external/test")
- async def test_external_camera(
- printer_id: int,
- url: str,
- camera_type: str,
- db: AsyncSession = Depends(get_db),
- _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
- ):
- """Test external camera connection.
- Args:
- printer_id: Printer ID (for authorization)
- url: Camera URL or USB device path to test
- camera_type: Camera type ("mjpeg", "rtsp", "snapshot", "usb")
- Returns:
- Dict with {success: bool, error?: str, resolution?: str}
- """
- # Verify printer exists (for authorization)
- await get_printer_or_404(printer_id, db)
- from backend.app.services.external_camera import test_connection
- return await test_connection(url, camera_type)
- @router.get("/{printer_id}/camera/check-plate")
- async def check_plate_empty(
- printer_id: int,
- plate_type: str | None = None,
- use_external: bool | None = None,
- include_debug_image: bool = False,
- db: AsyncSession = Depends(get_db),
- _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
- ):
- """Check if the build plate is empty using camera vision.
- Uses calibration-based difference detection - compares current frame
- to a reference image of the empty plate.
- IMPORTANT: Chamber light must be ON for reliable detection.
- Args:
- printer_id: Printer ID
- plate_type: Type of build plate (e.g., "High Temp Plate") for calibration lookup
- use_external: If True, prefer external camera over built-in. When omitted
- (None), defaults to the printer's external_camera_enabled setting —
- mirroring the runtime auto-check at print start (main.py). Without
- this default the UI's manual check would always use the built-in
- camera, mismatching the reference saved during calibration (#1359).
- include_debug_image: If True, return URL to annotated debug image
- Returns:
- Dict with detection results:
- - is_empty: bool - Whether plate appears empty
- - confidence: float - Confidence level (0.0 to 1.0)
- - difference_percent: float - How different from calibration reference
- - message: str - Human-readable result message
- - needs_calibration: bool - True if calibration is required
- - light_warning: bool - True if chamber light is off
- """
- from backend.app.services.plate_detection import (
- check_plate_empty as do_check,
- is_plate_detection_available,
- )
- from backend.app.services.printer_manager import printer_manager
- # Check printer exists first (before OpenCV check)
- printer = await get_printer_or_404(printer_id, db)
- if use_external is None:
- use_external = bool(
- printer.external_camera_enabled and printer.external_camera_url and printer.external_camera_type
- )
- if not is_plate_detection_available():
- raise HTTPException(
- status_code=503,
- detail="Plate detection not available. Install opencv-python-headless to enable.",
- )
- # Check chamber light status
- light_warning = False
- state = printer_manager.get_status(printer_id)
- if state and not state.chamber_light:
- light_warning = True
- from backend.app.services.plate_detection import PlateDetector
- # Build ROI tuple from printer settings if available
- roi = None
- if all(
- [
- printer.plate_detection_roi_x is not None,
- printer.plate_detection_roi_y is not None,
- printer.plate_detection_roi_w is not None,
- printer.plate_detection_roi_h is not None,
- ]
- ):
- roi = (
- printer.plate_detection_roi_x,
- printer.plate_detection_roi_y,
- printer.plate_detection_roi_w,
- printer.plate_detection_roi_h,
- )
- result = await do_check(
- printer_id=printer.id,
- ip_address=printer.ip_address,
- access_code=printer.access_code,
- model=printer.model,
- plate_type=plate_type,
- include_debug_image=include_debug_image,
- external_camera_url=printer.external_camera_url if printer.external_camera_enabled else None,
- external_camera_type=printer.external_camera_type if printer.external_camera_enabled else None,
- use_external=use_external,
- roi=roi,
- external_camera_snapshot_url=printer.external_camera_snapshot_url if printer.external_camera_enabled else None,
- )
- # Get reference count for the response
- detector = PlateDetector()
- ref_count = detector.get_calibration_count(printer.id)
- response = result.to_dict()
- response["light_warning"] = light_warning
- response["reference_count"] = ref_count
- response["max_references"] = detector.MAX_REFERENCES
- # Include current ROI in response
- if roi:
- response["roi"] = {"x": roi[0], "y": roi[1], "w": roi[2], "h": roi[3]}
- else:
- # Return default ROI
- response["roi"] = {"x": 0.15, "y": 0.35, "w": 0.70, "h": 0.55}
- # If debug image requested and available, encode as base64 data URL
- if include_debug_image and result.debug_image:
- import base64
- b64_image = base64.b64encode(result.debug_image).decode("utf-8")
- response["debug_image_url"] = f"data:image/jpeg;base64,{b64_image}"
- return response
- @router.post("/{printer_id}/camera/plate-detection/calibrate")
- async def calibrate_plate_detection(
- printer_id: int,
- label: str | None = None,
- use_external: bool | None = None,
- db: AsyncSession = Depends(get_db),
- _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
- ):
- """Calibrate plate detection by capturing a reference image of the empty plate.
- The plate MUST be empty when calling this endpoint. The captured image
- will be used as the reference for future detection comparisons.
- Supports up to 5 reference images per printer. When adding a 6th, the oldest
- is automatically removed.
- IMPORTANT: Chamber light should be ON for calibration.
- Args:
- printer_id: Printer ID
- label: Optional label for this reference (e.g., "High Temp Plate", "Wham Bam")
- use_external: If True, prefer external camera over built-in. When omitted
- (None), defaults to the printer's external_camera_enabled setting so
- calibration captures from the same source the runtime auto-check
- uses at print start (#1359).
- Returns:
- Dict with:
- - success: bool - Whether calibration succeeded
- - message: str - Status message
- - index: int - The reference slot used (0-4)
- """
- from backend.app.services.plate_detection import (
- calibrate_plate,
- is_plate_detection_available,
- )
- from backend.app.services.printer_manager import printer_manager
- # Check printer exists first (before OpenCV check)
- printer = await get_printer_or_404(printer_id, db)
- if use_external is None:
- use_external = bool(
- printer.external_camera_enabled and printer.external_camera_url and printer.external_camera_type
- )
- if not is_plate_detection_available():
- raise HTTPException(
- status_code=503,
- detail="Plate detection not available. Install opencv-python-headless to enable.",
- )
- # Check chamber light - warn but don't block
- state = printer_manager.get_status(printer_id)
- light_warning = state and not state.chamber_light
- success, message, index = await calibrate_plate(
- printer_id=printer.id,
- ip_address=printer.ip_address,
- access_code=printer.access_code,
- model=printer.model,
- label=label,
- external_camera_url=printer.external_camera_url if printer.external_camera_enabled else None,
- external_camera_type=printer.external_camera_type if printer.external_camera_enabled else None,
- use_external=use_external,
- external_camera_snapshot_url=printer.external_camera_snapshot_url if printer.external_camera_enabled else None,
- )
- if light_warning and success:
- message += " (Warning: Chamber light was off)"
- return {"success": success, "message": message, "index": index}
- @router.delete("/{printer_id}/camera/plate-detection/calibrate")
- async def delete_plate_calibration(
- printer_id: int,
- plate_type: str | None = None,
- db: AsyncSession = Depends(get_db),
- _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
- ):
- """Delete the plate detection calibration for a printer and plate type.
- Args:
- printer_id: Printer ID
- plate_type: Type of build plate (if None, deletes legacy non-plate-specific calibration)
- Returns:
- Dict with:
- - success: bool - Whether deletion succeeded
- - message: str - Status message
- """
- from backend.app.services.plate_detection import (
- delete_calibration,
- is_plate_detection_available,
- )
- # Verify printer exists first (before OpenCV check)
- await get_printer_or_404(printer_id, db)
- if not is_plate_detection_available():
- raise HTTPException(
- status_code=503,
- detail="Plate detection not available. Install opencv-python-headless to enable.",
- )
- deleted = delete_calibration(printer_id, plate_type)
- plate_msg = f" for '{plate_type}'" if plate_type else ""
- return {
- "success": deleted,
- "message": f"Calibration deleted{plate_msg}" if deleted else f"No calibration found{plate_msg}",
- }
- @router.get("/{printer_id}/camera/plate-detection/status")
- async def get_plate_detection_status(
- printer_id: int,
- plate_type: str | None = None,
- db: AsyncSession = Depends(get_db),
- _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
- ):
- """Check plate detection status for a printer and plate type.
- Returns:
- Dict with:
- - available: bool - Whether OpenCV is installed
- - calibrated: bool - Whether printer has calibration for this plate type
- - plate_type: str - The plate type queried
- - chamber_light: bool - Whether chamber light is on
- - message: str - Status message
- """
- from backend.app.services.plate_detection import (
- get_calibration_status,
- is_plate_detection_available,
- )
- from backend.app.services.printer_manager import printer_manager
- # Verify printer exists first (before OpenCV check)
- await get_printer_or_404(printer_id, db)
- if not is_plate_detection_available():
- return {
- "available": False,
- "calibrated": False,
- "plate_type": plate_type,
- "chamber_light": False,
- "message": "OpenCV not installed",
- }
- # Get chamber light status
- state = printer_manager.get_status(printer_id)
- chamber_light = state.chamber_light if state else False
- status = get_calibration_status(printer_id, plate_type)
- status["chamber_light"] = chamber_light
- return status
- @router.get("/{printer_id}/camera/plate-detection/references")
- async def get_plate_references(
- printer_id: int,
- db: AsyncSession = Depends(get_db),
- _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
- ):
- """Get all calibration references for a printer with metadata.
- Returns list of references with index, label, timestamp, and thumbnail URL.
- """
- from backend.app.services.plate_detection import PlateDetector, is_plate_detection_available
- # Verify printer exists first (before OpenCV check)
- await get_printer_or_404(printer_id, db)
- if not is_plate_detection_available():
- raise HTTPException(503, "Plate detection not available")
- detector = PlateDetector()
- references = detector.get_references(printer_id)
- # Add thumbnail URLs
- for ref in references:
- ref["thumbnail_url"] = (
- f"/api/v1/printers/{printer_id}/camera/plate-detection/references/{ref['index']}/thumbnail"
- )
- return {
- "references": references,
- "max_references": detector.MAX_REFERENCES,
- }
- @router.get("/{printer_id}/camera/plate-detection/references/{index}/thumbnail")
- async def get_reference_thumbnail(
- printer_id: int,
- index: int,
- db: AsyncSession = Depends(get_db),
- _: None = RequireCameraStreamTokenIfAuthEnabled,
- ):
- """Get thumbnail image for a calibration reference.
- Requires a stream token query param (?token=xxx) when auth is enabled.
- """
- from fastapi.responses import Response
- from backend.app.services.plate_detection import PlateDetector, is_plate_detection_available
- # Verify printer exists first (before OpenCV check)
- await get_printer_or_404(printer_id, db)
- if not is_plate_detection_available():
- raise HTTPException(503, "Plate detection not available")
- detector = PlateDetector()
- thumbnail = detector.get_reference_thumbnail(printer_id, index)
- if thumbnail is None:
- raise HTTPException(404, "Reference not found")
- return Response(content=thumbnail, media_type="image/jpeg")
- @router.put("/{printer_id}/camera/plate-detection/references/{index}")
- async def update_reference_label(
- printer_id: int,
- index: int,
- label: str,
- db: AsyncSession = Depends(get_db),
- _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
- ):
- """Update the label for a calibration reference."""
- from backend.app.services.plate_detection import PlateDetector, is_plate_detection_available
- # Verify printer exists first (before OpenCV check)
- await get_printer_or_404(printer_id, db)
- if not is_plate_detection_available():
- raise HTTPException(503, "Plate detection not available")
- detector = PlateDetector()
- success = detector.update_reference_label(printer_id, index, label)
- if not success:
- raise HTTPException(404, "Reference not found")
- return {"success": True, "index": index, "label": label}
- @router.delete("/{printer_id}/camera/plate-detection/references/{index}")
- async def delete_reference(
- printer_id: int,
- index: int,
- db: AsyncSession = Depends(get_db),
- _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
- ):
- """Delete a specific calibration reference."""
- from backend.app.services.plate_detection import PlateDetector, is_plate_detection_available
- # Verify printer exists first (before OpenCV check)
- await get_printer_or_404(printer_id, db)
- if not is_plate_detection_available():
- raise HTTPException(503, "Plate detection not available")
- detector = PlateDetector()
- success = detector.delete_reference(printer_id, index)
- if not success:
- raise HTTPException(404, "Reference not found")
- return {"success": True, "message": "Reference deleted"}
- def _scan_bambu_ffmpeg_pids() -> list[int]:
- """Scan /proc for ffmpeg processes with Bambu RTSP URLs.
- These are definitely ours — no other software connects to rtsp(s)://bblp:.
- This catches orphans that survive app restarts and are not in any tracking dict.
- """
- import os
- pids = []
- try:
- for entry in os.listdir("/proc"):
- if not entry.isdigit():
- continue
- try:
- with open(f"/proc/{entry}/cmdline", "rb") as f:
- cmdline = f.read()
- # Match both rtsp:// (via TLS proxy) and rtsps:// (direct)
- if b"ffmpeg" in cmdline and (b"rtsp://bblp:" in cmdline or b"rtsps://bblp:" in cmdline):
- pids.append(int(entry))
- except (OSError, PermissionError, ValueError):
- continue
- except OSError:
- pass
- return pids
- async def cleanup_orphaned_streams():
- """Clean up orphaned ffmpeg processes and stale stream entries.
- Called periodically from the background task loop in main.py.
- Three-layer cleanup:
- 1. /proc scan — finds ALL Bambu ffmpeg processes on the system, even those
- from previous app sessions. This is the nuclear safety net.
- 2. _spawned_ffmpeg_pids — tracks PIDs spawned this session, catches orphans
- that were removed from _active_streams but not killed.
- 3. _active_streams — kills stale entries with no recent frames.
- """
- import os
- import signal
- import time
- cleaned = 0
- now = time.time()
- # Collect PIDs that are legitimately in-use (active stream, process alive)
- active_pids = {proc.pid for proc in _active_streams.values() if proc.returncode is None}
- # Also exclude PIDs from one-shot snapshot captures (Obico detection, finish photos, etc.)
- from backend.app.services.camera import _active_capture_pids
- active_pids |= _active_capture_pids
- # 1. /proc scan — catch ALL orphaned Bambu ffmpeg processes on the system.
- # Any ffmpeg with rtsp(s)://bblp: that is NOT in an active stream is orphaned.
- for pid in _scan_bambu_ffmpeg_pids():
- if pid in active_pids:
- continue
- logger.info("Killing orphaned ffmpeg process found via /proc (pid=%d)", pid)
- try:
- os.kill(pid, signal.SIGKILL)
- except (ProcessLookupError, OSError):
- pass
- _spawned_ffmpeg_pids.pop(pid, None)
- cleaned += 1
- # 2. Clean up _spawned_ffmpeg_pids entries for dead processes
- for pid in list(_spawned_ffmpeg_pids):
- try:
- os.kill(pid, 0) # existence check
- except (ProcessLookupError, OSError):
- _spawned_ffmpeg_pids.pop(pid, None)
- # 3. Clean up _active_streams entries with dead processes
- dead_streams = [sid for sid, proc in _active_streams.items() if proc.returncode is not None]
- for sid in dead_streams:
- proc = _active_streams.pop(sid, None)
- if proc:
- _spawned_ffmpeg_pids.pop(proc.pid, None)
- cleaned += 1
- # 4. Kill stale active streams (alive but no frames for >30s)
- # Uses per-stream timestamps to avoid false "fresh" readings from newer streams
- for sid, proc in list(_active_streams.items()):
- if proc.returncode is not None:
- continue
- # Per-stream frame time is authoritative; fall back to per-printer
- stream_last_frame = _stream_last_frame_times.get(sid)
- if stream_last_frame is None:
- try:
- printer_id = int(sid.split("-", 1)[0])
- except (ValueError, IndexError):
- continue
- stream_last_frame = _last_frame_times.get(printer_id)
- spawn_time = _spawned_ffmpeg_pids.get(proc.pid, now)
- if stream_last_frame is None:
- stream_last_frame = spawn_time
- if now - spawn_time > 60 and now - stream_last_frame > 30:
- logger.info("Killing stale ffmpeg stream %s (no frames for %.0fs)", sid, now - stream_last_frame)
- # Signal the generator to stop reconnecting
- event = _disconnect_events.get(sid)
- if event:
- event.set()
- try:
- proc.kill()
- await proc.wait()
- except (ProcessLookupError, OSError):
- pass
- _active_streams.pop(sid, None)
- _disconnect_events.pop(sid, None)
- _stream_last_frame_times.pop(sid, None)
- _spawned_ffmpeg_pids.pop(proc.pid, None)
- cleaned += 1
- # 4. Clean stale chamber stream entries
- dead_chamber = [sid for sid, (_reader, writer) in _active_chamber_streams.items() if writer.is_closing()]
- for sid in dead_chamber:
- _active_chamber_streams.pop(sid, None)
- cleaned += 1
- if cleaned:
- logger.info("Cleaned up %d orphaned camera stream(s)", cleaned)
|