"""Build plate empty detection using OpenCV.

Analyzes camera frames to detect if there are objects on the build plate.
Uses calibration-based difference detection - compares the current frame to
a reference image of the empty plate.
"""
  import logging
  from pathlib import Path

  logger = logging.getLogger(__name__)

  # OpenCV (and NumPy alongside it) is optional. If the import fails,
  # OPENCV_AVAILABLE is set to False and an info message is logged.
  try:
      import cv2
      import numpy as np
  except ImportError:
      OPENCV_AVAILABLE = False
      logger.info("OpenCV not available - plate detection feature disabled")
  else:
      OPENCV_AVAILABLE = True

  # Calibration reference images are stored in data/plate_calibration, located
  # four directory levels above this file.
  CALIBRATION_DIR = Path(__file__).parent.parent.parent.parent.joinpath("data", "plate_calibration")
  19. class PlateDetectionResult:
  20. """Result of plate detection analysis."""
  21. def __init__(
  22. self,
  23. is_empty: bool,
  24. confidence: float,
  25. difference_percent: float,
  26. message: str,
  27. debug_image: bytes | None = None,
  28. needs_calibration: bool = False,
  29. ):
  30. self.is_empty = is_empty
  31. self.confidence = confidence # 0.0 to 1.0
  32. self.difference_percent = difference_percent # How different from reference
  33. self.message = message
  34. self.debug_image = debug_image # Optional annotated image for debugging
  35. self.needs_calibration = needs_calibration # True if no reference image exists
  36. def to_dict(self) -> dict:
  37. return {
  38. "is_empty": bool(self.is_empty),
  39. "confidence": float(round(self.confidence, 2)),
  40. "difference_percent": float(round(self.difference_percent, 2)),
  41. "message": self.message,
  42. "has_debug_image": self.debug_image is not None,
  43. "needs_calibration": bool(self.needs_calibration),
  44. }
  45. class PlateDetector:
  46. """Detects if the build plate is empty using calibration-based difference detection."""
  47. # Default region of interest (ROI) as percentage of image dimensions
  48. # These define where the build plate typically appears in the camera view
  49. # Format: (x_start%, y_start%, width%, height%)
  50. DEFAULT_ROI = (0.15, 0.35, 0.70, 0.55) # Center-lower portion of frame
  51. # Detection thresholds for difference detection
  52. # Using mean pixel difference (0-100% scale)
  53. # Small objects may only cause 1-2% mean difference
  54. DEFAULT_DIFFERENCE_THRESHOLD = 1.0
  55. DEFAULT_BLUR_SIZE = 21 # Gaussian blur kernel size (must be odd) - unused with edge detection
  56. def __init__(
  57. self,
  58. roi: tuple[float, float, float, float] | None = None,
  59. difference_threshold: float = DEFAULT_DIFFERENCE_THRESHOLD,
  60. blur_size: int = DEFAULT_BLUR_SIZE,
  61. ):
  62. """Initialize the plate detector.
  63. Args:
  64. roi: Region of interest as (x%, y%, w%, h%) - percentages of image size
  65. difference_threshold: Percentage of pixels that must differ to trigger "not empty"
  66. blur_size: Gaussian blur kernel size for noise reduction
  67. """
  68. if not OPENCV_AVAILABLE:
  69. raise RuntimeError("OpenCV is not installed. Install with: pip install opencv-python-headless")
  70. self.roi = roi or self.DEFAULT_ROI
  71. self.difference_threshold = difference_threshold
  72. self.blur_size = blur_size if blur_size % 2 == 1 else blur_size + 1 # Must be odd
  73. # Maximum number of reference images to store per printer
  74. MAX_REFERENCES = 5
  75. def _get_metadata_path(self, printer_id: int) -> Path:
  76. """Get the path to the metadata JSON file for a printer."""
  77. CALIBRATION_DIR.mkdir(parents=True, exist_ok=True)
  78. return CALIBRATION_DIR / f"printer_{printer_id}_metadata.json"
  79. def _load_metadata(self, printer_id: int) -> dict:
  80. """Load metadata for a printer's references."""
  81. import json
  82. meta_path = self._get_metadata_path(printer_id)
  83. if meta_path.exists():
  84. try:
  85. with open(meta_path) as f:
  86. return json.load(f)
  87. except Exception:
  88. pass
  89. return {"references": {}}
  90. def _save_metadata(self, printer_id: int, metadata: dict) -> None:
  91. """Save metadata for a printer's references."""
  92. import json
  93. meta_path = self._get_metadata_path(printer_id)
  94. with open(meta_path, "w") as f:
  95. json.dump(metadata, f, indent=2)
  96. def _get_reference_paths(self, printer_id: int) -> list[Path]:
  97. """Get all existing reference image paths for a printer."""
  98. CALIBRATION_DIR.mkdir(parents=True, exist_ok=True)
  99. paths = []
  100. for i in range(self.MAX_REFERENCES):
  101. path = CALIBRATION_DIR / f"printer_{printer_id}_ref_{i}.jpg"
  102. if path.exists():
  103. paths.append(path)
  104. return paths
  105. def _get_next_reference_slot(self, printer_id: int) -> Path:
  106. """Get the path for the next reference image slot (cycles through slots)."""
  107. CALIBRATION_DIR.mkdir(parents=True, exist_ok=True)
  108. # Find first empty slot, or use oldest (slot 0) and shift others
  109. for i in range(self.MAX_REFERENCES):
  110. path = CALIBRATION_DIR / f"printer_{printer_id}_ref_{i}.jpg"
  111. if not path.exists():
  112. return path
  113. # All slots full - return slot 0 (will be overwritten, but we rotate first)
  114. return CALIBRATION_DIR / f"printer_{printer_id}_ref_0.jpg"
  115. def _rotate_references(self, printer_id: int) -> None:
  116. """Rotate references: delete oldest (0), shift others down."""
  117. # Delete slot 0
  118. slot0 = CALIBRATION_DIR / f"printer_{printer_id}_ref_0.jpg"
  119. if slot0.exists():
  120. slot0.unlink()
  121. # Shift others down
  122. for i in range(1, self.MAX_REFERENCES):
  123. old_path = CALIBRATION_DIR / f"printer_{printer_id}_ref_{i}.jpg"
  124. new_path = CALIBRATION_DIR / f"printer_{printer_id}_ref_{i - 1}.jpg"
  125. if old_path.exists():
  126. old_path.rename(new_path)
  127. # Also rotate metadata
  128. metadata = self._load_metadata(printer_id)
  129. refs = metadata.get("references", {})
  130. new_refs = {}
  131. for i in range(1, self.MAX_REFERENCES):
  132. if str(i) in refs:
  133. new_refs[str(i - 1)] = refs[str(i)]
  134. metadata["references"] = new_refs
  135. self._save_metadata(printer_id, metadata)
  136. def get_references(self, printer_id: int) -> list[dict]:
  137. """Get all references with metadata for a printer.
  138. Returns list of dicts with: index, label, timestamp, has_image
  139. """
  140. metadata = self._load_metadata(printer_id)
  141. refs = metadata.get("references", {})
  142. result = []
  143. for i in range(self.MAX_REFERENCES):
  144. path = CALIBRATION_DIR / f"printer_{printer_id}_ref_{i}.jpg"
  145. if path.exists():
  146. ref_meta = refs.get(str(i), {})
  147. result.append(
  148. {
  149. "index": i,
  150. "label": ref_meta.get("label", ""),
  151. "timestamp": ref_meta.get("timestamp", ""),
  152. "has_image": True,
  153. }
  154. )
  155. return result
  156. def update_reference_label(self, printer_id: int, index: int, label: str) -> bool:
  157. """Update the label for a reference."""
  158. if index < 0 or index >= self.MAX_REFERENCES:
  159. return False
  160. path = CALIBRATION_DIR / f"printer_{printer_id}_ref_{index}.jpg"
  161. if not path.exists():
  162. return False
  163. metadata = self._load_metadata(printer_id)
  164. if "references" not in metadata:
  165. metadata["references"] = {}
  166. if str(index) not in metadata["references"]:
  167. metadata["references"][str(index)] = {}
  168. metadata["references"][str(index)]["label"] = label
  169. self._save_metadata(printer_id, metadata)
  170. return True
  171. def delete_reference(self, printer_id: int, index: int) -> bool:
  172. """Delete a specific reference by index."""
  173. if index < 0 or index >= self.MAX_REFERENCES:
  174. return False
  175. path = CALIBRATION_DIR / f"printer_{printer_id}_ref_{index}.jpg"
  176. if not path.exists():
  177. return False
  178. # Delete image
  179. path.unlink()
  180. # Remove from metadata
  181. metadata = self._load_metadata(printer_id)
  182. refs = metadata.get("references", {})
  183. if str(index) in refs:
  184. del refs[str(index)]
  185. metadata["references"] = refs
  186. self._save_metadata(printer_id, metadata)
  187. # Shift remaining references down to fill the gap
  188. for i in range(index + 1, self.MAX_REFERENCES):
  189. old_img = CALIBRATION_DIR / f"printer_{printer_id}_ref_{i}.jpg"
  190. new_img = CALIBRATION_DIR / f"printer_{printer_id}_ref_{i - 1}.jpg"
  191. if old_img.exists():
  192. old_img.rename(new_img)
  193. # Also shift metadata
  194. if str(i) in refs:
  195. refs[str(i - 1)] = refs[str(i)]
  196. del refs[str(i)]
  197. metadata["references"] = refs
  198. self._save_metadata(printer_id, metadata)
  199. return True
  200. def get_reference_thumbnail(self, printer_id: int, index: int, max_size: int = 150) -> bytes | None:
  201. """Get a thumbnail of a reference image.
  202. Returns JPEG bytes or None if not found.
  203. """
  204. path = CALIBRATION_DIR / f"printer_{printer_id}_ref_{index}.jpg"
  205. if not path.exists():
  206. return None
  207. try:
  208. img = cv2.imread(str(path))
  209. if img is None:
  210. return None
  211. # Calculate thumbnail size maintaining aspect ratio
  212. h, w = img.shape[:2]
  213. if w > h:
  214. new_w = max_size
  215. new_h = int(h * max_size / w)
  216. else:
  217. new_h = max_size
  218. new_w = int(w * max_size / h)
  219. thumb = cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_AREA)
  220. _, buffer = cv2.imencode(".jpg", thumb, [cv2.IMWRITE_JPEG_QUALITY, 80])
  221. return buffer.tobytes()
  222. except Exception as e:
  223. logger.error(f"Error creating thumbnail: {e}")
  224. return None
  225. def _extract_roi(self, frame: np.ndarray) -> tuple[np.ndarray, int, int, int, int]:
  226. """Extract the region of interest from a frame.
  227. Returns:
  228. Tuple of (roi_frame, x_start, y_start, roi_width, roi_height)
  229. """
  230. height, width = frame.shape[:2]
  231. x_start = int(width * self.roi[0])
  232. y_start = int(height * self.roi[1])
  233. roi_width = int(width * self.roi[2])
  234. roi_height = int(height * self.roi[3])
  235. roi_frame = frame[y_start : y_start + roi_height, x_start : x_start + roi_width]
  236. return roi_frame, x_start, y_start, roi_width, roi_height
  237. def _preprocess_for_comparison(self, frame: np.ndarray) -> np.ndarray:
  238. """Preprocess a frame for comparison.
  239. Uses heavy blur to create "blob" representation - smooths out texture
  240. and noise while preserving large objects. Then normalizes brightness
  241. to reduce lighting sensitivity.
  242. """
  243. gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
  244. # Very heavy blur to smooth texture, keep only large shapes
  245. blurred = cv2.GaussianBlur(gray, (51, 51), 0)
  246. # Normalize to 0-255 range to reduce brightness sensitivity
  247. normalized = cv2.normalize(blurred, None, 0, 255, cv2.NORM_MINMAX)
  248. return normalized
  249. def calibrate(self, image_data: bytes, printer_id: int, label: str | None = None) -> tuple[bool, str, int]:
  250. """Calibrate by saving a reference image of the empty plate.
  251. Stores up to MAX_REFERENCES (5) images per printer. When all slots are full,
  252. the oldest reference is removed and others are shifted.
  253. Args:
  254. image_data: JPEG image data as bytes
  255. printer_id: Printer database ID
  256. label: Optional label for this reference (e.g., "High Temp Plate")
  257. Returns:
  258. Tuple of (success, message, index) where index is the slot used
  259. """
  260. from datetime import datetime
  261. try:
  262. # Decode image
  263. nparr = np.frombuffer(image_data, np.uint8)
  264. frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
  265. if frame is None:
  266. return False, "Failed to decode image", -1
  267. # Get existing references count
  268. existing_refs = self._get_reference_paths(printer_id)
  269. num_existing = len(existing_refs)
  270. # If all slots are full, rotate (remove oldest)
  271. if num_existing >= self.MAX_REFERENCES:
  272. self._rotate_references(printer_id)
  273. num_existing = self.MAX_REFERENCES - 1
  274. # Save to next available slot
  275. slot_index = num_existing
  276. reference_path = CALIBRATION_DIR / f"printer_{printer_id}_ref_{slot_index}.jpg"
  277. cv2.imwrite(str(reference_path), frame, [cv2.IMWRITE_JPEG_QUALITY, 95])
  278. # Save metadata
  279. metadata = self._load_metadata(printer_id)
  280. if "references" not in metadata:
  281. metadata["references"] = {}
  282. metadata["references"][str(slot_index)] = {
  283. "label": label or "",
  284. "timestamp": datetime.now().isoformat(),
  285. }
  286. self._save_metadata(printer_id, metadata)
  287. logger.info(
  288. f"Saved plate calibration reference {slot_index + 1}/{self.MAX_REFERENCES} for printer {printer_id}"
  289. )
  290. return True, f"Calibration saved ({slot_index + 1}/{self.MAX_REFERENCES} references)", slot_index
  291. except Exception as e:
  292. logger.exception("Error during plate calibration")
  293. return False, f"Calibration error: {e!s}", -1
  294. def get_calibration_count(self, printer_id: int) -> int:
  295. """Get the number of calibration references for a printer."""
  296. return len(self._get_reference_paths(printer_id))
  297. def has_calibration(self, printer_id: int, plate_type: str | None = None) -> bool:
  298. """Check if a printer has any calibration reference images."""
  299. return len(self._get_reference_paths(printer_id)) > 0
  300. def delete_calibration(self, printer_id: int, plate_type: str | None = None) -> bool:
  301. """Delete all calibration reference images for a printer."""
  302. paths = self._get_reference_paths(printer_id)
  303. if not paths:
  304. return False
  305. for path in paths:
  306. path.unlink()
  307. logger.info(f"Deleted {len(paths)} plate calibration reference(s) for printer {printer_id}")
  308. return True
  309. def analyze_frame(
  310. self, image_data: bytes, printer_id: int, plate_type: str | None = None, include_debug_image: bool = False
  311. ) -> PlateDetectionResult:
  312. """Analyze a camera frame to detect if the plate is empty.
  313. Compares the current frame to all calibration reference images and uses
  314. the best match (lowest difference) for the final result.
  315. Args:
  316. image_data: JPEG image data as bytes
  317. printer_id: Printer database ID (for reference lookup)
  318. plate_type: Unused - kept for API compatibility
  319. include_debug_image: If True, include annotated image in result
  320. Returns:
  321. PlateDetectionResult with analysis results
  322. """
  323. try:
  324. # Check for calibration
  325. reference_paths = self._get_reference_paths(printer_id)
  326. if not reference_paths:
  327. return PlateDetectionResult(
  328. is_empty=True, # Default to empty when not calibrated
  329. confidence=0.0,
  330. difference_percent=0.0,
  331. message="No calibration - please calibrate with empty plate first",
  332. needs_calibration=True,
  333. )
  334. # Decode current image
  335. nparr = np.frombuffer(image_data, np.uint8)
  336. current_frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
  337. if current_frame is None:
  338. return PlateDetectionResult(
  339. is_empty=True,
  340. confidence=0.0,
  341. difference_percent=0.0,
  342. message="Failed to decode current image",
  343. )
  344. # Extract ROI from current frame
  345. current_roi, x_start, y_start, roi_width, roi_height = self._extract_roi(current_frame)
  346. current_processed = self._preprocess_for_comparison(current_roi)
  347. # Compare against all references, find best match (lowest difference)
  348. best_difference_percent = float("inf")
  349. best_ref_idx = -1
  350. best_diff = None
  351. for idx, ref_path in enumerate(reference_paths):
  352. # Load reference image
  353. reference_frame = cv2.imread(str(ref_path), cv2.IMREAD_COLOR)
  354. if reference_frame is None:
  355. continue
  356. # Ensure same dimensions
  357. if current_frame.shape != reference_frame.shape:
  358. reference_frame = cv2.resize(reference_frame, (current_frame.shape[1], current_frame.shape[0]))
  359. # Extract ROI and preprocess
  360. reference_roi, _, _, _, _ = self._extract_roi(reference_frame)
  361. reference_processed = self._preprocess_for_comparison(reference_roi)
  362. # Calculate absolute difference
  363. diff = cv2.absdiff(current_processed, reference_processed)
  364. # Calculate mean difference as percentage
  365. mean_diff = np.mean(diff)
  366. difference_percent = (mean_diff / 255.0) * 100
  367. if difference_percent < best_difference_percent:
  368. best_difference_percent = difference_percent
  369. best_ref_idx = idx
  370. best_diff = diff
  371. if best_ref_idx == -1:
  372. return PlateDetectionResult(
  373. is_empty=True,
  374. confidence=0.0,
  375. difference_percent=0.0,
  376. message="Failed to load any reference images - please recalibrate",
  377. needs_calibration=True,
  378. )
  379. difference_percent = best_difference_percent
  380. # Determine if plate is empty (use best match)
  381. is_empty = difference_percent < self.difference_threshold
  382. # Calculate confidence
  383. if is_empty:
  384. # Higher confidence when very little difference
  385. confidence = 1.0 - min(1.0, difference_percent / self.difference_threshold)
  386. else:
  387. # Higher confidence when clearly different
  388. confidence = min(1.0, difference_percent / (self.difference_threshold * 2))
  389. # Generate message
  390. num_refs = len(reference_paths)
  391. if is_empty:
  392. message = (
  393. f"Plate appears empty (difference: {difference_percent:.1f}%, ref {best_ref_idx + 1}/{num_refs})"
  394. )
  395. else:
  396. message = f"Objects detected on plate (difference: {difference_percent:.1f}%, best ref {best_ref_idx + 1}/{num_refs})"
  397. # Generate debug image if requested
  398. debug_image = None
  399. if include_debug_image and best_diff is not None:
  400. debug_frame = current_frame.copy()
  401. # Draw ROI rectangle
  402. cv2.rectangle(
  403. debug_frame,
  404. (x_start, y_start),
  405. (x_start + roi_width, y_start + roi_height),
  406. (0, 255, 0),
  407. 2,
  408. )
  409. # Create colored difference overlay
  410. # Red = areas that are different from reference
  411. # Amplify diff for visibility (multiply by 3, cap at 255)
  412. diff_amplified = np.minimum(best_diff * 3, 255).astype(np.uint8)
  413. diff_colored = cv2.cvtColor(diff_amplified, cv2.COLOR_GRAY2BGR)
  414. diff_colored[:, :, 0] = 0 # Remove blue
  415. diff_colored[:, :, 1] = 0 # Remove green
  416. # Red channel has the diff
  417. # Overlay difference on ROI
  418. roi_overlay = debug_frame[y_start : y_start + roi_height, x_start : x_start + roi_width]
  419. cv2.addWeighted(diff_colored, 0.5, roi_overlay, 0.5, 0, roi_overlay)
  420. # Add status text
  421. status_text = "EMPTY" if is_empty else "OBJECTS DETECTED"
  422. color = (0, 255, 0) if is_empty else (0, 0, 255)
  423. cv2.putText(debug_frame, status_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)
  424. cv2.putText(
  425. debug_frame,
  426. f"Diff: {difference_percent:.1f}% (ref {best_ref_idx + 1}/{num_refs})",
  427. (10, 60),
  428. cv2.FONT_HERSHEY_SIMPLEX,
  429. 0.7,
  430. color,
  431. 2,
  432. )
  433. cv2.putText(
  434. debug_frame,
  435. f"Confidence: {confidence:.0%}",
  436. (10, 90),
  437. cv2.FONT_HERSHEY_SIMPLEX,
  438. 0.7,
  439. color,
  440. 2,
  441. )
  442. # Encode debug image as JPEG
  443. _, buffer = cv2.imencode(".jpg", debug_frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
  444. debug_image = buffer.tobytes()
  445. return PlateDetectionResult(
  446. is_empty=is_empty,
  447. confidence=confidence,
  448. difference_percent=difference_percent,
  449. message=message,
  450. debug_image=debug_image,
  451. )
  452. except Exception as e:
  453. logger.exception("Error analyzing frame for plate detection")
  454. return PlateDetectionResult(
  455. is_empty=True, # Default to empty on error (don't block prints)
  456. confidence=0.0,
  457. difference_percent=0.0,
  458. message=f"Analysis error: {e!s}",
  459. )
  async def capture_camera_image(
      printer_id: int,
      ip_address: str,
      access_code: str,
      model: str,
      external_camera_url: str | None = None,
      external_camera_type: str | None = None,
      use_external: bool = False,
  ) -> tuple[bytes | None, str]:
      """Capture an image from the printer camera.

      Sources are tried in order: the external camera (only when requested and
      fully configured), the buffered frame of an active camera stream, and
      finally a fresh capture from the built-in camera. An empty frame from any
      source counts as a failure, and the next source is tried.

      Returns:
          Tuple of (image_data, camera_source). When every source fails,
          image_data is None and camera_source is "unknown".
      """
      image_data: bytes | None = None
      camera_source = "unknown"

      # Try external camera first if requested and available
      if use_external and external_camera_url and external_camera_type:
          try:
              from backend.app.services.external_camera import capture_frame

              external_frame = await capture_frame(external_camera_url, external_camera_type)
              # Only a non-empty frame counts; empty bytes must not block the fallback below
              if external_frame:
                  image_data = external_frame
                  camera_source = "external"
                  logger.debug(f"Captured frame from external camera for printer {printer_id}")
          except Exception as e:
              logger.warning(f"Failed to capture from external camera: {e}")

      # Fall back to built-in camera: prefer the buffered frame of an active stream
      if image_data is None:
          try:
              from backend.app.api.routes.camera import get_buffered_frame

              buffered = get_buffered_frame(printer_id)
              if buffered:
                  image_data = buffered
                  camera_source = "built-in (buffered)"
                  logger.debug(f"Using buffered frame from active stream for printer {printer_id}")
          except Exception as e:
              logger.debug(f"Could not get buffered frame: {e}")

      # If no buffered frame, try to capture a new one
      if image_data is None:
          import tempfile

          from backend.app.services.camera import capture_camera_frame

          # The capture helper writes to a path, so reserve a temp file name first
          with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
              tmp_path = Path(tmp.name)
          try:
              success = await capture_camera_frame(ip_address, access_code, model, tmp_path, timeout=10)
              if success:
                  captured = tmp_path.read_bytes()
                  if captured:
                      image_data = captured
                      camera_source = "built-in"
                      logger.debug(f"Captured frame from built-in camera for printer {printer_id}")
          finally:
              # Best-effort cleanup; a leftover temp file must not fail the capture
              try:
                  tmp_path.unlink(missing_ok=True)
              except OSError as e:
                  logger.debug(f"Could not remove temporary capture file {tmp_path}: {e}")

      return image_data, camera_source
  async def check_plate_empty(
      printer_id: int,
      ip_address: str,
      access_code: str,
      model: str,
      plate_type: str | None = None,
      include_debug_image: bool = False,
      external_camera_url: str | None = None,
      external_camera_type: str | None = None,
      use_external: bool = False,
      roi: tuple[float, float, float, float] | None = None,
  ) -> PlateDetectionResult:
      """Capture a camera frame for a printer and report whether its plate is empty.

      Args:
          printer_id: Printer database ID
          ip_address: Printer IP address
          access_code: Printer access code
          model: Printer model string
          plate_type: Passed through to the detector's ``analyze_frame``
          include_debug_image: If True, include annotated image in result
          external_camera_url: URL of external camera (if configured)
          external_camera_type: Type of external camera (mjpeg, rtsp, snapshot)
          use_external: If True, prefer external camera over built-in
          roi: Region of interest as (x, y, w, h) fractions of the image size

      Returns:
          PlateDetectionResult; its message is prefixed with the camera source.
          When OpenCV is missing or no frame could be captured, the result
          reports "empty" with zero confidence.
      """

      def _inconclusive(reason: str) -> PlateDetectionResult:
          # Every "could not analyze" answer has the same shape: empty, zero confidence
          return PlateDetectionResult(
              is_empty=True,
              confidence=0.0,
              difference_percent=0.0,
              message=reason,
          )

      if not OPENCV_AVAILABLE:
          return _inconclusive("OpenCV not available - plate detection disabled")

      frame_bytes, source = await capture_camera_image(
          printer_id, ip_address, access_code, model, external_camera_url, external_camera_type, use_external
      )
      if frame_bytes is None:
          return _inconclusive("Failed to capture camera frame from any source")

      # Analyze the captured frame and tag the message with where it came from
      outcome = PlateDetector(roi=roi).analyze_frame(frame_bytes, printer_id, plate_type, include_debug_image)
      outcome.message = f"[{source}] {outcome.message}"
      return outcome
  async def calibrate_plate(
      printer_id: int,
      ip_address: str,
      access_code: str,
      model: str,
      label: str | None = None,
      external_camera_url: str | None = None,
      external_camera_type: str | None = None,
      use_external: bool = False,
  ) -> tuple[bool, str, int]:
      """Capture a frame of the empty plate and store it as a calibration reference.

      Args:
          printer_id: Printer database ID
          ip_address: Printer IP address
          access_code: Printer access code
          model: Printer model string
          label: Optional label for this reference (e.g., "High Temp Plate")
          external_camera_url: URL of external camera (if configured)
          external_camera_type: Type of external camera (mjpeg, rtsp, snapshot)
          use_external: If True, prefer external camera over built-in

      Returns:
          Tuple of (success, message, index). The message carries the camera
          source as a prefix only when calibration succeeded; index is -1 when
          OpenCV is missing or no frame could be captured.
      """
      if not OPENCV_AVAILABLE:
          return False, "OpenCV not available - plate detection disabled", -1

      frame_bytes, source = await capture_camera_image(
          printer_id, ip_address, access_code, model, external_camera_url, external_camera_type, use_external
      )
      if frame_bytes is None:
          return False, "Failed to capture camera frame for calibration", -1

      ok, text, slot = PlateDetector().calibrate(frame_bytes, printer_id, label)
      if not ok:
          # Failure messages are passed through untouched
          return ok, text, slot
      return ok, f"[{source}] {text}", slot
  def get_calibration_status(printer_id: int, plate_type: str | None = None) -> dict:
      """Get calibration status for a printer.

      Args:
          printer_id: Printer database ID
          plate_type: Unused - kept for API compatibility

      Returns:
          Dict with the keys ``available``, ``calibrated``, ``reference_count``,
          ``max_references`` and ``message``.
      """
      if not OPENCV_AVAILABLE:
          return {
              "available": False,
              "calibrated": False,
              "reference_count": 0,
              # Read the limit from the class so it cannot drift from the detector's value
              "max_references": PlateDetector.MAX_REFERENCES,
              "message": "OpenCV not available",
          }

      detector = PlateDetector()
      # One directory scan answers both "how many" and "any at all"
      ref_count = detector.get_calibration_count(printer_id)
      calibrated = ref_count > 0
      if calibrated:
          message = f"Calibrated with {ref_count}/{detector.MAX_REFERENCES} reference(s)"
      else:
          message = "Not calibrated - please calibrate with empty plate"
      return {
          "available": True,
          "calibrated": calibrated,
          "reference_count": ref_count,
          "max_references": detector.MAX_REFERENCES,
          "message": message,
      }
  def delete_calibration(printer_id: int, plate_type: str | None = None) -> bool:
      """Delete a printer's calibration via ``PlateDetector.delete_calibration``.

      Returns False without doing anything when OpenCV is not available;
      otherwise returns whatever the detector reports.
      """
      if OPENCV_AVAILABLE:
          return PlateDetector().delete_calibration(printer_id, plate_type)
      return False
  def is_plate_detection_available() -> bool:
      """Report whether the plate detection feature can be used (OpenCV installed)."""
      available = OPENCV_AVAILABLE
      return available