|
|
@@ -0,0 +1,774 @@
|
|
|
+"""Build plate empty detection using OpenCV.
|
|
|
+
|
|
|
+Analyzes camera frames to detect if there are objects on the build plate.
|
|
|
+Uses calibration-based difference detection - compares current frame to
|
|
|
+a reference image of the empty plate.
|
|
|
+"""
|
|
|
+
|
|
|
+import logging
|
|
|
+from pathlib import Path
|
|
|
+
|
|
|
+logger = logging.getLogger(__name__)
|
|
|
+
|
|
|
+# Optional OpenCV import - feature disabled if not available
|
|
|
+try:
|
|
|
+ import cv2
|
|
|
+ import numpy as np
|
|
|
+
|
|
|
+ OPENCV_AVAILABLE = True
|
|
|
+except ImportError:
|
|
|
+ OPENCV_AVAILABLE = False
|
|
|
+ logger.info("OpenCV not available - plate detection feature disabled")
|
|
|
+
|
|
|
+# Directory to store calibration reference images
|
|
|
+CALIBRATION_DIR = Path(__file__).parent.parent.parent.parent / "data" / "plate_calibration"
|
|
|
+
|
|
|
+
|
|
|
+class PlateDetectionResult:
|
|
|
+ """Result of plate detection analysis."""
|
|
|
+
|
|
|
+ def __init__(
|
|
|
+ self,
|
|
|
+ is_empty: bool,
|
|
|
+ confidence: float,
|
|
|
+ difference_percent: float,
|
|
|
+ message: str,
|
|
|
+ debug_image: bytes | None = None,
|
|
|
+ needs_calibration: bool = False,
|
|
|
+ ):
|
|
|
+ self.is_empty = is_empty
|
|
|
+ self.confidence = confidence # 0.0 to 1.0
|
|
|
+ self.difference_percent = difference_percent # How different from reference
|
|
|
+ self.message = message
|
|
|
+ self.debug_image = debug_image # Optional annotated image for debugging
|
|
|
+ self.needs_calibration = needs_calibration # True if no reference image exists
|
|
|
+
|
|
|
+ def to_dict(self) -> dict:
|
|
|
+ return {
|
|
|
+ "is_empty": bool(self.is_empty),
|
|
|
+ "confidence": float(round(self.confidence, 2)),
|
|
|
+ "difference_percent": float(round(self.difference_percent, 2)),
|
|
|
+ "message": self.message,
|
|
|
+ "has_debug_image": self.debug_image is not None,
|
|
|
+ "needs_calibration": bool(self.needs_calibration),
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+class PlateDetector:
|
|
|
+ """Detects if the build plate is empty using calibration-based difference detection."""
|
|
|
+
|
|
|
+ # Default region of interest (ROI) as percentage of image dimensions
|
|
|
+ # These define where the build plate typically appears in the camera view
|
|
|
+ # Format: (x_start%, y_start%, width%, height%)
|
|
|
+ DEFAULT_ROI = (0.15, 0.35, 0.70, 0.55) # Center-lower portion of frame
|
|
|
+
|
|
|
+ # Detection thresholds for difference detection
|
|
|
+ # Using mean pixel difference (0-100% scale)
|
|
|
+ # Small objects may only cause 1-2% mean difference
|
|
|
+ DEFAULT_DIFFERENCE_THRESHOLD = 1.0
|
|
|
+ DEFAULT_BLUR_SIZE = 21 # Gaussian blur kernel size (must be odd) - unused with edge detection
|
|
|
+
|
|
|
+ def __init__(
|
|
|
+ self,
|
|
|
+ roi: tuple[float, float, float, float] | None = None,
|
|
|
+ difference_threshold: float = DEFAULT_DIFFERENCE_THRESHOLD,
|
|
|
+ blur_size: int = DEFAULT_BLUR_SIZE,
|
|
|
+ ):
|
|
|
+ """Initialize the plate detector.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ roi: Region of interest as (x%, y%, w%, h%) - percentages of image size
|
|
|
+ difference_threshold: Percentage of pixels that must differ to trigger "not empty"
|
|
|
+ blur_size: Gaussian blur kernel size for noise reduction
|
|
|
+ """
|
|
|
+ if not OPENCV_AVAILABLE:
|
|
|
+ raise RuntimeError("OpenCV is not installed. Install with: pip install opencv-python-headless")
|
|
|
+
|
|
|
+ self.roi = roi or self.DEFAULT_ROI
|
|
|
+ self.difference_threshold = difference_threshold
|
|
|
+ self.blur_size = blur_size if blur_size % 2 == 1 else blur_size + 1 # Must be odd
|
|
|
+
|
|
|
+ # Maximum number of reference images to store per printer
|
|
|
+ MAX_REFERENCES = 5
|
|
|
+
|
|
|
+ def _get_metadata_path(self, printer_id: int) -> Path:
|
|
|
+ """Get the path to the metadata JSON file for a printer."""
|
|
|
+ CALIBRATION_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
+ return CALIBRATION_DIR / f"printer_{printer_id}_metadata.json"
|
|
|
+
|
|
|
+ def _load_metadata(self, printer_id: int) -> dict:
|
|
|
+ """Load metadata for a printer's references."""
|
|
|
+ import json
|
|
|
+
|
|
|
+ meta_path = self._get_metadata_path(printer_id)
|
|
|
+ if meta_path.exists():
|
|
|
+ try:
|
|
|
+ with open(meta_path) as f:
|
|
|
+ return json.load(f)
|
|
|
+ except Exception:
|
|
|
+ pass
|
|
|
+ return {"references": {}}
|
|
|
+
|
|
|
+ def _save_metadata(self, printer_id: int, metadata: dict) -> None:
|
|
|
+ """Save metadata for a printer's references."""
|
|
|
+ import json
|
|
|
+
|
|
|
+ meta_path = self._get_metadata_path(printer_id)
|
|
|
+ with open(meta_path, "w") as f:
|
|
|
+ json.dump(metadata, f, indent=2)
|
|
|
+
|
|
|
+ def _get_reference_paths(self, printer_id: int) -> list[Path]:
|
|
|
+ """Get all existing reference image paths for a printer."""
|
|
|
+ CALIBRATION_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
+ paths = []
|
|
|
+ for i in range(self.MAX_REFERENCES):
|
|
|
+ path = CALIBRATION_DIR / f"printer_{printer_id}_ref_{i}.jpg"
|
|
|
+ if path.exists():
|
|
|
+ paths.append(path)
|
|
|
+ return paths
|
|
|
+
|
|
|
+ def _get_next_reference_slot(self, printer_id: int) -> Path:
|
|
|
+ """Get the path for the next reference image slot (cycles through slots)."""
|
|
|
+ CALIBRATION_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
+ # Find first empty slot, or use oldest (slot 0) and shift others
|
|
|
+ for i in range(self.MAX_REFERENCES):
|
|
|
+ path = CALIBRATION_DIR / f"printer_{printer_id}_ref_{i}.jpg"
|
|
|
+ if not path.exists():
|
|
|
+ return path
|
|
|
+ # All slots full - return slot 0 (will be overwritten, but we rotate first)
|
|
|
+ return CALIBRATION_DIR / f"printer_{printer_id}_ref_0.jpg"
|
|
|
+
|
|
|
+ def _rotate_references(self, printer_id: int) -> None:
|
|
|
+ """Rotate references: delete oldest (0), shift others down."""
|
|
|
+ # Delete slot 0
|
|
|
+ slot0 = CALIBRATION_DIR / f"printer_{printer_id}_ref_0.jpg"
|
|
|
+ if slot0.exists():
|
|
|
+ slot0.unlink()
|
|
|
+ # Shift others down
|
|
|
+ for i in range(1, self.MAX_REFERENCES):
|
|
|
+ old_path = CALIBRATION_DIR / f"printer_{printer_id}_ref_{i}.jpg"
|
|
|
+ new_path = CALIBRATION_DIR / f"printer_{printer_id}_ref_{i - 1}.jpg"
|
|
|
+ if old_path.exists():
|
|
|
+ old_path.rename(new_path)
|
|
|
+
|
|
|
+ # Also rotate metadata
|
|
|
+ metadata = self._load_metadata(printer_id)
|
|
|
+ refs = metadata.get("references", {})
|
|
|
+ new_refs = {}
|
|
|
+ for i in range(1, self.MAX_REFERENCES):
|
|
|
+ if str(i) in refs:
|
|
|
+ new_refs[str(i - 1)] = refs[str(i)]
|
|
|
+ metadata["references"] = new_refs
|
|
|
+ self._save_metadata(printer_id, metadata)
|
|
|
+
|
|
|
+ def get_references(self, printer_id: int) -> list[dict]:
|
|
|
+ """Get all references with metadata for a printer.
|
|
|
+
|
|
|
+ Returns list of dicts with: index, label, timestamp, has_image
|
|
|
+ """
|
|
|
+
|
|
|
+ metadata = self._load_metadata(printer_id)
|
|
|
+ refs = metadata.get("references", {})
|
|
|
+ result = []
|
|
|
+
|
|
|
+ for i in range(self.MAX_REFERENCES):
|
|
|
+ path = CALIBRATION_DIR / f"printer_{printer_id}_ref_{i}.jpg"
|
|
|
+ if path.exists():
|
|
|
+ ref_meta = refs.get(str(i), {})
|
|
|
+ result.append(
|
|
|
+ {
|
|
|
+ "index": i,
|
|
|
+ "label": ref_meta.get("label", ""),
|
|
|
+ "timestamp": ref_meta.get("timestamp", ""),
|
|
|
+ "has_image": True,
|
|
|
+ }
|
|
|
+ )
|
|
|
+
|
|
|
+ return result
|
|
|
+
|
|
|
+ def update_reference_label(self, printer_id: int, index: int, label: str) -> bool:
|
|
|
+ """Update the label for a reference."""
|
|
|
+ if index < 0 or index >= self.MAX_REFERENCES:
|
|
|
+ return False
|
|
|
+
|
|
|
+ path = CALIBRATION_DIR / f"printer_{printer_id}_ref_{index}.jpg"
|
|
|
+ if not path.exists():
|
|
|
+ return False
|
|
|
+
|
|
|
+ metadata = self._load_metadata(printer_id)
|
|
|
+ if "references" not in metadata:
|
|
|
+ metadata["references"] = {}
|
|
|
+ if str(index) not in metadata["references"]:
|
|
|
+ metadata["references"][str(index)] = {}
|
|
|
+
|
|
|
+ metadata["references"][str(index)]["label"] = label
|
|
|
+ self._save_metadata(printer_id, metadata)
|
|
|
+ return True
|
|
|
+
|
|
|
+ def delete_reference(self, printer_id: int, index: int) -> bool:
|
|
|
+ """Delete a specific reference by index."""
|
|
|
+ if index < 0 or index >= self.MAX_REFERENCES:
|
|
|
+ return False
|
|
|
+
|
|
|
+ path = CALIBRATION_DIR / f"printer_{printer_id}_ref_{index}.jpg"
|
|
|
+ if not path.exists():
|
|
|
+ return False
|
|
|
+
|
|
|
+ # Delete image
|
|
|
+ path.unlink()
|
|
|
+
|
|
|
+ # Remove from metadata
|
|
|
+ metadata = self._load_metadata(printer_id)
|
|
|
+ refs = metadata.get("references", {})
|
|
|
+ if str(index) in refs:
|
|
|
+ del refs[str(index)]
|
|
|
+ metadata["references"] = refs
|
|
|
+ self._save_metadata(printer_id, metadata)
|
|
|
+
|
|
|
+ # Shift remaining references down to fill the gap
|
|
|
+ for i in range(index + 1, self.MAX_REFERENCES):
|
|
|
+ old_img = CALIBRATION_DIR / f"printer_{printer_id}_ref_{i}.jpg"
|
|
|
+ new_img = CALIBRATION_DIR / f"printer_{printer_id}_ref_{i - 1}.jpg"
|
|
|
+ if old_img.exists():
|
|
|
+ old_img.rename(new_img)
|
|
|
+ # Also shift metadata
|
|
|
+ if str(i) in refs:
|
|
|
+ refs[str(i - 1)] = refs[str(i)]
|
|
|
+ del refs[str(i)]
|
|
|
+
|
|
|
+ metadata["references"] = refs
|
|
|
+ self._save_metadata(printer_id, metadata)
|
|
|
+ return True
|
|
|
+
|
|
|
+ def get_reference_thumbnail(self, printer_id: int, index: int, max_size: int = 150) -> bytes | None:
|
|
|
+ """Get a thumbnail of a reference image.
|
|
|
+
|
|
|
+ Returns JPEG bytes or None if not found.
|
|
|
+ """
|
|
|
+ path = CALIBRATION_DIR / f"printer_{printer_id}_ref_{index}.jpg"
|
|
|
+ if not path.exists():
|
|
|
+ return None
|
|
|
+
|
|
|
+ try:
|
|
|
+ img = cv2.imread(str(path))
|
|
|
+ if img is None:
|
|
|
+ return None
|
|
|
+
|
|
|
+ # Calculate thumbnail size maintaining aspect ratio
|
|
|
+ h, w = img.shape[:2]
|
|
|
+ if w > h:
|
|
|
+ new_w = max_size
|
|
|
+ new_h = int(h * max_size / w)
|
|
|
+ else:
|
|
|
+ new_h = max_size
|
|
|
+ new_w = int(w * max_size / h)
|
|
|
+
|
|
|
+ thumb = cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_AREA)
|
|
|
+ _, buffer = cv2.imencode(".jpg", thumb, [cv2.IMWRITE_JPEG_QUALITY, 80])
|
|
|
+ return buffer.tobytes()
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"Error creating thumbnail: {e}")
|
|
|
+ return None
|
|
|
+
|
|
|
+ def _extract_roi(self, frame: np.ndarray) -> tuple[np.ndarray, int, int, int, int]:
|
|
|
+ """Extract the region of interest from a frame.
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ Tuple of (roi_frame, x_start, y_start, roi_width, roi_height)
|
|
|
+ """
|
|
|
+ height, width = frame.shape[:2]
|
|
|
+ x_start = int(width * self.roi[0])
|
|
|
+ y_start = int(height * self.roi[1])
|
|
|
+ roi_width = int(width * self.roi[2])
|
|
|
+ roi_height = int(height * self.roi[3])
|
|
|
+ roi_frame = frame[y_start : y_start + roi_height, x_start : x_start + roi_width]
|
|
|
+ return roi_frame, x_start, y_start, roi_width, roi_height
|
|
|
+
|
|
|
+ def _preprocess_for_comparison(self, frame: np.ndarray) -> np.ndarray:
|
|
|
+ """Preprocess a frame for comparison.
|
|
|
+
|
|
|
+ Uses heavy blur to create "blob" representation - smooths out texture
|
|
|
+ and noise while preserving large objects. Then normalizes brightness
|
|
|
+ to reduce lighting sensitivity.
|
|
|
+ """
|
|
|
+ gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
|
|
|
+ # Very heavy blur to smooth texture, keep only large shapes
|
|
|
+ blurred = cv2.GaussianBlur(gray, (51, 51), 0)
|
|
|
+ # Normalize to 0-255 range to reduce brightness sensitivity
|
|
|
+ normalized = cv2.normalize(blurred, None, 0, 255, cv2.NORM_MINMAX)
|
|
|
+ return normalized
|
|
|
+
|
|
|
+ def calibrate(self, image_data: bytes, printer_id: int, label: str | None = None) -> tuple[bool, str, int]:
|
|
|
+ """Calibrate by saving a reference image of the empty plate.
|
|
|
+
|
|
|
+ Stores up to MAX_REFERENCES (5) images per printer. When all slots are full,
|
|
|
+ the oldest reference is removed and others are shifted.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ image_data: JPEG image data as bytes
|
|
|
+ printer_id: Printer database ID
|
|
|
+ label: Optional label for this reference (e.g., "High Temp Plate")
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ Tuple of (success, message, index) where index is the slot used
|
|
|
+ """
|
|
|
+ from datetime import datetime
|
|
|
+
|
|
|
+ try:
|
|
|
+ # Decode image
|
|
|
+ nparr = np.frombuffer(image_data, np.uint8)
|
|
|
+ frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
|
|
|
+
|
|
|
+ if frame is None:
|
|
|
+ return False, "Failed to decode image", -1
|
|
|
+
|
|
|
+ # Get existing references count
|
|
|
+ existing_refs = self._get_reference_paths(printer_id)
|
|
|
+ num_existing = len(existing_refs)
|
|
|
+
|
|
|
+ # If all slots are full, rotate (remove oldest)
|
|
|
+ if num_existing >= self.MAX_REFERENCES:
|
|
|
+ self._rotate_references(printer_id)
|
|
|
+ num_existing = self.MAX_REFERENCES - 1
|
|
|
+
|
|
|
+ # Save to next available slot
|
|
|
+ slot_index = num_existing
|
|
|
+ reference_path = CALIBRATION_DIR / f"printer_{printer_id}_ref_{slot_index}.jpg"
|
|
|
+ cv2.imwrite(str(reference_path), frame, [cv2.IMWRITE_JPEG_QUALITY, 95])
|
|
|
+
|
|
|
+ # Save metadata
|
|
|
+ metadata = self._load_metadata(printer_id)
|
|
|
+ if "references" not in metadata:
|
|
|
+ metadata["references"] = {}
|
|
|
+ metadata["references"][str(slot_index)] = {
|
|
|
+ "label": label or "",
|
|
|
+ "timestamp": datetime.now().isoformat(),
|
|
|
+ }
|
|
|
+ self._save_metadata(printer_id, metadata)
|
|
|
+
|
|
|
+ logger.info(
|
|
|
+ f"Saved plate calibration reference {slot_index + 1}/{self.MAX_REFERENCES} for printer {printer_id}"
|
|
|
+ )
|
|
|
+ return True, f"Calibration saved ({slot_index + 1}/{self.MAX_REFERENCES} references)", slot_index
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.exception("Error during plate calibration")
|
|
|
+ return False, f"Calibration error: {e!s}", -1
|
|
|
+
|
|
|
+ def get_calibration_count(self, printer_id: int) -> int:
|
|
|
+ """Get the number of calibration references for a printer."""
|
|
|
+ return len(self._get_reference_paths(printer_id))
|
|
|
+
|
|
|
+ def has_calibration(self, printer_id: int, plate_type: str | None = None) -> bool:
|
|
|
+ """Check if a printer has any calibration reference images."""
|
|
|
+ return len(self._get_reference_paths(printer_id)) > 0
|
|
|
+
|
|
|
+ def delete_calibration(self, printer_id: int, plate_type: str | None = None) -> bool:
|
|
|
+ """Delete all calibration reference images for a printer."""
|
|
|
+ paths = self._get_reference_paths(printer_id)
|
|
|
+ if not paths:
|
|
|
+ return False
|
|
|
+ for path in paths:
|
|
|
+ path.unlink()
|
|
|
+ logger.info(f"Deleted {len(paths)} plate calibration reference(s) for printer {printer_id}")
|
|
|
+ return True
|
|
|
+
|
|
|
+ def analyze_frame(
|
|
|
+ self, image_data: bytes, printer_id: int, plate_type: str | None = None, include_debug_image: bool = False
|
|
|
+ ) -> PlateDetectionResult:
|
|
|
+ """Analyze a camera frame to detect if the plate is empty.
|
|
|
+
|
|
|
+ Compares the current frame to all calibration reference images and uses
|
|
|
+ the best match (lowest difference) for the final result.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ image_data: JPEG image data as bytes
|
|
|
+ printer_id: Printer database ID (for reference lookup)
|
|
|
+ plate_type: Unused - kept for API compatibility
|
|
|
+ include_debug_image: If True, include annotated image in result
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ PlateDetectionResult with analysis results
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ # Check for calibration
|
|
|
+ reference_paths = self._get_reference_paths(printer_id)
|
|
|
+ if not reference_paths:
|
|
|
+ return PlateDetectionResult(
|
|
|
+ is_empty=True, # Default to empty when not calibrated
|
|
|
+ confidence=0.0,
|
|
|
+ difference_percent=0.0,
|
|
|
+ message="No calibration - please calibrate with empty plate first",
|
|
|
+ needs_calibration=True,
|
|
|
+ )
|
|
|
+
|
|
|
+ # Decode current image
|
|
|
+ nparr = np.frombuffer(image_data, np.uint8)
|
|
|
+ current_frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
|
|
|
+
|
|
|
+ if current_frame is None:
|
|
|
+ return PlateDetectionResult(
|
|
|
+ is_empty=True,
|
|
|
+ confidence=0.0,
|
|
|
+ difference_percent=0.0,
|
|
|
+ message="Failed to decode current image",
|
|
|
+ )
|
|
|
+
|
|
|
+ # Extract ROI from current frame
|
|
|
+ current_roi, x_start, y_start, roi_width, roi_height = self._extract_roi(current_frame)
|
|
|
+ current_processed = self._preprocess_for_comparison(current_roi)
|
|
|
+
|
|
|
+ # Compare against all references, find best match (lowest difference)
|
|
|
+ best_difference_percent = float("inf")
|
|
|
+ best_ref_idx = -1
|
|
|
+ best_diff = None
|
|
|
+
|
|
|
+ for idx, ref_path in enumerate(reference_paths):
|
|
|
+ # Load reference image
|
|
|
+ reference_frame = cv2.imread(str(ref_path), cv2.IMREAD_COLOR)
|
|
|
+ if reference_frame is None:
|
|
|
+ continue
|
|
|
+
|
|
|
+ # Ensure same dimensions
|
|
|
+ if current_frame.shape != reference_frame.shape:
|
|
|
+ reference_frame = cv2.resize(reference_frame, (current_frame.shape[1], current_frame.shape[0]))
|
|
|
+
|
|
|
+ # Extract ROI and preprocess
|
|
|
+ reference_roi, _, _, _, _ = self._extract_roi(reference_frame)
|
|
|
+ reference_processed = self._preprocess_for_comparison(reference_roi)
|
|
|
+
|
|
|
+ # Calculate absolute difference
|
|
|
+ diff = cv2.absdiff(current_processed, reference_processed)
|
|
|
+
|
|
|
+ # Calculate mean difference as percentage
|
|
|
+ mean_diff = np.mean(diff)
|
|
|
+ difference_percent = (mean_diff / 255.0) * 100
|
|
|
+
|
|
|
+ if difference_percent < best_difference_percent:
|
|
|
+ best_difference_percent = difference_percent
|
|
|
+ best_ref_idx = idx
|
|
|
+ best_diff = diff
|
|
|
+
|
|
|
+ if best_ref_idx == -1:
|
|
|
+ return PlateDetectionResult(
|
|
|
+ is_empty=True,
|
|
|
+ confidence=0.0,
|
|
|
+ difference_percent=0.0,
|
|
|
+ message="Failed to load any reference images - please recalibrate",
|
|
|
+ needs_calibration=True,
|
|
|
+ )
|
|
|
+
|
|
|
+ difference_percent = best_difference_percent
|
|
|
+
|
|
|
+ # Determine if plate is empty (use best match)
|
|
|
+ is_empty = difference_percent < self.difference_threshold
|
|
|
+
|
|
|
+ # Calculate confidence
|
|
|
+ if is_empty:
|
|
|
+ # Higher confidence when very little difference
|
|
|
+ confidence = 1.0 - min(1.0, difference_percent / self.difference_threshold)
|
|
|
+ else:
|
|
|
+ # Higher confidence when clearly different
|
|
|
+ confidence = min(1.0, difference_percent / (self.difference_threshold * 2))
|
|
|
+
|
|
|
+ # Generate message
|
|
|
+ num_refs = len(reference_paths)
|
|
|
+ if is_empty:
|
|
|
+ message = (
|
|
|
+ f"Plate appears empty (difference: {difference_percent:.1f}%, ref {best_ref_idx + 1}/{num_refs})"
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ message = f"Objects detected on plate (difference: {difference_percent:.1f}%, best ref {best_ref_idx + 1}/{num_refs})"
|
|
|
+
|
|
|
+ # Generate debug image if requested
|
|
|
+ debug_image = None
|
|
|
+ if include_debug_image and best_diff is not None:
|
|
|
+ debug_frame = current_frame.copy()
|
|
|
+
|
|
|
+ # Draw ROI rectangle
|
|
|
+ cv2.rectangle(
|
|
|
+ debug_frame,
|
|
|
+ (x_start, y_start),
|
|
|
+ (x_start + roi_width, y_start + roi_height),
|
|
|
+ (0, 255, 0),
|
|
|
+ 2,
|
|
|
+ )
|
|
|
+
|
|
|
+ # Create colored difference overlay
|
|
|
+ # Red = areas that are different from reference
|
|
|
+ # Amplify diff for visibility (multiply by 3, cap at 255)
|
|
|
+ diff_amplified = np.minimum(best_diff * 3, 255).astype(np.uint8)
|
|
|
+ diff_colored = cv2.cvtColor(diff_amplified, cv2.COLOR_GRAY2BGR)
|
|
|
+ diff_colored[:, :, 0] = 0 # Remove blue
|
|
|
+ diff_colored[:, :, 1] = 0 # Remove green
|
|
|
+ # Red channel has the diff
|
|
|
+
|
|
|
+ # Overlay difference on ROI
|
|
|
+ roi_overlay = debug_frame[y_start : y_start + roi_height, x_start : x_start + roi_width]
|
|
|
+ cv2.addWeighted(diff_colored, 0.5, roi_overlay, 0.5, 0, roi_overlay)
|
|
|
+
|
|
|
+ # Add status text
|
|
|
+ status_text = "EMPTY" if is_empty else "OBJECTS DETECTED"
|
|
|
+ color = (0, 255, 0) if is_empty else (0, 0, 255)
|
|
|
+ cv2.putText(debug_frame, status_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)
|
|
|
+ cv2.putText(
|
|
|
+ debug_frame,
|
|
|
+ f"Diff: {difference_percent:.1f}% (ref {best_ref_idx + 1}/{num_refs})",
|
|
|
+ (10, 60),
|
|
|
+ cv2.FONT_HERSHEY_SIMPLEX,
|
|
|
+ 0.7,
|
|
|
+ color,
|
|
|
+ 2,
|
|
|
+ )
|
|
|
+ cv2.putText(
|
|
|
+ debug_frame,
|
|
|
+ f"Confidence: {confidence:.0%}",
|
|
|
+ (10, 90),
|
|
|
+ cv2.FONT_HERSHEY_SIMPLEX,
|
|
|
+ 0.7,
|
|
|
+ color,
|
|
|
+ 2,
|
|
|
+ )
|
|
|
+
|
|
|
+ # Encode debug image as JPEG
|
|
|
+ _, buffer = cv2.imencode(".jpg", debug_frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
|
|
|
+ debug_image = buffer.tobytes()
|
|
|
+
|
|
|
+ return PlateDetectionResult(
|
|
|
+ is_empty=is_empty,
|
|
|
+ confidence=confidence,
|
|
|
+ difference_percent=difference_percent,
|
|
|
+ message=message,
|
|
|
+ debug_image=debug_image,
|
|
|
+ )
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.exception("Error analyzing frame for plate detection")
|
|
|
+ return PlateDetectionResult(
|
|
|
+ is_empty=True, # Default to empty on error (don't block prints)
|
|
|
+ confidence=0.0,
|
|
|
+ difference_percent=0.0,
|
|
|
+ message=f"Analysis error: {e!s}",
|
|
|
+ )
|
|
|
+
|
|
|
+
|
|
|
+async def capture_camera_image(
|
|
|
+ printer_id: int,
|
|
|
+ ip_address: str,
|
|
|
+ access_code: str,
|
|
|
+ model: str,
|
|
|
+ external_camera_url: str | None = None,
|
|
|
+ external_camera_type: str | None = None,
|
|
|
+ use_external: bool = False,
|
|
|
+) -> tuple[bytes | None, str]:
|
|
|
+ """Capture an image from the printer camera.
|
|
|
+
|
|
|
+ If there's an active camera stream, uses the buffered frame instead of
|
|
|
+ creating a new connection (which would fail while stream is active).
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ Tuple of (image_data, camera_source) or (None, error_message)
|
|
|
+ """
|
|
|
+ image_data: bytes | None = None
|
|
|
+ camera_source = "unknown"
|
|
|
+
|
|
|
+ # Try external camera first if requested and available
|
|
|
+ if use_external and external_camera_url and external_camera_type:
|
|
|
+ try:
|
|
|
+ from backend.app.services.external_camera import capture_frame
|
|
|
+
|
|
|
+ image_data = await capture_frame(external_camera_url, external_camera_type)
|
|
|
+ if image_data:
|
|
|
+ camera_source = "external"
|
|
|
+ logger.debug(f"Captured frame from external camera for printer {printer_id}")
|
|
|
+ except Exception as e:
|
|
|
+ logger.warning(f"Failed to capture from external camera: {e}")
|
|
|
+
|
|
|
+ # Fall back to built-in camera
|
|
|
+ if image_data is None:
|
|
|
+ # First, check if there's an active stream with a buffered frame
|
|
|
+ # This avoids blocking when camera viewer is open
|
|
|
+ try:
|
|
|
+ from backend.app.api.routes.camera import get_buffered_frame
|
|
|
+
|
|
|
+ buffered = get_buffered_frame(printer_id)
|
|
|
+ if buffered:
|
|
|
+ image_data = buffered
|
|
|
+ camera_source = "built-in (buffered)"
|
|
|
+ logger.debug(f"Using buffered frame from active stream for printer {printer_id}")
|
|
|
+ except Exception as e:
|
|
|
+ logger.debug(f"Could not get buffered frame: {e}")
|
|
|
+
|
|
|
+ # If no buffered frame, try to capture a new one
|
|
|
+ if image_data is None:
|
|
|
+ import tempfile
|
|
|
+
|
|
|
+ from backend.app.services.camera import capture_camera_frame
|
|
|
+
|
|
|
+ with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
|
|
|
+ tmp_path = Path(tmp.name)
|
|
|
+
|
|
|
+ try:
|
|
|
+ success = await capture_camera_frame(ip_address, access_code, model, tmp_path, timeout=10)
|
|
|
+ if success:
|
|
|
+ with open(tmp_path, "rb") as f:
|
|
|
+ image_data = f.read()
|
|
|
+ camera_source = "built-in"
|
|
|
+ logger.debug(f"Captured frame from built-in camera for printer {printer_id}")
|
|
|
+ finally:
|
|
|
+ try:
|
|
|
+ tmp_path.unlink()
|
|
|
+ except Exception:
|
|
|
+ pass
|
|
|
+
|
|
|
+ return image_data, camera_source
|
|
|
+
|
|
|
+
|
|
|
+async def check_plate_empty(
|
|
|
+ printer_id: int,
|
|
|
+ ip_address: str,
|
|
|
+ access_code: str,
|
|
|
+ model: str,
|
|
|
+ plate_type: str | None = None,
|
|
|
+ include_debug_image: bool = False,
|
|
|
+ external_camera_url: str | None = None,
|
|
|
+ external_camera_type: str | None = None,
|
|
|
+ use_external: bool = False,
|
|
|
+ roi: tuple[float, float, float, float] | None = None,
|
|
|
+) -> PlateDetectionResult:
|
|
|
+ """Check if the build plate is empty for a printer.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ printer_id: Printer database ID
|
|
|
+ ip_address: Printer IP address
|
|
|
+ access_code: Printer access code
|
|
|
+ model: Printer model string
|
|
|
+ plate_type: Type of build plate for calibration lookup
|
|
|
+ include_debug_image: If True, include annotated image in result
|
|
|
+ external_camera_url: URL of external camera (if configured)
|
|
|
+ external_camera_type: Type of external camera (mjpeg, rtsp, snapshot)
|
|
|
+ use_external: If True, prefer external camera over built-in
|
|
|
+ roi: Region of interest as (x%, y%, w%, h%) - percentages of image size
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ PlateDetectionResult with analysis results
|
|
|
+ """
|
|
|
+ if not OPENCV_AVAILABLE:
|
|
|
+ return PlateDetectionResult(
|
|
|
+ is_empty=True,
|
|
|
+ confidence=0.0,
|
|
|
+ difference_percent=0.0,
|
|
|
+ message="OpenCV not available - plate detection disabled",
|
|
|
+ )
|
|
|
+
|
|
|
+ image_data, camera_source = await capture_camera_image(
|
|
|
+ printer_id, ip_address, access_code, model, external_camera_url, external_camera_type, use_external
|
|
|
+ )
|
|
|
+
|
|
|
+ if image_data is None:
|
|
|
+ return PlateDetectionResult(
|
|
|
+ is_empty=True, # Default to empty on error
|
|
|
+ confidence=0.0,
|
|
|
+ difference_percent=0.0,
|
|
|
+ message="Failed to capture camera frame from any source",
|
|
|
+ )
|
|
|
+
|
|
|
+ # Analyze the captured frame
|
|
|
+ detector = PlateDetector(roi=roi)
|
|
|
+ result = detector.analyze_frame(image_data, printer_id, plate_type, include_debug_image)
|
|
|
+
|
|
|
+ # Add camera source to message
|
|
|
+ result.message = f"[{camera_source}] {result.message}"
|
|
|
+
|
|
|
+ return result
|
|
|
+
|
|
|
+
|
|
|
+async def calibrate_plate(
|
|
|
+ printer_id: int,
|
|
|
+ ip_address: str,
|
|
|
+ access_code: str,
|
|
|
+ model: str,
|
|
|
+ label: str | None = None,
|
|
|
+ external_camera_url: str | None = None,
|
|
|
+ external_camera_type: str | None = None,
|
|
|
+ use_external: bool = False,
|
|
|
+) -> tuple[bool, str, int]:
|
|
|
+ """Calibrate plate detection by capturing a reference image of the empty plate.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ printer_id: Printer database ID
|
|
|
+ ip_address: Printer IP address
|
|
|
+ access_code: Printer access code
|
|
|
+ model: Printer model string
|
|
|
+ label: Optional label for this reference (e.g., "High Temp Plate")
|
|
|
+ external_camera_url: URL of external camera (if configured)
|
|
|
+ external_camera_type: Type of external camera (mjpeg, rtsp, snapshot)
|
|
|
+ use_external: If True, prefer external camera over built-in
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ Tuple of (success, message, index)
|
|
|
+ """
|
|
|
+ if not OPENCV_AVAILABLE:
|
|
|
+ return False, "OpenCV not available - plate detection disabled", -1
|
|
|
+
|
|
|
+ image_data, camera_source = await capture_camera_image(
|
|
|
+ printer_id, ip_address, access_code, model, external_camera_url, external_camera_type, use_external
|
|
|
+ )
|
|
|
+
|
|
|
+ if image_data is None:
|
|
|
+ return False, "Failed to capture camera frame for calibration", -1
|
|
|
+
|
|
|
+ detector = PlateDetector()
|
|
|
+ success, message, index = detector.calibrate(image_data, printer_id, label)
|
|
|
+
|
|
|
+ if success:
|
|
|
+ message = f"[{camera_source}] {message}"
|
|
|
+
|
|
|
+ return success, message, index
|
|
|
+
|
|
|
+
|
|
|
+def get_calibration_status(printer_id: int, plate_type: str | None = None) -> dict:
|
|
|
+ """Get calibration status for a printer.
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ Dict with calibration info including reference count
|
|
|
+ """
|
|
|
+ if not OPENCV_AVAILABLE:
|
|
|
+ return {
|
|
|
+ "available": False,
|
|
|
+ "calibrated": False,
|
|
|
+ "reference_count": 0,
|
|
|
+ "max_references": 5,
|
|
|
+ "message": "OpenCV not available",
|
|
|
+ }
|
|
|
+
|
|
|
+ detector = PlateDetector()
|
|
|
+ calibrated = detector.has_calibration(printer_id)
|
|
|
+ ref_count = detector.get_calibration_count(printer_id)
|
|
|
+
|
|
|
+ if calibrated:
|
|
|
+ message = f"Calibrated with {ref_count}/{detector.MAX_REFERENCES} reference(s)"
|
|
|
+ else:
|
|
|
+ message = "Not calibrated - please calibrate with empty plate"
|
|
|
+
|
|
|
+ return {
|
|
|
+ "available": True,
|
|
|
+ "calibrated": calibrated,
|
|
|
+ "reference_count": ref_count,
|
|
|
+ "max_references": detector.MAX_REFERENCES,
|
|
|
+ "message": message,
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+def delete_calibration(printer_id: int, plate_type: str | None = None) -> bool:
|
|
|
+ """Delete calibration for a printer and plate type."""
|
|
|
+ if not OPENCV_AVAILABLE:
|
|
|
+ return False
|
|
|
+
|
|
|
+ detector = PlateDetector()
|
|
|
+ return detector.delete_calibration(printer_id, plate_type)
|
|
|
+
|
|
|
+
|
|
|
+def is_plate_detection_available() -> bool:
|
|
|
+ """Check if plate detection feature is available (OpenCV installed)."""
|
|
|
+ return OPENCV_AVAILABLE
|