plate_detection.py 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778
  1. """Build plate empty detection using OpenCV.
  2. Analyzes camera frames to detect if there are objects on the build plate.
  3. Uses calibration-based difference detection - compares current frame to
  4. a reference image of the empty plate.
  5. """
  6. import logging
  7. from pathlib import Path
  8. logger = logging.getLogger(__name__)
  9. # Optional OpenCV import - feature disabled if not available
  10. try:
  11. import cv2
  12. import numpy as np
  13. OPENCV_AVAILABLE = True
  14. except ImportError:
  15. OPENCV_AVAILABLE = False
  16. logger.info("OpenCV not available - plate detection feature disabled")
  17. def _get_calibration_dir() -> Path:
  18. """Get the calibration directory from settings (ensures persistence in Docker)."""
  19. from backend.app.core.config import settings
  20. return settings.plate_calibration_dir
  21. class PlateDetectionResult:
  22. """Result of plate detection analysis."""
  23. def __init__(
  24. self,
  25. is_empty: bool,
  26. confidence: float,
  27. difference_percent: float,
  28. message: str,
  29. debug_image: bytes | None = None,
  30. needs_calibration: bool = False,
  31. ):
  32. self.is_empty = is_empty
  33. self.confidence = confidence # 0.0 to 1.0
  34. self.difference_percent = difference_percent # How different from reference
  35. self.message = message
  36. self.debug_image = debug_image # Optional annotated image for debugging
  37. self.needs_calibration = needs_calibration # True if no reference image exists
  38. def to_dict(self) -> dict:
  39. return {
  40. "is_empty": bool(self.is_empty),
  41. "confidence": float(round(self.confidence, 2)),
  42. "difference_percent": float(round(self.difference_percent, 2)),
  43. "message": self.message,
  44. "has_debug_image": self.debug_image is not None,
  45. "needs_calibration": bool(self.needs_calibration),
  46. }
  47. class PlateDetector:
  48. """Detects if the build plate is empty using calibration-based difference detection."""
  49. # Default region of interest (ROI) as percentage of image dimensions
  50. # These define where the build plate typically appears in the camera view
  51. # Format: (x_start%, y_start%, width%, height%)
  52. DEFAULT_ROI = (0.15, 0.35, 0.70, 0.55) # Center-lower portion of frame
  53. # Detection thresholds for difference detection
  54. # Using mean pixel difference (0-100% scale)
  55. # Small objects may only cause 1-2% mean difference
  56. DEFAULT_DIFFERENCE_THRESHOLD = 1.0
  57. DEFAULT_BLUR_SIZE = 21 # Gaussian blur kernel size (must be odd) - unused with edge detection
  58. def __init__(
  59. self,
  60. roi: tuple[float, float, float, float] | None = None,
  61. difference_threshold: float = DEFAULT_DIFFERENCE_THRESHOLD,
  62. blur_size: int = DEFAULT_BLUR_SIZE,
  63. ):
  64. """Initialize the plate detector.
  65. Args:
  66. roi: Region of interest as (x%, y%, w%, h%) - percentages of image size
  67. difference_threshold: Percentage of pixels that must differ to trigger "not empty"
  68. blur_size: Gaussian blur kernel size for noise reduction
  69. """
  70. if not OPENCV_AVAILABLE:
  71. raise RuntimeError("OpenCV is not installed. Install with: pip install opencv-python-headless")
  72. self.roi = roi or self.DEFAULT_ROI
  73. self.difference_threshold = difference_threshold
  74. self.blur_size = blur_size if blur_size % 2 == 1 else blur_size + 1 # Must be odd
  75. # Maximum number of reference images to store per printer
  76. MAX_REFERENCES = 5
  77. def _get_metadata_path(self, printer_id: int) -> Path:
  78. """Get the path to the metadata JSON file for a printer."""
  79. _get_calibration_dir().mkdir(parents=True, exist_ok=True)
  80. return _get_calibration_dir() / f"printer_{printer_id}_metadata.json"
  81. def _load_metadata(self, printer_id: int) -> dict:
  82. """Load metadata for a printer's references."""
  83. import json
  84. meta_path = self._get_metadata_path(printer_id)
  85. if meta_path.exists():
  86. try:
  87. with open(meta_path) as f:
  88. return json.load(f)
  89. except Exception:
  90. pass
  91. return {"references": {}}
  92. def _save_metadata(self, printer_id: int, metadata: dict) -> None:
  93. """Save metadata for a printer's references."""
  94. import json
  95. meta_path = self._get_metadata_path(printer_id)
  96. with open(meta_path, "w") as f:
  97. json.dump(metadata, f, indent=2)
  98. def _get_reference_paths(self, printer_id: int) -> list[Path]:
  99. """Get all existing reference image paths for a printer."""
  100. _get_calibration_dir().mkdir(parents=True, exist_ok=True)
  101. paths = []
  102. for i in range(self.MAX_REFERENCES):
  103. path = _get_calibration_dir() / f"printer_{printer_id}_ref_{i}.jpg"
  104. if path.exists():
  105. paths.append(path)
  106. return paths
  107. def _get_next_reference_slot(self, printer_id: int) -> Path:
  108. """Get the path for the next reference image slot (cycles through slots)."""
  109. _get_calibration_dir().mkdir(parents=True, exist_ok=True)
  110. # Find first empty slot, or use oldest (slot 0) and shift others
  111. for i in range(self.MAX_REFERENCES):
  112. path = _get_calibration_dir() / f"printer_{printer_id}_ref_{i}.jpg"
  113. if not path.exists():
  114. return path
  115. # All slots full - return slot 0 (will be overwritten, but we rotate first)
  116. return _get_calibration_dir() / f"printer_{printer_id}_ref_0.jpg"
  117. def _rotate_references(self, printer_id: int) -> None:
  118. """Rotate references: delete oldest (0), shift others down."""
  119. # Delete slot 0
  120. slot0 = _get_calibration_dir() / f"printer_{printer_id}_ref_0.jpg"
  121. if slot0.exists():
  122. slot0.unlink()
  123. # Shift others down
  124. for i in range(1, self.MAX_REFERENCES):
  125. old_path = _get_calibration_dir() / f"printer_{printer_id}_ref_{i}.jpg"
  126. new_path = _get_calibration_dir() / f"printer_{printer_id}_ref_{i - 1}.jpg"
  127. if old_path.exists():
  128. old_path.rename(new_path)
  129. # Also rotate metadata
  130. metadata = self._load_metadata(printer_id)
  131. refs = metadata.get("references", {})
  132. new_refs = {}
  133. for i in range(1, self.MAX_REFERENCES):
  134. if str(i) in refs:
  135. new_refs[str(i - 1)] = refs[str(i)]
  136. metadata["references"] = new_refs
  137. self._save_metadata(printer_id, metadata)
  138. def get_references(self, printer_id: int) -> list[dict]:
  139. """Get all references with metadata for a printer.
  140. Returns list of dicts with: index, label, timestamp, has_image
  141. """
  142. metadata = self._load_metadata(printer_id)
  143. refs = metadata.get("references", {})
  144. result = []
  145. for i in range(self.MAX_REFERENCES):
  146. path = _get_calibration_dir() / f"printer_{printer_id}_ref_{i}.jpg"
  147. if path.exists():
  148. ref_meta = refs.get(str(i), {})
  149. result.append(
  150. {
  151. "index": i,
  152. "label": ref_meta.get("label", ""),
  153. "timestamp": ref_meta.get("timestamp", ""),
  154. "has_image": True,
  155. }
  156. )
  157. return result
  158. def update_reference_label(self, printer_id: int, index: int, label: str) -> bool:
  159. """Update the label for a reference."""
  160. if index < 0 or index >= self.MAX_REFERENCES:
  161. return False
  162. path = _get_calibration_dir() / f"printer_{printer_id}_ref_{index}.jpg"
  163. if not path.exists():
  164. return False
  165. metadata = self._load_metadata(printer_id)
  166. if "references" not in metadata:
  167. metadata["references"] = {}
  168. if str(index) not in metadata["references"]:
  169. metadata["references"][str(index)] = {}
  170. metadata["references"][str(index)]["label"] = label
  171. self._save_metadata(printer_id, metadata)
  172. return True
  173. def delete_reference(self, printer_id: int, index: int) -> bool:
  174. """Delete a specific reference by index."""
  175. if index < 0 or index >= self.MAX_REFERENCES:
  176. return False
  177. path = _get_calibration_dir() / f"printer_{printer_id}_ref_{index}.jpg"
  178. if not path.exists():
  179. return False
  180. # Delete image
  181. path.unlink()
  182. # Remove from metadata
  183. metadata = self._load_metadata(printer_id)
  184. refs = metadata.get("references", {})
  185. if str(index) in refs:
  186. del refs[str(index)]
  187. metadata["references"] = refs
  188. self._save_metadata(printer_id, metadata)
  189. # Shift remaining references down to fill the gap
  190. for i in range(index + 1, self.MAX_REFERENCES):
  191. old_img = _get_calibration_dir() / f"printer_{printer_id}_ref_{i}.jpg"
  192. new_img = _get_calibration_dir() / f"printer_{printer_id}_ref_{i - 1}.jpg"
  193. if old_img.exists():
  194. old_img.rename(new_img)
  195. # Also shift metadata
  196. if str(i) in refs:
  197. refs[str(i - 1)] = refs[str(i)]
  198. del refs[str(i)]
  199. metadata["references"] = refs
  200. self._save_metadata(printer_id, metadata)
  201. return True
  202. def get_reference_thumbnail(self, printer_id: int, index: int, max_size: int = 150) -> bytes | None:
  203. """Get a thumbnail of a reference image.
  204. Returns JPEG bytes or None if not found.
  205. """
  206. path = _get_calibration_dir() / f"printer_{printer_id}_ref_{index}.jpg"
  207. if not path.exists():
  208. return None
  209. try:
  210. img = cv2.imread(str(path))
  211. if img is None:
  212. return None
  213. # Calculate thumbnail size maintaining aspect ratio
  214. h, w = img.shape[:2]
  215. if w > h:
  216. new_w = max_size
  217. new_h = int(h * max_size / w)
  218. else:
  219. new_h = max_size
  220. new_w = int(w * max_size / h)
  221. thumb = cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_AREA)
  222. _, buffer = cv2.imencode(".jpg", thumb, [cv2.IMWRITE_JPEG_QUALITY, 80])
  223. return buffer.tobytes()
  224. except Exception as e:
  225. logger.error(f"Error creating thumbnail: {e}")
  226. return None
  227. def _extract_roi(self, frame: np.ndarray) -> tuple[np.ndarray, int, int, int, int]:
  228. """Extract the region of interest from a frame.
  229. Returns:
  230. Tuple of (roi_frame, x_start, y_start, roi_width, roi_height)
  231. """
  232. height, width = frame.shape[:2]
  233. x_start = int(width * self.roi[0])
  234. y_start = int(height * self.roi[1])
  235. roi_width = int(width * self.roi[2])
  236. roi_height = int(height * self.roi[3])
  237. roi_frame = frame[y_start : y_start + roi_height, x_start : x_start + roi_width]
  238. return roi_frame, x_start, y_start, roi_width, roi_height
  239. def _preprocess_for_comparison(self, frame: np.ndarray) -> np.ndarray:
  240. """Preprocess a frame for comparison.
  241. Uses heavy blur to create "blob" representation - smooths out texture
  242. and noise while preserving large objects. Then normalizes brightness
  243. to reduce lighting sensitivity.
  244. """
  245. gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
  246. # Very heavy blur to smooth texture, keep only large shapes
  247. blurred = cv2.GaussianBlur(gray, (51, 51), 0)
  248. # Normalize to 0-255 range to reduce brightness sensitivity
  249. normalized = cv2.normalize(blurred, None, 0, 255, cv2.NORM_MINMAX)
  250. return normalized
  251. def calibrate(self, image_data: bytes, printer_id: int, label: str | None = None) -> tuple[bool, str, int]:
  252. """Calibrate by saving a reference image of the empty plate.
  253. Stores up to MAX_REFERENCES (5) images per printer. When all slots are full,
  254. the oldest reference is removed and others are shifted.
  255. Args:
  256. image_data: JPEG image data as bytes
  257. printer_id: Printer database ID
  258. label: Optional label for this reference (e.g., "High Temp Plate")
  259. Returns:
  260. Tuple of (success, message, index) where index is the slot used
  261. """
  262. from datetime import datetime
  263. try:
  264. # Decode image
  265. nparr = np.frombuffer(image_data, np.uint8)
  266. frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
  267. if frame is None:
  268. return False, "Failed to decode image", -1
  269. # Get existing references count
  270. existing_refs = self._get_reference_paths(printer_id)
  271. num_existing = len(existing_refs)
  272. # If all slots are full, rotate (remove oldest)
  273. if num_existing >= self.MAX_REFERENCES:
  274. self._rotate_references(printer_id)
  275. num_existing = self.MAX_REFERENCES - 1
  276. # Save to next available slot
  277. slot_index = num_existing
  278. reference_path = _get_calibration_dir() / f"printer_{printer_id}_ref_{slot_index}.jpg"
  279. cv2.imwrite(str(reference_path), frame, [cv2.IMWRITE_JPEG_QUALITY, 95])
  280. # Save metadata
  281. metadata = self._load_metadata(printer_id)
  282. if "references" not in metadata:
  283. metadata["references"] = {}
  284. metadata["references"][str(slot_index)] = {
  285. "label": label or "",
  286. "timestamp": datetime.now().isoformat(),
  287. }
  288. self._save_metadata(printer_id, metadata)
  289. logger.info(
  290. f"Saved plate calibration reference {slot_index + 1}/{self.MAX_REFERENCES} for printer {printer_id}"
  291. )
  292. return True, f"Calibration saved ({slot_index + 1}/{self.MAX_REFERENCES} references)", slot_index
  293. except Exception as e:
  294. logger.exception("Error during plate calibration")
  295. return False, f"Calibration error: {e!s}", -1
  296. def get_calibration_count(self, printer_id: int) -> int:
  297. """Get the number of calibration references for a printer."""
  298. return len(self._get_reference_paths(printer_id))
  299. def has_calibration(self, printer_id: int, plate_type: str | None = None) -> bool:
  300. """Check if a printer has any calibration reference images."""
  301. return len(self._get_reference_paths(printer_id)) > 0
  302. def delete_calibration(self, printer_id: int, plate_type: str | None = None) -> bool:
  303. """Delete all calibration reference images for a printer."""
  304. paths = self._get_reference_paths(printer_id)
  305. if not paths:
  306. return False
  307. for path in paths:
  308. path.unlink()
  309. logger.info(f"Deleted {len(paths)} plate calibration reference(s) for printer {printer_id}")
  310. return True
  311. def analyze_frame(
  312. self, image_data: bytes, printer_id: int, plate_type: str | None = None, include_debug_image: bool = False
  313. ) -> PlateDetectionResult:
  314. """Analyze a camera frame to detect if the plate is empty.
  315. Compares the current frame to all calibration reference images and uses
  316. the best match (lowest difference) for the final result.
  317. Args:
  318. image_data: JPEG image data as bytes
  319. printer_id: Printer database ID (for reference lookup)
  320. plate_type: Unused - kept for API compatibility
  321. include_debug_image: If True, include annotated image in result
  322. Returns:
  323. PlateDetectionResult with analysis results
  324. """
  325. try:
  326. # Check for calibration
  327. reference_paths = self._get_reference_paths(printer_id)
  328. if not reference_paths:
  329. return PlateDetectionResult(
  330. is_empty=True, # Default to empty when not calibrated
  331. confidence=0.0,
  332. difference_percent=0.0,
  333. message="No calibration - please calibrate with empty plate first",
  334. needs_calibration=True,
  335. )
  336. # Decode current image
  337. nparr = np.frombuffer(image_data, np.uint8)
  338. current_frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
  339. if current_frame is None:
  340. return PlateDetectionResult(
  341. is_empty=True,
  342. confidence=0.0,
  343. difference_percent=0.0,
  344. message="Failed to decode current image",
  345. )
  346. # Extract ROI from current frame
  347. current_roi, x_start, y_start, roi_width, roi_height = self._extract_roi(current_frame)
  348. current_processed = self._preprocess_for_comparison(current_roi)
  349. # Compare against all references, find best match (lowest difference)
  350. best_difference_percent = float("inf")
  351. best_ref_idx = -1
  352. best_diff = None
  353. for idx, ref_path in enumerate(reference_paths):
  354. # Load reference image
  355. reference_frame = cv2.imread(str(ref_path), cv2.IMREAD_COLOR)
  356. if reference_frame is None:
  357. continue
  358. # Ensure same dimensions
  359. if current_frame.shape != reference_frame.shape:
  360. reference_frame = cv2.resize(reference_frame, (current_frame.shape[1], current_frame.shape[0]))
  361. # Extract ROI and preprocess
  362. reference_roi, _, _, _, _ = self._extract_roi(reference_frame)
  363. reference_processed = self._preprocess_for_comparison(reference_roi)
  364. # Calculate absolute difference
  365. diff = cv2.absdiff(current_processed, reference_processed)
  366. # Calculate mean difference as percentage
  367. mean_diff = np.mean(diff)
  368. difference_percent = (mean_diff / 255.0) * 100
  369. if difference_percent < best_difference_percent:
  370. best_difference_percent = difference_percent
  371. best_ref_idx = idx
  372. best_diff = diff
  373. if best_ref_idx == -1:
  374. return PlateDetectionResult(
  375. is_empty=True,
  376. confidence=0.0,
  377. difference_percent=0.0,
  378. message="Failed to load any reference images - please recalibrate",
  379. needs_calibration=True,
  380. )
  381. difference_percent = best_difference_percent
  382. # Determine if plate is empty (use best match)
  383. is_empty = difference_percent < self.difference_threshold
  384. # Calculate confidence
  385. if is_empty:
  386. # Higher confidence when very little difference
  387. confidence = 1.0 - min(1.0, difference_percent / self.difference_threshold)
  388. else:
  389. # Higher confidence when clearly different
  390. confidence = min(1.0, difference_percent / (self.difference_threshold * 2))
  391. # Generate message
  392. num_refs = len(reference_paths)
  393. if is_empty:
  394. message = (
  395. f"Plate appears empty (difference: {difference_percent:.1f}%, ref {best_ref_idx + 1}/{num_refs})"
  396. )
  397. else:
  398. message = f"Objects detected on plate (difference: {difference_percent:.1f}%, best ref {best_ref_idx + 1}/{num_refs})"
  399. # Generate debug image if requested
  400. debug_image = None
  401. if include_debug_image and best_diff is not None:
  402. debug_frame = current_frame.copy()
  403. # Draw ROI rectangle
  404. cv2.rectangle(
  405. debug_frame,
  406. (x_start, y_start),
  407. (x_start + roi_width, y_start + roi_height),
  408. (0, 255, 0),
  409. 2,
  410. )
  411. # Create colored difference overlay
  412. # Red = areas that are different from reference
  413. # Amplify diff for visibility (multiply by 3, cap at 255)
  414. diff_amplified = np.minimum(best_diff * 3, 255).astype(np.uint8)
  415. diff_colored = cv2.cvtColor(diff_amplified, cv2.COLOR_GRAY2BGR)
  416. diff_colored[:, :, 0] = 0 # Remove blue
  417. diff_colored[:, :, 1] = 0 # Remove green
  418. # Red channel has the diff
  419. # Overlay difference on ROI
  420. roi_overlay = debug_frame[y_start : y_start + roi_height, x_start : x_start + roi_width]
  421. cv2.addWeighted(diff_colored, 0.5, roi_overlay, 0.5, 0, roi_overlay)
  422. # Add status text
  423. status_text = "EMPTY" if is_empty else "OBJECTS DETECTED"
  424. color = (0, 255, 0) if is_empty else (0, 0, 255)
  425. cv2.putText(debug_frame, status_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)
  426. cv2.putText(
  427. debug_frame,
  428. f"Diff: {difference_percent:.1f}% (ref {best_ref_idx + 1}/{num_refs})",
  429. (10, 60),
  430. cv2.FONT_HERSHEY_SIMPLEX,
  431. 0.7,
  432. color,
  433. 2,
  434. )
  435. cv2.putText(
  436. debug_frame,
  437. f"Confidence: {confidence:.0%}",
  438. (10, 90),
  439. cv2.FONT_HERSHEY_SIMPLEX,
  440. 0.7,
  441. color,
  442. 2,
  443. )
  444. # Encode debug image as JPEG
  445. _, buffer = cv2.imencode(".jpg", debug_frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
  446. debug_image = buffer.tobytes()
  447. return PlateDetectionResult(
  448. is_empty=is_empty,
  449. confidence=confidence,
  450. difference_percent=difference_percent,
  451. message=message,
  452. debug_image=debug_image,
  453. )
  454. except Exception as e:
  455. logger.exception("Error analyzing frame for plate detection")
  456. return PlateDetectionResult(
  457. is_empty=True, # Default to empty on error (don't block prints)
  458. confidence=0.0,
  459. difference_percent=0.0,
  460. message=f"Analysis error: {e!s}",
  461. )
  462. async def capture_camera_image(
  463. printer_id: int,
  464. ip_address: str,
  465. access_code: str,
  466. model: str,
  467. external_camera_url: str | None = None,
  468. external_camera_type: str | None = None,
  469. use_external: bool = False,
  470. ) -> tuple[bytes | None, str]:
  471. """Capture an image from the printer camera.
  472. If there's an active camera stream, uses the buffered frame instead of
  473. creating a new connection (which would fail while stream is active).
  474. Returns:
  475. Tuple of (image_data, camera_source) or (None, error_message)
  476. """
  477. image_data: bytes | None = None
  478. camera_source = "unknown"
  479. # Try external camera first if requested and available
  480. if use_external and external_camera_url and external_camera_type:
  481. try:
  482. from backend.app.services.external_camera import capture_frame
  483. image_data = await capture_frame(external_camera_url, external_camera_type)
  484. if image_data:
  485. camera_source = "external"
  486. logger.debug(f"Captured frame from external camera for printer {printer_id}")
  487. except Exception as e:
  488. logger.warning(f"Failed to capture from external camera: {e}")
  489. # Fall back to built-in camera
  490. if image_data is None:
  491. # First, check if there's an active stream with a buffered frame
  492. # This avoids blocking when camera viewer is open
  493. try:
  494. from backend.app.api.routes.camera import get_buffered_frame
  495. buffered = get_buffered_frame(printer_id)
  496. if buffered:
  497. image_data = buffered
  498. camera_source = "built-in (buffered)"
  499. logger.debug(f"Using buffered frame from active stream for printer {printer_id}")
  500. except Exception as e:
  501. logger.debug(f"Could not get buffered frame: {e}")
  502. # If no buffered frame, try to capture a new one
  503. if image_data is None:
  504. import tempfile
  505. from backend.app.services.camera import capture_camera_frame
  506. with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
  507. tmp_path = Path(tmp.name)
  508. try:
  509. success = await capture_camera_frame(ip_address, access_code, model, tmp_path, timeout=10)
  510. if success:
  511. with open(tmp_path, "rb") as f:
  512. image_data = f.read()
  513. camera_source = "built-in"
  514. logger.debug(f"Captured frame from built-in camera for printer {printer_id}")
  515. finally:
  516. try:
  517. tmp_path.unlink()
  518. except Exception:
  519. pass
  520. return image_data, camera_source
  521. async def check_plate_empty(
  522. printer_id: int,
  523. ip_address: str,
  524. access_code: str,
  525. model: str,
  526. plate_type: str | None = None,
  527. include_debug_image: bool = False,
  528. external_camera_url: str | None = None,
  529. external_camera_type: str | None = None,
  530. use_external: bool = False,
  531. roi: tuple[float, float, float, float] | None = None,
  532. ) -> PlateDetectionResult:
  533. """Check if the build plate is empty for a printer.
  534. Args:
  535. printer_id: Printer database ID
  536. ip_address: Printer IP address
  537. access_code: Printer access code
  538. model: Printer model string
  539. plate_type: Type of build plate for calibration lookup
  540. include_debug_image: If True, include annotated image in result
  541. external_camera_url: URL of external camera (if configured)
  542. external_camera_type: Type of external camera (mjpeg, rtsp, snapshot)
  543. use_external: If True, prefer external camera over built-in
  544. roi: Region of interest as (x%, y%, w%, h%) - percentages of image size
  545. Returns:
  546. PlateDetectionResult with analysis results
  547. """
  548. if not OPENCV_AVAILABLE:
  549. return PlateDetectionResult(
  550. is_empty=True,
  551. confidence=0.0,
  552. difference_percent=0.0,
  553. message="OpenCV not available - plate detection disabled",
  554. )
  555. image_data, camera_source = await capture_camera_image(
  556. printer_id, ip_address, access_code, model, external_camera_url, external_camera_type, use_external
  557. )
  558. if image_data is None:
  559. return PlateDetectionResult(
  560. is_empty=True, # Default to empty on error
  561. confidence=0.0,
  562. difference_percent=0.0,
  563. message="Failed to capture camera frame from any source",
  564. )
  565. # Analyze the captured frame
  566. detector = PlateDetector(roi=roi)
  567. result = detector.analyze_frame(image_data, printer_id, plate_type, include_debug_image)
  568. # Add camera source to message
  569. result.message = f"[{camera_source}] {result.message}"
  570. return result
  571. async def calibrate_plate(
  572. printer_id: int,
  573. ip_address: str,
  574. access_code: str,
  575. model: str,
  576. label: str | None = None,
  577. external_camera_url: str | None = None,
  578. external_camera_type: str | None = None,
  579. use_external: bool = False,
  580. ) -> tuple[bool, str, int]:
  581. """Calibrate plate detection by capturing a reference image of the empty plate.
  582. Args:
  583. printer_id: Printer database ID
  584. ip_address: Printer IP address
  585. access_code: Printer access code
  586. model: Printer model string
  587. label: Optional label for this reference (e.g., "High Temp Plate")
  588. external_camera_url: URL of external camera (if configured)
  589. external_camera_type: Type of external camera (mjpeg, rtsp, snapshot)
  590. use_external: If True, prefer external camera over built-in
  591. Returns:
  592. Tuple of (success, message, index)
  593. """
  594. if not OPENCV_AVAILABLE:
  595. return False, "OpenCV not available - plate detection disabled", -1
  596. image_data, camera_source = await capture_camera_image(
  597. printer_id, ip_address, access_code, model, external_camera_url, external_camera_type, use_external
  598. )
  599. if image_data is None:
  600. return False, "Failed to capture camera frame for calibration", -1
  601. detector = PlateDetector()
  602. success, message, index = detector.calibrate(image_data, printer_id, label)
  603. if success:
  604. message = f"[{camera_source}] {message}"
  605. return success, message, index
  606. def get_calibration_status(printer_id: int, plate_type: str | None = None) -> dict:
  607. """Get calibration status for a printer.
  608. Returns:
  609. Dict with calibration info including reference count
  610. """
  611. if not OPENCV_AVAILABLE:
  612. return {
  613. "available": False,
  614. "calibrated": False,
  615. "reference_count": 0,
  616. "max_references": 5,
  617. "message": "OpenCV not available",
  618. }
  619. detector = PlateDetector()
  620. calibrated = detector.has_calibration(printer_id)
  621. ref_count = detector.get_calibration_count(printer_id)
  622. if calibrated:
  623. message = f"Calibrated with {ref_count}/{detector.MAX_REFERENCES} reference(s)"
  624. else:
  625. message = "Not calibrated - please calibrate with empty plate"
  626. return {
  627. "available": True,
  628. "calibrated": calibrated,
  629. "reference_count": ref_count,
  630. "max_references": detector.MAX_REFERENCES,
  631. "message": message,
  632. }
  633. def delete_calibration(printer_id: int, plate_type: str | None = None) -> bool:
  634. """Delete calibration for a printer and plate type."""
  635. if not OPENCV_AVAILABLE:
  636. return False
  637. detector = PlateDetector()
  638. return detector.delete_calibration(printer_id, plate_type)
  639. def is_plate_detection_available() -> bool:
  640. """Check if plate detection feature is available (OpenCV installed)."""
  641. return OPENCV_AVAILABLE