camera.py 59 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599
  1. """Camera streaming API endpoints for Bambu Lab printers."""
  2. import asyncio
  3. import logging
  4. import os
  5. import subprocess
  6. import sys
  7. from collections.abc import AsyncGenerator
  8. from fastapi import APIRouter, Depends, HTTPException, Request
  9. from fastapi.responses import Response, StreamingResponse
  10. from sqlalchemy import select
  11. from sqlalchemy.ext.asyncio import AsyncSession
  12. from backend.app.core.auth import (
  13. RequireCameraStreamTokenIfAuthEnabled,
  14. RequirePermissionIfAuthEnabled,
  15. create_camera_stream_token,
  16. )
  17. from backend.app.core.database import get_db
  18. from backend.app.core.permissions import Permission
  19. from backend.app.models.printer import Printer
  20. from backend.app.models.user import User
  21. from backend.app.services.camera import (
  22. capture_camera_frame,
  23. create_tls_proxy,
  24. generate_chamber_image_stream,
  25. get_camera_port,
  26. get_ffmpeg_path,
  27. is_chamber_image_model,
  28. read_next_chamber_frame,
  29. rtsp_socket_timeout_flag,
  30. test_camera_connection,
  31. )
  32. from backend.app.services.camera_fanout import (
  33. MjpegBroadcaster,
  34. get_or_create_broadcaster,
  35. iter_subscriber,
  36. shutdown_broadcaster,
  37. )
  38. from backend.app.services.camera_profiles import get_camera_profile
  39. logger = logging.getLogger(__name__)
  40. router = APIRouter(prefix="/printers", tags=["camera"])
  41. # Track active ffmpeg processes for cleanup
  42. _active_streams: dict[str, asyncio.subprocess.Process] = {}
  43. # Track active chamber image connections for cleanup
  44. _active_chamber_streams: dict[str, tuple] = {}
  45. # Store last frame for each printer (for photo capture from active stream)
  46. _last_frames: dict[int, bytes] = {}
  47. # Track last frame timestamp for each printer (for stall detection)
  48. _last_frame_times: dict[int, float] = {}
  49. # Track stream start times for each printer
  50. _stream_start_times: dict[int, float] = {}
  51. # Track active external camera streams by printer ID
  52. _active_external_streams: set[int] = set()
  53. # Track ALL spawned ffmpeg PIDs (persists even if _active_streams entries are removed)
  54. # Maps PID -> spawn timestamp — used by cleanup to find truly orphaned OS processes
  55. _spawned_ffmpeg_pids: dict[int, float] = {}
  56. # Track disconnect events per stream_id — allows stop endpoint and cleanup
  57. # to signal generators to stop reconnecting instead of just killing the process
  58. _disconnect_events: dict[str, asyncio.Event] = {}
  59. # Track last frame time per stream_id (not just per printer_id) for stale detection
  60. _stream_last_frame_times: dict[str, float] = {}
  61. def get_buffered_frame(printer_id: int) -> bytes | None:
  62. """Get the last buffered frame for a printer from an active stream.
  63. Returns the JPEG frame data if available, or None if no active stream.
  64. """
  65. return _last_frames.get(printer_id)
  66. def is_stream_active(printer_id: int) -> bool:
  67. """Return True iff a fan-out camera stream is currently registered for this printer.
  68. Snapshot callers (Obico polling, manual /camera/snapshot) MUST NOT open a
  69. second concurrent RTSP/chamber-image socket while a viewer is attached:
  70. most Bambu firmwares allow only one camera connection, so the competing
  71. socket either kicks the live viewer off or gets refused itself, and the
  72. resulting reconnect storm tears down the fan-out broadcaster (see #1348).
  73. Callers should consult this BEFORE trying to open a fresh socket and skip
  74. the capture cycle when it returns True — even if try_get_active_buffered_frame
  75. returns None (the stream may be running but the first frame hasn't landed
  76. in the buffer yet, or the upstream is mid-reconnect).
  77. """
  78. return any(k.startswith(f"{printer_id}-") for k in _active_streams) or any(
  79. k.startswith(f"{printer_id}-") for k in _active_chamber_streams
  80. )
  81. def try_get_active_buffered_frame(printer_id: int) -> bytes | None:
  82. """Return a buffered frame iff a stream is currently running for this printer.
  83. Snapshot callers (Obico polling, manual /camera/snapshot) tap the fan-out
  84. broadcaster's running upstream instead of opening a second concurrent
  85. RTSP/chamber-image socket. Critical for printers that allow only one
  86. camera connection (e.g. X2D firmware 01.01.00.00; see #1271).
  87. Returns None when no broadcaster is active for this printer, so callers
  88. fall through to their existing fresh-socket path unchanged.
  89. NB: returning None does NOT mean "safe to open a fresh socket" — it also
  90. fires when the stream is registered but no frame has been buffered yet
  91. (startup race, mid-reconnect). Callers that must avoid competing sockets
  92. should consult is_stream_active() first; see #1348.
  93. """
  94. if not is_stream_active(printer_id):
  95. return None
  96. return _last_frames.get(printer_id)
  97. async def get_printer_or_404(printer_id: int, db: AsyncSession) -> Printer:
  98. """Get printer by ID or raise 404."""
  99. result = await db.execute(select(Printer).where(Printer.id == printer_id))
  100. printer = result.scalar_one_or_none()
  101. if not printer:
  102. raise HTTPException(status_code=404, detail="Printer not found")
  103. return printer
  104. async def generate_chamber_mjpeg_stream(
  105. ip_address: str,
  106. access_code: str,
  107. model: str | None,
  108. fps: int = 5,
  109. stream_id: str | None = None,
  110. disconnect_event: asyncio.Event | None = None,
  111. printer_id: int | None = None,
  112. ) -> AsyncGenerator[bytes, None]:
  113. """Generate MJPEG stream from A1/P1 printer using chamber image protocol.
  114. This connects to port 6000 and reads JPEG frames using the Bambu binary protocol.
  115. """
  116. logger.info("Starting chamber image stream for %s (stream_id=%s, model=%s)", ip_address, stream_id, model)
  117. # Register disconnect event so stop endpoint can signal us
  118. if stream_id and disconnect_event:
  119. _disconnect_events[stream_id] = disconnect_event
  120. connection = await generate_chamber_image_stream(ip_address, access_code, fps)
  121. if connection is None:
  122. logger.error("Failed to connect to chamber image stream for %s", ip_address)
  123. yield (
  124. b"--frame\r\n"
  125. b"Content-Type: text/plain\r\n\r\n"
  126. b"Error: Camera connection failed. Check printer is on and camera is enabled.\r\n"
  127. )
  128. return
  129. reader, writer = connection
  130. # Track active connection for cleanup
  131. if stream_id:
  132. _active_chamber_streams[stream_id] = (reader, writer)
  133. try:
  134. frame_interval = 1.0 / fps if fps > 0 else 0.2
  135. last_frame_time = 0.0
  136. while True:
  137. # Check if client disconnected
  138. if disconnect_event and disconnect_event.is_set():
  139. logger.info("Client disconnected, stopping chamber stream %s", stream_id)
  140. break
  141. # Read next frame
  142. frame = await read_next_chamber_frame(reader, timeout=30.0)
  143. if frame is None:
  144. logger.warning("Chamber image stream ended for %s", stream_id)
  145. break
  146. # Save frame to buffer for photo capture and track timestamp
  147. if printer_id is not None:
  148. import time
  149. _last_frames[printer_id] = frame
  150. _last_frame_times[printer_id] = time.time()
  151. # Rate limiting - skip frames if needed to maintain target FPS
  152. current_time = asyncio.get_event_loop().time()
  153. if current_time - last_frame_time < frame_interval:
  154. continue
  155. last_frame_time = current_time
  156. # Yield frame in MJPEG format
  157. yield (
  158. b"--frame\r\n"
  159. b"Content-Type: image/jpeg\r\n"
  160. b"Content-Length: " + str(len(frame)).encode() + b"\r\n"
  161. b"\r\n" + frame + b"\r\n"
  162. )
  163. except asyncio.CancelledError:
  164. logger.info("Chamber image stream cancelled (stream_id=%s)", stream_id)
  165. except GeneratorExit:
  166. logger.info("Chamber image stream generator exit (stream_id=%s)", stream_id)
  167. except Exception as e:
  168. logger.exception("Chamber image stream error: %s", e)
  169. finally:
  170. # Remove from active streams and disconnect events
  171. if stream_id:
  172. _active_chamber_streams.pop(stream_id, None)
  173. _disconnect_events.pop(stream_id, None)
  174. _stream_last_frame_times.pop(stream_id, None)
  175. # Clean up frame buffer and timestamps
  176. if printer_id is not None:
  177. _last_frames.pop(printer_id, None)
  178. _last_frame_times.pop(printer_id, None)
  179. _stream_start_times.pop(printer_id, None)
  180. # Close the connection
  181. try:
  182. writer.close()
  183. await writer.wait_closed()
  184. except OSError:
  185. pass # Connection already closed or broken; cleanup is best-effort
  186. logger.info("Chamber image stream stopped for %s (stream_id=%s)", ip_address, stream_id)
  187. async def _terminate_ffmpeg(process: asyncio.subprocess.Process, stream_id: str | None = None) -> None:
  188. """Terminate an ffmpeg process gracefully, then kill if needed."""
  189. if process.returncode is not None:
  190. return # Already dead
  191. try:
  192. process.terminate()
  193. try:
  194. await asyncio.wait_for(process.wait(), timeout=2.0)
  195. except TimeoutError:
  196. logger.warning("ffmpeg didn't terminate gracefully, killing (stream_id=%s)", stream_id)
  197. process.kill()
  198. await process.wait()
  199. except ProcessLookupError:
  200. pass # Already dead
  201. except OSError as e:
  202. logger.warning("Error terminating ffmpeg: %s", e)
  203. _spawned_ffmpeg_pids.pop(process.pid, None)
  204. def _summarize_ffmpeg_stderr(text: str | None) -> str:
  205. """Strip ffmpeg's boilerplate banner and keep only actionable lines.
  206. ffmpeg prints ~20 lines of version/build/configuration/lib headers before
  207. any actual error message. Logging the full banner on every retry floods
  208. the log (hundreds of lines per failed stream). This filter drops the
  209. banner and caps output at the last 10 meaningful lines.
  210. """
  211. if not text:
  212. return ""
  213. banner_prefixes = (
  214. "ffmpeg version ",
  215. " built with ",
  216. " configuration:",
  217. " libavutil ",
  218. " libavcodec ",
  219. " libavformat ",
  220. " libavdevice ",
  221. " libavfilter ",
  222. " libswscale ",
  223. " libswresample ",
  224. " libpostproc ",
  225. )
  226. meaningful = [ln for ln in text.splitlines() if ln.strip() and not ln.startswith(banner_prefixes)]
  227. return "\n".join(meaningful[-10:])
  228. async def _read_ffmpeg_stderr(process: asyncio.subprocess.Process) -> str | None:
  229. """Read whatever ffmpeg has written to stderr so far (best-effort).
  230. ffmpeg's stderr must be drained *incrementally*. A stalled-but-still-alive
  231. ffmpeg — the typical P2S RTSP failure, where it connects but never produces
  232. a frame — never closes stderr, so a plain ``stderr.read()`` (read-to-EOF)
  233. blocks until the wait_for timeout and returns nothing, discarding the
  234. banner + stream-analysis lines ffmpeg already printed. Reading in bounded
  235. chunks returns the buffered output promptly whether or not ffmpeg has
  236. exited. Returns the content with ffmpeg's boilerplate banner stripped.
  237. """
  238. if not process or not process.stderr:
  239. return None
  240. chunks: list[bytes] = []
  241. total = 0
  242. cap = 65536
  243. try:
  244. while total < cap:
  245. chunk = await asyncio.wait_for(process.stderr.read(8192), timeout=2.0)
  246. if not chunk:
  247. break # EOF — ffmpeg has exited
  248. chunks.append(chunk)
  249. total += len(chunk)
  250. except Exception:
  251. # Timed out waiting for more data — ffmpeg is alive but quiet now.
  252. # Fall through and return whatever it already printed.
  253. pass
  254. if not chunks:
  255. return None
  256. return _summarize_ffmpeg_stderr(b"".join(chunks).decode(errors="replace")) or None
  257. async def generate_rtsp_mjpeg_stream(
  258. ip_address: str,
  259. access_code: str,
  260. model: str | None,
  261. fps: int = 10,
  262. stream_id: str | None = None,
  263. disconnect_event: asyncio.Event | None = None,
  264. printer_id: int | None = None,
  265. ) -> AsyncGenerator[bytes, None]:
  266. """Generate MJPEG stream from printer camera using ffmpeg/RTSP.
  267. This is for X1/H2/P2 models that support RTSP streaming.
  268. Auto-reconnects when the printer drops the RTSP session (common on P2S).
  269. Per-model knobs (probesize, analyzeduration, reconnect cadence) come from
  270. :func:`camera_profiles.get_camera_profile` so quirky firmwares can be
  271. handled by adding a profile entry rather than tuning a global constant.
  272. """
  273. ffmpeg = get_ffmpeg_path()
  274. if not ffmpeg:
  275. logger.error("ffmpeg not found - camera streaming requires ffmpeg")
  276. yield (b"--frame\r\nContent-Type: text/plain\r\n\r\nError: ffmpeg not installed\r\n")
  277. return
  278. profile = get_camera_profile(model)
  279. port = get_camera_port(model)
  280. # Use a local TLS proxy so Python's OpenSSL handles TLS instead of
  281. # ffmpeg's GnuTLS. This fixes P2S (and potentially other models)
  282. # dropping the RTSP session after a few seconds due to GnuTLS's
  283. # hardened Debian defaults rejecting TLS renegotiation.
  284. proxy_port, proxy_server = await create_tls_proxy(ip_address, port)
  285. camera_url = f"rtsp://bblp:{access_code}@127.0.0.1:{proxy_port}/streaming/live/1"
  286. # ffmpeg command to output MJPEG stream to stdout
  287. cmd = [
  288. ffmpeg,
  289. "-rtsp_transport",
  290. "tcp",
  291. "-rtsp_flags",
  292. "prefer_tcp",
  293. # Socket I/O timeout name varies by ffmpeg version (#1504); see
  294. # rtsp_socket_timeout_flag(). The 30s value is microseconds for
  295. # both names.
  296. f"-{rtsp_socket_timeout_flag()}",
  297. "30000000",
  298. "-buffer_size",
  299. "1024000", # 1MB buffer
  300. "-max_delay",
  301. "500000", # 0.5 seconds max delay
  302. "-probesize",
  303. str(profile.probesize),
  304. "-analyzeduration",
  305. str(profile.analyzeduration),
  306. "-fflags",
  307. "nobuffer", # Reduce internal buffering
  308. "-flags",
  309. "low_delay", # Minimize decode latency
  310. *profile.extra_ffmpeg_input_args,
  311. "-i",
  312. camera_url,
  313. "-f",
  314. "mjpeg",
  315. "-q:v",
  316. "5",
  317. "-r",
  318. str(fps),
  319. "-an", # No audio
  320. "-", # Output to stdout
  321. ]
  322. # Register disconnect event so stop endpoint can signal us
  323. if stream_id and disconnect_event:
  324. _disconnect_events[stream_id] = disconnect_event
  325. logger.info(
  326. "Starting RTSP camera stream for %s (stream_id=%s, model=%s, fps=%s, probesize=%s, analyzeduration=%s)",
  327. ip_address,
  328. stream_id,
  329. model,
  330. fps,
  331. profile.probesize,
  332. profile.analyzeduration,
  333. )
  334. # Log the full argv so a support bundle shows the actual ffmpeg flags
  335. # (probesize, analyzeduration, transport, ...). Only camera_url carries a
  336. # secret (the access code), so redact just that one element.
  337. _redacted_cmd = ["rtsp://<redacted>/streaming/live/1" if a == camera_url else a for a in cmd]
  338. logger.debug("ffmpeg command: %s", " ".join(_redacted_cmd))
  339. # On Windows, spawn ffmpeg in its own process group so that
  340. # terminate() doesn't broadcast CTRL_C_EVENT to uvicorn (#605).
  341. spawn_kwargs: dict = {}
  342. if sys.platform == "win32":
  343. spawn_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
  344. jpeg_start = b"\xff\xd8"
  345. jpeg_end = b"\xff\xd9"
  346. reconnect_count = 0
  347. process = None
  348. got_any_frames = False
  349. try:
  350. while reconnect_count <= profile.rtsp_reconnect_max:
  351. # Check for client disconnect before (re)connecting
  352. if disconnect_event and disconnect_event.is_set():
  353. break
  354. if reconnect_count > 0:
  355. logger.info(
  356. "RTSP reconnecting (%d/%d) for %s (stream_id=%s)",
  357. reconnect_count,
  358. profile.rtsp_reconnect_max,
  359. ip_address,
  360. stream_id,
  361. )
  362. await asyncio.sleep(profile.rtsp_reconnect_delay)
  363. if disconnect_event and disconnect_event.is_set():
  364. break
  365. # Spawn ffmpeg
  366. process = await asyncio.create_subprocess_exec(
  367. *cmd,
  368. stdout=asyncio.subprocess.PIPE,
  369. stderr=asyncio.subprocess.PIPE,
  370. **spawn_kwargs,
  371. )
  372. if stream_id:
  373. _active_streams[stream_id] = process
  374. import time as _time
  375. _spawned_ffmpeg_pids[process.pid] = _time.time()
  376. # Brief check for immediate startup failures
  377. await asyncio.sleep(0.1)
  378. if process.returncode is not None:
  379. stderr = await process.stderr.read()
  380. stderr_text = _summarize_ffmpeg_stderr(stderr.decode(errors="replace"))
  381. logger.error("ffmpeg failed immediately (attempt %d): %s", reconnect_count + 1, stderr_text)
  382. _spawned_ffmpeg_pids.pop(process.pid, None)
  383. if not got_any_frames and reconnect_count == 0:
  384. # First attempt failed immediately — camera is likely unreachable
  385. yield (
  386. b"--frame\r\n"
  387. b"Content-Type: text/plain\r\n\r\n"
  388. b"Error: Camera connection failed. Check printer is on and camera is enabled.\r\n"
  389. )
  390. return
  391. reconnect_count += 1
  392. continue
  393. # Read JPEG frames from ffmpeg stdout
  394. buffer = b""
  395. stream_ended = False
  396. client_gone = False
  397. while True:
  398. if disconnect_event and disconnect_event.is_set():
  399. client_gone = True
  400. break
  401. try:
  402. chunk = await asyncio.wait_for(process.stdout.read(8192), timeout=30.0)
  403. if not chunk:
  404. # ffmpeg exited — log stderr and break to reconnect
  405. stderr_text = await _read_ffmpeg_stderr(process)
  406. if stderr_text:
  407. logger.warning("ffmpeg stderr (stream_id=%s): %s", stream_id, stderr_text)
  408. logger.warning("RTSP stream ended for %s (stream_id=%s), will reconnect", ip_address, stream_id)
  409. stream_ended = True
  410. break
  411. buffer += chunk
  412. # Extract complete JPEG frames from buffer
  413. while True:
  414. start_idx = buffer.find(jpeg_start)
  415. if start_idx == -1:
  416. buffer = buffer[-2:] if len(buffer) > 2 else buffer
  417. break
  418. if start_idx > 0:
  419. buffer = buffer[start_idx:]
  420. end_idx = buffer.find(jpeg_end, 2)
  421. if end_idx == -1:
  422. break
  423. frame = buffer[: end_idx + 2]
  424. buffer = buffer[end_idx + 2 :]
  425. got_any_frames = True
  426. if printer_id is not None:
  427. import time
  428. _last_frames[printer_id] = frame
  429. _last_frame_times[printer_id] = time.time()
  430. if stream_id:
  431. _stream_last_frame_times[stream_id] = time.time()
  432. yield (
  433. b"--frame\r\n"
  434. b"Content-Type: image/jpeg\r\n"
  435. b"Content-Length: " + str(len(frame)).encode() + b"\r\n"
  436. b"\r\n" + frame + b"\r\n"
  437. )
  438. except TimeoutError:
  439. stderr_text = await _read_ffmpeg_stderr(process)
  440. if stderr_text:
  441. logger.warning("ffmpeg stderr on timeout: %s", stderr_text)
  442. logger.warning("RTSP read timeout for %s (stream_id=%s)", ip_address, stream_id)
  443. stream_ended = True
  444. break
  445. except asyncio.CancelledError:
  446. logger.info("Camera stream cancelled (stream_id=%s)", stream_id)
  447. client_gone = True
  448. break
  449. except GeneratorExit:
  450. logger.info("Camera stream generator exit (stream_id=%s)", stream_id)
  451. client_gone = True
  452. break
  453. # Clean up this ffmpeg process before reconnecting or exiting
  454. await _terminate_ffmpeg(process, stream_id)
  455. process = None
  456. if client_gone:
  457. break
  458. # Check if stream was explicitly stopped (e.g., by stop endpoint)
  459. if stream_id and stream_id not in _active_streams:
  460. logger.info("Stream %s removed from active streams, stopping reconnect", stream_id)
  461. break
  462. if stream_ended:
  463. reconnect_count += 1
  464. continue
  465. # Normal exit (shouldn't reach here, but be safe)
  466. break
  467. if reconnect_count > profile.rtsp_reconnect_max:
  468. logger.error(
  469. "RTSP max reconnects (%d) reached for %s (stream_id=%s)",
  470. profile.rtsp_reconnect_max,
  471. ip_address,
  472. stream_id,
  473. )
  474. except FileNotFoundError:
  475. logger.error("ffmpeg not found - camera streaming requires ffmpeg")
  476. yield (b"--frame\r\nContent-Type: text/plain\r\n\r\nError: ffmpeg not installed\r\n")
  477. except asyncio.CancelledError:
  478. logger.info("Camera stream task cancelled (stream_id=%s)", stream_id)
  479. except GeneratorExit:
  480. logger.info("Camera stream generator closed (stream_id=%s)", stream_id)
  481. except Exception as e:
  482. logger.exception("Camera stream error: %s", e)
  483. finally:
  484. # Remove from active streams and disconnect events
  485. if stream_id:
  486. _active_streams.pop(stream_id, None)
  487. _disconnect_events.pop(stream_id, None)
  488. _stream_last_frame_times.pop(stream_id, None)
  489. # Clean up frame buffer and timestamps
  490. if printer_id is not None:
  491. _last_frames.pop(printer_id, None)
  492. _last_frame_times.pop(printer_id, None)
  493. _stream_start_times.pop(printer_id, None)
  494. if process:
  495. await _terminate_ffmpeg(process, stream_id)
  496. logger.info("Camera stream stopped for %s (stream_id=%s)", ip_address, stream_id)
  497. # Shut down the TLS proxy
  498. proxy_server.close()
  499. await proxy_server.wait_closed()
  500. @router.post("/camera/stream-token")
  501. async def create_stream_token(
  502. _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
  503. ):
  504. """Create a reusable token for camera stream/snapshot access.
  505. Returns a token valid for 60 minutes that can be appended as ?token=xxx
  506. to camera stream/snapshot URLs loaded via <img> tags.
  507. """
  508. return {"token": await create_camera_stream_token()}
  509. @router.get("/{printer_id}/camera/stream")
  510. async def camera_stream(
  511. printer_id: int,
  512. request: Request,
  513. fps: int = 10,
  514. db: AsyncSession = Depends(get_db),
  515. _: None = RequireCameraStreamTokenIfAuthEnabled,
  516. ):
  517. """Stream live video from printer camera as MJPEG.
  518. This endpoint returns a multipart MJPEG stream that can be used directly
  519. in an <img> tag or video player.
  520. Requires a stream token query param (?token=xxx) when auth is enabled.
  521. Uses external camera if configured, otherwise uses built-in camera:
  522. - External: MJPEG, RTSP, or HTTP snapshot
  523. - A1/P1: Chamber image protocol (port 6000)
  524. - X1/H2/P2: RTSP via ffmpeg (port 322)
  525. Args:
  526. printer_id: Printer ID
  527. fps: Target frames per second (default: 10, max: 30)
  528. """
  529. printer = await get_printer_or_404(printer_id, db)
  530. # Check for external camera first
  531. if printer.external_camera_enabled and printer.external_camera_url:
  532. import time
  533. from backend.app.services.external_camera import generate_mjpeg_stream
  534. # Limit external camera FPS to reduce browser load
  535. fps = min(max(fps, 1), 15)
  536. logger.info(
  537. "Using external camera (%s) for printer %s at %s fps", printer.external_camera_type, printer_id, fps
  538. )
  539. # Track stream start
  540. _stream_start_times[printer_id] = time.time()
  541. _active_external_streams.add(printer_id)
  542. async def external_stream_wrapper():
  543. """Wrap external stream to track start/stop and update frame times."""
  544. try:
  545. async for frame in generate_mjpeg_stream(
  546. printer.external_camera_url, printer.external_camera_type, fps
  547. ):
  548. # generate_mjpeg_stream already handles rate limiting;
  549. # just track frame times for stall detection
  550. _last_frame_times[printer_id] = time.time()
  551. yield frame
  552. finally:
  553. _active_external_streams.discard(printer_id)
  554. logger.info("External camera stream ended for printer %s", printer_id)
  555. return StreamingResponse(
  556. external_stream_wrapper(),
  557. media_type="multipart/x-mixed-replace; boundary=frame",
  558. headers={
  559. "Cache-Control": "no-cache, no-store, must-revalidate",
  560. "Pragma": "no-cache",
  561. "Expires": "0",
  562. },
  563. )
  564. # Validate FPS - A1/P1 models max out at ~5 FPS
  565. if is_chamber_image_model(printer.model):
  566. fps = min(max(fps, 1), 5)
  567. else:
  568. fps = min(max(fps, 1), 30)
  569. # Choose the appropriate stream generator based on model
  570. if is_chamber_image_model(printer.model):
  571. stream_generator = generate_chamber_mjpeg_stream
  572. logger.info("Using chamber image protocol for %s", printer.model)
  573. else:
  574. stream_generator = generate_rtsp_mjpeg_stream
  575. logger.info("Using RTSP protocol for %s", printer.model)
  576. # Track stream start time. Set only if absent so the value reflects when
  577. # the SHARED upstream first started streaming, not when each new viewer
  578. # attached — otherwise /camera/status would report stream_uptime jumping
  579. # backward whenever a second viewer joins. The upstream generator's
  580. # finally clears this entry when the upstream actually ends.
  581. import time
  582. _stream_start_times.setdefault(printer_id, time.time())
  583. # Fan-out broadcaster (#1089): one upstream connection per printer, shared
  584. # across all viewers. Most Bambu printers only allow a single concurrent
  585. # camera connection, so opening the same printer in two tabs would
  586. # otherwise kick the first viewer off. The broadcaster owns the single
  587. # upstream and the per-viewer disconnect handling.
  588. #
  589. # Note: the upstream's fps is fixed by the first viewer who creates the
  590. # broadcaster. Concurrent viewers share that rate; new viewers after
  591. # teardown create a fresh broadcaster at their requested fps.
  592. fanout_key = f"printer-{printer_id}"
  593. upstream_stream_id = f"{printer_id}-fanout"
  594. def _factory(disconnect_event: asyncio.Event):
  595. # Re-bind locals into the closure so the async generator below sees
  596. # them — disconnect_event is owned by the broadcaster and signalled
  597. # when the last subscriber leaves (after the grace window).
  598. return stream_generator(
  599. ip_address=printer.ip_address,
  600. access_code=printer.access_code,
  601. model=printer.model,
  602. fps=fps,
  603. stream_id=upstream_stream_id,
  604. disconnect_event=disconnect_event,
  605. printer_id=printer_id,
  606. )
  607. # Subscribe with a one-shot retry to close a tiny race: the grace-window
  608. # teardown can flip the broadcaster to `stopped=True` between the registry
  609. # lookup and our subscribe call. The retry forces the registry to mint a
  610. # fresh broadcaster (since the now-stopped one is replaced), and the second
  611. # subscribe is guaranteed to land on it before any teardown can fire.
  612. broadcaster: MjpegBroadcaster = await get_or_create_broadcaster(fanout_key, _factory)
  613. try:
  614. queue = await broadcaster.subscribe()
  615. except RuntimeError:
  616. broadcaster = await get_or_create_broadcaster(fanout_key, _factory)
  617. queue = await broadcaster.subscribe()
  618. logger.info(
  619. "Camera viewer attached to %s (subscribers=%d)",
  620. fanout_key,
  621. broadcaster.subscriber_count,
  622. )
  623. async def _is_disconnected() -> bool:
  624. try:
  625. return await request.is_disconnected()
  626. except Exception:
  627. # Older starlette/uvicorn can raise during teardown — treat that
  628. # as "client gone" so the subscriber cleanly unsubscribes.
  629. return True
  630. def _log_detach(remaining: int) -> None:
  631. logger.info("Camera viewer detached from %s (subscribers=%d)", fanout_key, remaining)
  632. async def _generate():
  633. async for chunk in iter_subscriber(
  634. broadcaster,
  635. queue,
  636. is_disconnected=_is_disconnected,
  637. on_unsubscribe=_log_detach,
  638. ):
  639. yield chunk
  640. return StreamingResponse(
  641. _generate(),
  642. media_type="multipart/x-mixed-replace; boundary=frame",
  643. headers={
  644. "Cache-Control": "no-cache, no-store, must-revalidate",
  645. "Pragma": "no-cache",
  646. "Expires": "0",
  647. },
  648. )
  649. @router.api_route("/{printer_id}/camera/stop", methods=["GET", "POST"])
  650. async def stop_camera_stream(
  651. printer_id: int,
  652. _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
  653. ):
  654. """Stop all active camera streams for a printer.
  655. This can be called by the frontend when the camera window is closed.
  656. Accepts both GET and POST (POST for sendBeacon compatibility).
  657. """
  658. stopped = 0
  659. # Tear down the fan-out broadcaster first (#1089). This cleanly notifies
  660. # all subscribed viewers and asks the upstream generator to stop
  661. # reconnecting before we fall back to forcefully killing the process below.
  662. if await shutdown_broadcaster(f"printer-{printer_id}"):
  663. logger.info("Shut down camera fan-out broadcaster for printer %s", printer_id)
  664. # Stop ffmpeg/RTSP streams
  665. to_remove = []
  666. for stream_id, process in list(_active_streams.items()):
  667. if stream_id.startswith(f"{printer_id}-"):
  668. to_remove.append(stream_id)
  669. # Signal the generator to stop reconnecting BEFORE killing the process
  670. event = _disconnect_events.get(stream_id)
  671. if event:
  672. event.set()
  673. if process.returncode is None:
  674. try:
  675. process.terminate()
  676. try:
  677. await asyncio.wait_for(process.wait(), timeout=2.0)
  678. except TimeoutError:
  679. logger.warning("ffmpeg didn't terminate gracefully, killing (stream_id=%s)", stream_id)
  680. process.kill()
  681. await process.wait()
  682. stopped += 1
  683. logger.info("Terminated ffmpeg process for stream %s", stream_id)
  684. except ProcessLookupError:
  685. pass # Process already dead
  686. except OSError as e:
  687. logger.warning("Error stopping stream %s: %s", stream_id, e)
  688. _spawned_ffmpeg_pids.pop(process.pid, None)
  689. for stream_id in to_remove:
  690. _active_streams.pop(stream_id, None)
  691. _disconnect_events.pop(stream_id, None)
  692. _stream_last_frame_times.pop(stream_id, None)
  693. # Stop chamber image streams
  694. to_remove_chamber = []
  695. for stream_id, (_reader, writer) in list(_active_chamber_streams.items()):
  696. if stream_id.startswith(f"{printer_id}-"):
  697. to_remove_chamber.append(stream_id)
  698. # Signal the generator to stop
  699. event = _disconnect_events.get(stream_id)
  700. if event:
  701. event.set()
  702. try:
  703. writer.close()
  704. stopped += 1
  705. logger.info("Closed chamber image connection for stream %s", stream_id)
  706. except OSError as e:
  707. logger.warning("Error stopping chamber stream %s: %s", stream_id, e)
  708. for stream_id in to_remove_chamber:
  709. _active_chamber_streams.pop(stream_id, None)
  710. _disconnect_events.pop(stream_id, None)
  711. _stream_last_frame_times.pop(stream_id, None)
  712. logger.info("Stopped %s camera stream(s) for printer %s", stopped, printer_id)
  713. return {"stopped": stopped}
  714. @router.get("/{printer_id}/camera/snapshot")
  715. async def camera_snapshot(
  716. printer_id: int,
  717. db: AsyncSession = Depends(get_db),
  718. _: None = RequireCameraStreamTokenIfAuthEnabled,
  719. ):
  720. """Capture a single frame from the printer camera.
  721. Returns a JPEG image.
  722. Requires a stream token query param (?token=xxx) when auth is enabled.
  723. """
  724. import tempfile
  725. from pathlib import Path
  726. printer = await get_printer_or_404(printer_id, db)
  727. # Check for external camera first
  728. if printer.external_camera_enabled and printer.external_camera_url:
  729. from backend.app.services.external_camera import capture_frame
  730. frame_data = await capture_frame(
  731. printer.external_camera_url,
  732. printer.external_camera_type,
  733. timeout=15,
  734. snapshot_url=printer.external_camera_snapshot_url,
  735. )
  736. if not frame_data:
  737. raise HTTPException(
  738. status_code=503,
  739. detail="Failed to capture frame from external camera.",
  740. )
  741. return Response(
  742. content=frame_data,
  743. media_type="image/jpeg",
  744. headers={
  745. "Cache-Control": "no-cache, no-store, must-revalidate",
  746. "Content-Disposition": f'inline; filename="snapshot_{printer_id}.jpg"',
  747. },
  748. )
  749. # Reuse the fan-out broadcaster's buffered frame when a viewer is already
  750. # watching — avoids opening a second concurrent RTSP socket on printers
  751. # that allow only one camera connection (e.g. X2D firmware 01.01.00.00;
  752. # see #1271). Buffered frame is <1s old while a viewer is connected.
  753. buffered = try_get_active_buffered_frame(printer_id)
  754. if buffered:
  755. return Response(
  756. content=buffered,
  757. media_type="image/jpeg",
  758. headers={
  759. "Cache-Control": "no-cache, no-store, must-revalidate",
  760. "Content-Disposition": f'inline; filename="snapshot_{printer_id}.jpg"',
  761. },
  762. )
  763. # Create temporary file for the snapshot (0600 so only the app user can read it)
  764. fd, tmp_name = tempfile.mkstemp(suffix=".jpg")
  765. os.close(fd)
  766. temp_path = Path(tmp_name)
  767. temp_path.chmod(0o600)
  768. try:
  769. success = await capture_camera_frame(
  770. ip_address=printer.ip_address,
  771. access_code=printer.access_code,
  772. model=printer.model,
  773. output_path=temp_path,
  774. timeout=15,
  775. )
  776. if not success:
  777. raise HTTPException(
  778. status_code=503,
  779. detail="Failed to capture camera frame. Ensure printer is on and camera is enabled.",
  780. )
  781. # Read and return the image
  782. with open(temp_path, "rb") as f:
  783. image_data = f.read()
  784. return Response(
  785. content=image_data,
  786. media_type="image/jpeg",
  787. headers={
  788. "Cache-Control": "no-cache, no-store, must-revalidate",
  789. "Content-Disposition": f'inline; filename="snapshot_{printer_id}.jpg"',
  790. },
  791. )
  792. finally:
  793. # Clean up temp file
  794. if temp_path.exists():
  795. temp_path.unlink()
  796. @router.get("/{printer_id}/camera/test")
  797. async def test_camera(
  798. printer_id: int,
  799. db: AsyncSession = Depends(get_db),
  800. _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
  801. ):
  802. """Test camera connection for a printer.
  803. Returns success status and any error message.
  804. """
  805. printer = await get_printer_or_404(printer_id, db)
  806. result = await test_camera_connection(
  807. ip_address=printer.ip_address,
  808. access_code=printer.access_code,
  809. model=printer.model,
  810. )
  811. return result
  812. @router.post("/{printer_id}/camera/diagnose")
  813. async def diagnose_camera_route(
  814. printer_id: int,
  815. db: AsyncSession = Depends(get_db),
  816. _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
  817. ):
  818. """Run staged diagnostics for a printer's camera path.
  819. Returns a structured result the frontend renders inline so users can
  820. self-diagnose "connection lost" before opening a ticket. See
  821. ``camera_diagnose`` for stage details and the live-stream shortcut.
  822. """
  823. import time
  824. from backend.app.services.camera_diagnose import diagnose_camera
  825. printer = await get_printer_or_404(printer_id, db)
  826. # Look up live-stream evidence so the diagnostic can short-circuit
  827. # instead of fighting a viewer for the printer's single camera slot.
  828. has_live = is_stream_active(printer_id)
  829. last_ts = _last_frame_times.get(printer_id) if has_live else None
  830. live_age = (time.time() - last_ts) if (has_live and last_ts) else None
  831. result = await diagnose_camera(
  832. ip_address=printer.ip_address,
  833. access_code=printer.access_code,
  834. model=printer.model,
  835. printer_id=printer_id,
  836. has_live_stream=has_live,
  837. live_frame_age_seconds=live_age,
  838. )
  839. return result.to_dict()
  840. @router.get("/{printer_id}/camera/status")
  841. async def camera_status(
  842. printer_id: int,
  843. _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
  844. ):
  845. """Get the status of an active camera stream.
  846. Returns whether a stream is active and when the last frame was received.
  847. Used by the frontend to detect stalled streams and auto-reconnect.
  848. """
  849. import time
  850. # Check if there's an active stream for this printer
  851. has_active_stream = False
  852. # Check external camera streams
  853. if printer_id in _active_external_streams:
  854. has_active_stream = True
  855. # Check ffmpeg/RTSP streams
  856. if not has_active_stream:
  857. for stream_id in _active_streams:
  858. if stream_id.startswith(f"{printer_id}-"):
  859. process = _active_streams[stream_id]
  860. if process.returncode is None:
  861. has_active_stream = True
  862. break
  863. # Check chamber image streams
  864. if not has_active_stream:
  865. for stream_id in _active_chamber_streams:
  866. if stream_id.startswith(f"{printer_id}-"):
  867. has_active_stream = True
  868. break
  869. # Get timing information
  870. current_time = time.time()
  871. last_frame_time = _last_frame_times.get(printer_id)
  872. stream_start_time = _stream_start_times.get(printer_id)
  873. # Calculate seconds since last frame
  874. seconds_since_frame = None
  875. if last_frame_time is not None:
  876. seconds_since_frame = current_time - last_frame_time
  877. # Calculate stream uptime
  878. stream_uptime = None
  879. if stream_start_time is not None:
  880. stream_uptime = current_time - stream_start_time
  881. return {
  882. "active": has_active_stream,
  883. "has_frames": printer_id in _last_frames,
  884. "seconds_since_frame": seconds_since_frame,
  885. "stream_uptime": stream_uptime,
  886. # Consider stalled if no frame for more than 10 seconds after stream started
  887. "stalled": (
  888. has_active_stream
  889. and stream_uptime is not None
  890. and stream_uptime > 5 # Give 5 seconds for stream to start
  891. and (seconds_since_frame is None or seconds_since_frame > 10)
  892. ),
  893. }
  894. @router.post("/{printer_id}/camera/external/test")
  895. async def test_external_camera(
  896. printer_id: int,
  897. url: str,
  898. camera_type: str,
  899. db: AsyncSession = Depends(get_db),
  900. _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
  901. ):
  902. """Test external camera connection.
  903. Args:
  904. printer_id: Printer ID (for authorization)
  905. url: Camera URL or USB device path to test
  906. camera_type: Camera type ("mjpeg", "rtsp", "snapshot", "usb")
  907. Returns:
  908. Dict with {success: bool, error?: str, resolution?: str}
  909. """
  910. # Verify printer exists (for authorization)
  911. await get_printer_or_404(printer_id, db)
  912. from backend.app.services.external_camera import test_connection
  913. return await test_connection(url, camera_type)
  914. @router.get("/{printer_id}/camera/check-plate")
  915. async def check_plate_empty(
  916. printer_id: int,
  917. plate_type: str | None = None,
  918. use_external: bool | None = None,
  919. include_debug_image: bool = False,
  920. db: AsyncSession = Depends(get_db),
  921. _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
  922. ):
  923. """Check if the build plate is empty using camera vision.
  924. Uses calibration-based difference detection - compares current frame
  925. to a reference image of the empty plate.
  926. IMPORTANT: Chamber light must be ON for reliable detection.
  927. Args:
  928. printer_id: Printer ID
  929. plate_type: Type of build plate (e.g., "High Temp Plate") for calibration lookup
  930. use_external: If True, prefer external camera over built-in. When omitted
  931. (None), defaults to the printer's external_camera_enabled setting —
  932. mirroring the runtime auto-check at print start (main.py). Without
  933. this default the UI's manual check would always use the built-in
  934. camera, mismatching the reference saved during calibration (#1359).
  935. include_debug_image: If True, return URL to annotated debug image
  936. Returns:
  937. Dict with detection results:
  938. - is_empty: bool - Whether plate appears empty
  939. - confidence: float - Confidence level (0.0 to 1.0)
  940. - difference_percent: float - How different from calibration reference
  941. - message: str - Human-readable result message
  942. - needs_calibration: bool - True if calibration is required
  943. - light_warning: bool - True if chamber light is off
  944. """
  945. from backend.app.services.plate_detection import (
  946. check_plate_empty as do_check,
  947. is_plate_detection_available,
  948. )
  949. from backend.app.services.printer_manager import printer_manager
  950. # Check printer exists first (before OpenCV check)
  951. printer = await get_printer_or_404(printer_id, db)
  952. if use_external is None:
  953. use_external = bool(
  954. printer.external_camera_enabled and printer.external_camera_url and printer.external_camera_type
  955. )
  956. if not is_plate_detection_available():
  957. raise HTTPException(
  958. status_code=503,
  959. detail="Plate detection not available. Install opencv-python-headless to enable.",
  960. )
  961. # Check chamber light status
  962. light_warning = False
  963. state = printer_manager.get_status(printer_id)
  964. if state and not state.chamber_light:
  965. light_warning = True
  966. from backend.app.services.plate_detection import PlateDetector
  967. # Build ROI tuple from printer settings if available
  968. roi = None
  969. if all(
  970. [
  971. printer.plate_detection_roi_x is not None,
  972. printer.plate_detection_roi_y is not None,
  973. printer.plate_detection_roi_w is not None,
  974. printer.plate_detection_roi_h is not None,
  975. ]
  976. ):
  977. roi = (
  978. printer.plate_detection_roi_x,
  979. printer.plate_detection_roi_y,
  980. printer.plate_detection_roi_w,
  981. printer.plate_detection_roi_h,
  982. )
  983. result = await do_check(
  984. printer_id=printer.id,
  985. ip_address=printer.ip_address,
  986. access_code=printer.access_code,
  987. model=printer.model,
  988. plate_type=plate_type,
  989. include_debug_image=include_debug_image,
  990. external_camera_url=printer.external_camera_url if printer.external_camera_enabled else None,
  991. external_camera_type=printer.external_camera_type if printer.external_camera_enabled else None,
  992. use_external=use_external,
  993. roi=roi,
  994. external_camera_snapshot_url=printer.external_camera_snapshot_url if printer.external_camera_enabled else None,
  995. )
  996. # Get reference count for the response
  997. detector = PlateDetector()
  998. ref_count = detector.get_calibration_count(printer.id)
  999. response = result.to_dict()
  1000. response["light_warning"] = light_warning
  1001. response["reference_count"] = ref_count
  1002. response["max_references"] = detector.MAX_REFERENCES
  1003. # Include current ROI in response
  1004. if roi:
  1005. response["roi"] = {"x": roi[0], "y": roi[1], "w": roi[2], "h": roi[3]}
  1006. else:
  1007. # Return default ROI
  1008. response["roi"] = {"x": 0.15, "y": 0.35, "w": 0.70, "h": 0.55}
  1009. # If debug image requested and available, encode as base64 data URL
  1010. if include_debug_image and result.debug_image:
  1011. import base64
  1012. b64_image = base64.b64encode(result.debug_image).decode("utf-8")
  1013. response["debug_image_url"] = f"data:image/jpeg;base64,{b64_image}"
  1014. return response
  1015. @router.post("/{printer_id}/camera/plate-detection/calibrate")
  1016. async def calibrate_plate_detection(
  1017. printer_id: int,
  1018. label: str | None = None,
  1019. use_external: bool | None = None,
  1020. db: AsyncSession = Depends(get_db),
  1021. _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
  1022. ):
  1023. """Calibrate plate detection by capturing a reference image of the empty plate.
  1024. The plate MUST be empty when calling this endpoint. The captured image
  1025. will be used as the reference for future detection comparisons.
  1026. Supports up to 5 reference images per printer. When adding a 6th, the oldest
  1027. is automatically removed.
  1028. IMPORTANT: Chamber light should be ON for calibration.
  1029. Args:
  1030. printer_id: Printer ID
  1031. label: Optional label for this reference (e.g., "High Temp Plate", "Wham Bam")
  1032. use_external: If True, prefer external camera over built-in. When omitted
  1033. (None), defaults to the printer's external_camera_enabled setting so
  1034. calibration captures from the same source the runtime auto-check
  1035. uses at print start (#1359).
  1036. Returns:
  1037. Dict with:
  1038. - success: bool - Whether calibration succeeded
  1039. - message: str - Status message
  1040. - index: int - The reference slot used (0-4)
  1041. """
  1042. from backend.app.services.plate_detection import (
  1043. calibrate_plate,
  1044. is_plate_detection_available,
  1045. )
  1046. from backend.app.services.printer_manager import printer_manager
  1047. # Check printer exists first (before OpenCV check)
  1048. printer = await get_printer_or_404(printer_id, db)
  1049. if use_external is None:
  1050. use_external = bool(
  1051. printer.external_camera_enabled and printer.external_camera_url and printer.external_camera_type
  1052. )
  1053. if not is_plate_detection_available():
  1054. raise HTTPException(
  1055. status_code=503,
  1056. detail="Plate detection not available. Install opencv-python-headless to enable.",
  1057. )
  1058. # Check chamber light - warn but don't block
  1059. state = printer_manager.get_status(printer_id)
  1060. light_warning = state and not state.chamber_light
  1061. success, message, index = await calibrate_plate(
  1062. printer_id=printer.id,
  1063. ip_address=printer.ip_address,
  1064. access_code=printer.access_code,
  1065. model=printer.model,
  1066. label=label,
  1067. external_camera_url=printer.external_camera_url if printer.external_camera_enabled else None,
  1068. external_camera_type=printer.external_camera_type if printer.external_camera_enabled else None,
  1069. use_external=use_external,
  1070. external_camera_snapshot_url=printer.external_camera_snapshot_url if printer.external_camera_enabled else None,
  1071. )
  1072. if light_warning and success:
  1073. message += " (Warning: Chamber light was off)"
  1074. return {"success": success, "message": message, "index": index}
  1075. @router.delete("/{printer_id}/camera/plate-detection/calibrate")
  1076. async def delete_plate_calibration(
  1077. printer_id: int,
  1078. plate_type: str | None = None,
  1079. db: AsyncSession = Depends(get_db),
  1080. _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
  1081. ):
  1082. """Delete the plate detection calibration for a printer and plate type.
  1083. Args:
  1084. printer_id: Printer ID
  1085. plate_type: Type of build plate (if None, deletes legacy non-plate-specific calibration)
  1086. Returns:
  1087. Dict with:
  1088. - success: bool - Whether deletion succeeded
  1089. - message: str - Status message
  1090. """
  1091. from backend.app.services.plate_detection import (
  1092. delete_calibration,
  1093. is_plate_detection_available,
  1094. )
  1095. # Verify printer exists first (before OpenCV check)
  1096. await get_printer_or_404(printer_id, db)
  1097. if not is_plate_detection_available():
  1098. raise HTTPException(
  1099. status_code=503,
  1100. detail="Plate detection not available. Install opencv-python-headless to enable.",
  1101. )
  1102. deleted = delete_calibration(printer_id, plate_type)
  1103. plate_msg = f" for '{plate_type}'" if plate_type else ""
  1104. return {
  1105. "success": deleted,
  1106. "message": f"Calibration deleted{plate_msg}" if deleted else f"No calibration found{plate_msg}",
  1107. }
  1108. @router.get("/{printer_id}/camera/plate-detection/status")
  1109. async def get_plate_detection_status(
  1110. printer_id: int,
  1111. plate_type: str | None = None,
  1112. db: AsyncSession = Depends(get_db),
  1113. _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
  1114. ):
  1115. """Check plate detection status for a printer and plate type.
  1116. Returns:
  1117. Dict with:
  1118. - available: bool - Whether OpenCV is installed
  1119. - calibrated: bool - Whether printer has calibration for this plate type
  1120. - plate_type: str - The plate type queried
  1121. - chamber_light: bool - Whether chamber light is on
  1122. - message: str - Status message
  1123. """
  1124. from backend.app.services.plate_detection import (
  1125. get_calibration_status,
  1126. is_plate_detection_available,
  1127. )
  1128. from backend.app.services.printer_manager import printer_manager
  1129. # Verify printer exists first (before OpenCV check)
  1130. await get_printer_or_404(printer_id, db)
  1131. if not is_plate_detection_available():
  1132. return {
  1133. "available": False,
  1134. "calibrated": False,
  1135. "plate_type": plate_type,
  1136. "chamber_light": False,
  1137. "message": "OpenCV not installed",
  1138. }
  1139. # Get chamber light status
  1140. state = printer_manager.get_status(printer_id)
  1141. chamber_light = state.chamber_light if state else False
  1142. status = get_calibration_status(printer_id, plate_type)
  1143. status["chamber_light"] = chamber_light
  1144. return status
  1145. @router.get("/{printer_id}/camera/plate-detection/references")
  1146. async def get_plate_references(
  1147. printer_id: int,
  1148. db: AsyncSession = Depends(get_db),
  1149. _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
  1150. ):
  1151. """Get all calibration references for a printer with metadata.
  1152. Returns list of references with index, label, timestamp, and thumbnail URL.
  1153. """
  1154. from backend.app.services.plate_detection import PlateDetector, is_plate_detection_available
  1155. # Verify printer exists first (before OpenCV check)
  1156. await get_printer_or_404(printer_id, db)
  1157. if not is_plate_detection_available():
  1158. raise HTTPException(503, "Plate detection not available")
  1159. detector = PlateDetector()
  1160. references = detector.get_references(printer_id)
  1161. # Add thumbnail URLs
  1162. for ref in references:
  1163. ref["thumbnail_url"] = (
  1164. f"/api/v1/printers/{printer_id}/camera/plate-detection/references/{ref['index']}/thumbnail"
  1165. )
  1166. return {
  1167. "references": references,
  1168. "max_references": detector.MAX_REFERENCES,
  1169. }
  1170. @router.get("/{printer_id}/camera/plate-detection/references/{index}/thumbnail")
  1171. async def get_reference_thumbnail(
  1172. printer_id: int,
  1173. index: int,
  1174. db: AsyncSession = Depends(get_db),
  1175. _: None = RequireCameraStreamTokenIfAuthEnabled,
  1176. ):
  1177. """Get thumbnail image for a calibration reference.
  1178. Requires a stream token query param (?token=xxx) when auth is enabled.
  1179. """
  1180. from fastapi.responses import Response
  1181. from backend.app.services.plate_detection import PlateDetector, is_plate_detection_available
  1182. # Verify printer exists first (before OpenCV check)
  1183. await get_printer_or_404(printer_id, db)
  1184. if not is_plate_detection_available():
  1185. raise HTTPException(503, "Plate detection not available")
  1186. detector = PlateDetector()
  1187. thumbnail = detector.get_reference_thumbnail(printer_id, index)
  1188. if thumbnail is None:
  1189. raise HTTPException(404, "Reference not found")
  1190. return Response(content=thumbnail, media_type="image/jpeg")
  1191. @router.put("/{printer_id}/camera/plate-detection/references/{index}")
  1192. async def update_reference_label(
  1193. printer_id: int,
  1194. index: int,
  1195. label: str,
  1196. db: AsyncSession = Depends(get_db),
  1197. _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
  1198. ):
  1199. """Update the label for a calibration reference."""
  1200. from backend.app.services.plate_detection import PlateDetector, is_plate_detection_available
  1201. # Verify printer exists first (before OpenCV check)
  1202. await get_printer_or_404(printer_id, db)
  1203. if not is_plate_detection_available():
  1204. raise HTTPException(503, "Plate detection not available")
  1205. detector = PlateDetector()
  1206. success = detector.update_reference_label(printer_id, index, label)
  1207. if not success:
  1208. raise HTTPException(404, "Reference not found")
  1209. return {"success": True, "index": index, "label": label}
  1210. @router.delete("/{printer_id}/camera/plate-detection/references/{index}")
  1211. async def delete_reference(
  1212. printer_id: int,
  1213. index: int,
  1214. db: AsyncSession = Depends(get_db),
  1215. _: User | None = RequirePermissionIfAuthEnabled(Permission.CAMERA_VIEW),
  1216. ):
  1217. """Delete a specific calibration reference."""
  1218. from backend.app.services.plate_detection import PlateDetector, is_plate_detection_available
  1219. # Verify printer exists first (before OpenCV check)
  1220. await get_printer_or_404(printer_id, db)
  1221. if not is_plate_detection_available():
  1222. raise HTTPException(503, "Plate detection not available")
  1223. detector = PlateDetector()
  1224. success = detector.delete_reference(printer_id, index)
  1225. if not success:
  1226. raise HTTPException(404, "Reference not found")
  1227. return {"success": True, "message": "Reference deleted"}
  1228. def _scan_bambu_ffmpeg_pids() -> list[int]:
  1229. """Scan /proc for ffmpeg processes with Bambu RTSP URLs.
  1230. These are definitely ours — no other software connects to rtsp(s)://bblp:.
  1231. This catches orphans that survive app restarts and are not in any tracking dict.
  1232. """
  1233. import os
  1234. pids = []
  1235. try:
  1236. for entry in os.listdir("/proc"):
  1237. if not entry.isdigit():
  1238. continue
  1239. try:
  1240. with open(f"/proc/{entry}/cmdline", "rb") as f:
  1241. cmdline = f.read()
  1242. # Match both rtsp:// (via TLS proxy) and rtsps:// (direct)
  1243. if b"ffmpeg" in cmdline and (b"rtsp://bblp:" in cmdline or b"rtsps://bblp:" in cmdline):
  1244. pids.append(int(entry))
  1245. except (OSError, PermissionError, ValueError):
  1246. continue
  1247. except OSError:
  1248. pass
  1249. return pids
  1250. async def cleanup_orphaned_streams():
  1251. """Clean up orphaned ffmpeg processes and stale stream entries.
  1252. Called periodically from the background task loop in main.py.
  1253. Three-layer cleanup:
  1254. 1. /proc scan — finds ALL Bambu ffmpeg processes on the system, even those
  1255. from previous app sessions. This is the nuclear safety net.
  1256. 2. _spawned_ffmpeg_pids — tracks PIDs spawned this session, catches orphans
  1257. that were removed from _active_streams but not killed.
  1258. 3. _active_streams — kills stale entries with no recent frames.
  1259. """
  1260. import os
  1261. import signal
  1262. import time
  1263. cleaned = 0
  1264. now = time.time()
  1265. # Collect PIDs that are legitimately in-use (active stream, process alive)
  1266. active_pids = {proc.pid for proc in _active_streams.values() if proc.returncode is None}
  1267. # Also exclude PIDs from one-shot snapshot captures (Obico detection, finish photos, etc.)
  1268. from backend.app.services.camera import _active_capture_pids
  1269. active_pids |= _active_capture_pids
  1270. # 1. /proc scan — catch ALL orphaned Bambu ffmpeg processes on the system.
  1271. # Any ffmpeg with rtsp(s)://bblp: that is NOT in an active stream is orphaned.
  1272. for pid in _scan_bambu_ffmpeg_pids():
  1273. if pid in active_pids:
  1274. continue
  1275. logger.info("Killing orphaned ffmpeg process found via /proc (pid=%d)", pid)
  1276. try:
  1277. os.kill(pid, signal.SIGKILL)
  1278. except (ProcessLookupError, OSError):
  1279. pass
  1280. _spawned_ffmpeg_pids.pop(pid, None)
  1281. cleaned += 1
  1282. # 2. Clean up _spawned_ffmpeg_pids entries for dead processes
  1283. for pid in list(_spawned_ffmpeg_pids):
  1284. try:
  1285. os.kill(pid, 0) # existence check
  1286. except (ProcessLookupError, OSError):
  1287. _spawned_ffmpeg_pids.pop(pid, None)
  1288. # 3. Clean up _active_streams entries with dead processes
  1289. dead_streams = [sid for sid, proc in _active_streams.items() if proc.returncode is not None]
  1290. for sid in dead_streams:
  1291. proc = _active_streams.pop(sid, None)
  1292. if proc:
  1293. _spawned_ffmpeg_pids.pop(proc.pid, None)
  1294. cleaned += 1
  1295. # 4. Kill stale active streams (alive but no frames for >30s)
  1296. # Uses per-stream timestamps to avoid false "fresh" readings from newer streams
  1297. for sid, proc in list(_active_streams.items()):
  1298. if proc.returncode is not None:
  1299. continue
  1300. # Per-stream frame time is authoritative; fall back to per-printer
  1301. stream_last_frame = _stream_last_frame_times.get(sid)
  1302. if stream_last_frame is None:
  1303. try:
  1304. printer_id = int(sid.split("-", 1)[0])
  1305. except (ValueError, IndexError):
  1306. continue
  1307. stream_last_frame = _last_frame_times.get(printer_id)
  1308. spawn_time = _spawned_ffmpeg_pids.get(proc.pid, now)
  1309. if stream_last_frame is None:
  1310. stream_last_frame = spawn_time
  1311. if now - spawn_time > 60 and now - stream_last_frame > 30:
  1312. logger.info("Killing stale ffmpeg stream %s (no frames for %.0fs)", sid, now - stream_last_frame)
  1313. # Signal the generator to stop reconnecting
  1314. event = _disconnect_events.get(sid)
  1315. if event:
  1316. event.set()
  1317. try:
  1318. proc.kill()
  1319. await proc.wait()
  1320. except (ProcessLookupError, OSError):
  1321. pass
  1322. _active_streams.pop(sid, None)
  1323. _disconnect_events.pop(sid, None)
  1324. _stream_last_frame_times.pop(sid, None)
  1325. _spawned_ffmpeg_pids.pop(proc.pid, None)
  1326. cleaned += 1
  1327. # 4. Clean stale chamber stream entries
  1328. dead_chamber = [sid for sid, (_reader, writer) in _active_chamber_streams.items() if writer.is_closing()]
  1329. for sid in dead_chamber:
  1330. _active_chamber_streams.pop(sid, None)
  1331. cleaned += 1
  1332. if cleaned:
  1333. logger.info("Cleaned up %d orphaned camera stream(s)", cleaned)