camera.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. """Camera capture service for Bambu Lab printers.
  2. Captures images from the printer's RTSPS camera stream using ffmpeg.
  3. """
  4. import asyncio
  5. import logging
  6. import subprocess
  7. from pathlib import Path
  8. from datetime import datetime
  9. import uuid
  10. from backend.app.core.config import settings
  11. logger = logging.getLogger(__name__)
  12. def get_camera_port(model: str | None) -> int:
  13. """Get the RTSPS port based on printer model.
  14. X1 and H2D series use port 322.
  15. P1 and A1 series use port 6000.
  16. """
  17. if model:
  18. model_upper = model.upper()
  19. if model_upper.startswith(("X1", "H2")):
  20. return 322
  21. # Default to 6000 for P1/A1 or unknown models
  22. return 6000
  23. def build_camera_url(ip_address: str, access_code: str, model: str | None) -> str:
  24. """Build the RTSPS URL for the printer camera."""
  25. port = get_camera_port(model)
  26. return f"rtsps://bblp:{access_code}@{ip_address}:{port}/streaming/live/1"
  27. async def capture_camera_frame(
  28. ip_address: str,
  29. access_code: str,
  30. model: str | None,
  31. output_path: Path,
  32. timeout: int = 30,
  33. ) -> bool:
  34. """Capture a single frame from the printer's camera stream.
  35. Args:
  36. ip_address: Printer IP address
  37. access_code: Printer access code
  38. model: Printer model (X1, H2D, P1, A1, etc.)
  39. output_path: Path where to save the captured image
  40. timeout: Timeout in seconds for the capture operation
  41. Returns:
  42. True if capture was successful, False otherwise
  43. """
  44. camera_url = build_camera_url(ip_address, access_code, model)
  45. # Ensure output directory exists
  46. output_path.parent.mkdir(parents=True, exist_ok=True)
  47. # ffmpeg command to capture a single frame from RTSPS stream
  48. # -rtsp_transport tcp: Use TCP for RTSP (more reliable)
  49. # -y: Overwrite output file
  50. # -frames:v 1: Capture only 1 frame
  51. # -q:v 2: High quality JPEG (1-31, lower is better)
  52. cmd = [
  53. "ffmpeg",
  54. "-y", # Overwrite output
  55. "-rtsp_transport", "tcp",
  56. "-i", camera_url,
  57. "-frames:v", "1",
  58. "-q:v", "2",
  59. str(output_path),
  60. ]
  61. logger.info(f"Capturing camera frame from {ip_address} (model: {model})")
  62. try:
  63. # Run ffmpeg asynchronously with timeout
  64. process = await asyncio.create_subprocess_exec(
  65. *cmd,
  66. stdout=asyncio.subprocess.PIPE,
  67. stderr=asyncio.subprocess.PIPE,
  68. )
  69. try:
  70. stdout, stderr = await asyncio.wait_for(
  71. process.communicate(),
  72. timeout=timeout
  73. )
  74. except asyncio.TimeoutError:
  75. process.kill()
  76. await process.wait()
  77. logger.error(f"Camera capture timed out after {timeout}s")
  78. return False
  79. if process.returncode != 0:
  80. stderr_text = stderr.decode() if stderr else "Unknown error"
  81. logger.error(f"ffmpeg failed with code {process.returncode}: {stderr_text}")
  82. return False
  83. if output_path.exists() and output_path.stat().st_size > 0:
  84. logger.info(f"Successfully captured camera frame: {output_path}")
  85. return True
  86. else:
  87. logger.error("Camera capture produced no output file")
  88. return False
  89. except FileNotFoundError:
  90. logger.error("ffmpeg not found. Please install ffmpeg to enable camera capture.")
  91. return False
  92. except Exception as e:
  93. logger.exception(f"Camera capture failed: {e}")
  94. return False
  95. async def capture_finish_photo(
  96. printer_id: int,
  97. ip_address: str,
  98. access_code: str,
  99. model: str | None,
  100. archive_dir: Path,
  101. ) -> str | None:
  102. """Capture a finish photo and save it to the archive's photos folder.
  103. Args:
  104. printer_id: ID of the printer
  105. ip_address: Printer IP address
  106. access_code: Printer access code
  107. model: Printer model
  108. archive_dir: Directory of the archive (where the 3MF is stored)
  109. Returns:
  110. Filename of the captured photo, or None if capture failed
  111. """
  112. # Create photos subdirectory
  113. photos_dir = archive_dir / "photos"
  114. photos_dir.mkdir(parents=True, exist_ok=True)
  115. # Generate filename with timestamp
  116. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  117. filename = f"finish_{timestamp}_{uuid.uuid4().hex[:8]}.jpg"
  118. output_path = photos_dir / filename
  119. success = await capture_camera_frame(
  120. ip_address=ip_address,
  121. access_code=access_code,
  122. model=model,
  123. output_path=output_path,
  124. timeout=30,
  125. )
  126. if success:
  127. logger.info(f"Finish photo saved: {filename}")
  128. return filename
  129. else:
  130. logger.warning(f"Failed to capture finish photo for printer {printer_id}")
  131. return None
  132. async def test_camera_connection(
  133. ip_address: str,
  134. access_code: str,
  135. model: str | None,
  136. ) -> dict:
  137. """Test if the camera stream is accessible.
  138. Returns dict with success status and any error message.
  139. """
  140. import tempfile
  141. with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f:
  142. test_path = Path(f.name)
  143. try:
  144. success = await capture_camera_frame(
  145. ip_address=ip_address,
  146. access_code=access_code,
  147. model=model,
  148. output_path=test_path,
  149. timeout=15,
  150. )
  151. if success:
  152. return {"success": True, "message": "Camera connection successful"}
  153. else:
  154. return {"success": False, "error": "Failed to capture frame from camera"}
  155. finally:
  156. # Clean up test file
  157. if test_path.exists():
  158. test_path.unlink()